From 68a0e8f60cafd7bba03ff14fe6f5671855ffac72 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 24 Jan 2017 11:09:06 +0300 Subject: juick-xmpp: shutdown app correctly --- .../main/java/com/juick/components/XMPPServer.java | 36 ++++++++++++++-------- .../components/configuration/XmppInitializer.java | 2 +- .../com/juick/components/s2s/ConnectionRouter.java | 16 ++++++++-- 3 files changed, 38 insertions(+), 16 deletions(-) (limited to 'juick-xmpp') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index ac2c9ec8..3bfa49e1 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -61,6 +61,8 @@ public class XMPPServer implements AutoCloseable { private JID jid; + private ServerSocket listener; + public XMPPServer(Environment env, ExecutorService service) { this.service = service; @@ -82,22 +84,27 @@ public class XMPPServer implements AutoCloseable { if (!disabled) { router = new ConnectionRouter(this, componentName, componentPort, env.getProperty("xmpp_password")); service.submit(router); + } - service.submit(() -> { - final ServerSocket listener = new ServerSocket(s2sPort); + service.submit(() -> { + try { + listener = new ServerSocket(s2sPort); logger.info("s2s listener ready"); while (true) { - try { - Socket socket = listener.accept(); - ConnectionIn client = new ConnectionIn(this, socket); - addConnectionIn(client); - service.submit(client); - } catch (Exception e) { - logger.error("s2s error", e); - } + if (Thread.currentThread().isInterrupted()) break; + Socket socket = listener.accept(); + ConnectionIn client = new ConnectionIn(this, socket); + addConnectionIn(client); + service.submit(client); } - }); - } + } catch (IOException e) { + logger.warn("io exception", e); + Thread.currentThread().interrupt(); + } catch (Exception ex) { + logger.warn("s2s error", ex); + } + logger.info("s2s interrupted"); + }); } catch (Exception e) { logger.error("XMPPComponent error", e); @@ -121,7 +128,10 @@ public class XMPPServer implements AutoCloseable { i.remove(); } } - logger.info("Xmpp server destroyed"); + if (!listener.isClosed()) { + listener.close(); + } + logger.info("XMPP server destroyed"); } public void addConnectionIn(ConnectionIn c) { diff --git a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java index 6453ef01..28dab1ef 100644 --- a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java +++ b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java @@ -34,6 +34,6 @@ public class XmppInitializer extends AbstractAnnotationConfigDispatcherServletIn @Override protected String getServletName() { - return "Xmpp dispatcher servlet"; + return "XMPP dispatcher servlet"; } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java index b28298a2..37d3c59f 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -29,13 +29,14 @@ import java.util.concurrent.TimeUnit; /** * @author ugnich */ -public class ConnectionRouter extends Connection implements Runnable { +public class ConnectionRouter extends Connection implements Runnable, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class); private String componentName; private int componentPort; private String password; + private Execution execution; public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception { super(s2s); @@ -51,7 +52,7 @@ public class ConnectionRouter extends Connection implements Runnable { .withBackoff(1, 30, TimeUnit.SECONDS) .withJitter(0.1) .retryOn(IOException.class, XmlPullParserException.class); - Execution execution = new Execution(retryPolicy); + execution = new Execution(retryPolicy); xmpp.service.submit(() -> { while (!execution.isComplete()) { try { @@ -109,6 +110,9 @@ public class ConnectionRouter extends Connection implements Runnable { } } } + } else if (tag.equals("iq") || tag.equals("presence")) { + String xml = XmlUtils.parseToString(parser, true); + logger.info("stream router (stanza): {}", xml); } } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { String xml = XmlUtils.parseToString(parser, true); @@ -129,6 +133,9 @@ public class ConnectionRouter extends Connection implements Runnable { } logger.warn("stream router finished"); + } catch (InterruptedException ex) { + logger.info("shutting down"); + execution.complete(); } catch (Exception e) { logger.warn("router error, reconnection ", e); execution.recordFailure(e); @@ -260,4 +267,9 @@ public class ConnectionRouter extends Connection implements Runnable { } } } + + @Override + public void close() throws Exception { + execution.complete(); + } } -- cgit v1.2.3