aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/XMPPServer.java36
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java2
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java16
3 files changed, 38 insertions, 16 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
index ac2c9ec8..3bfa49e1 100644
--- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
@@ -61,6 +61,8 @@ public class XMPPServer implements AutoCloseable {
private JID jid;
+ private ServerSocket listener;
+
public XMPPServer(Environment env, ExecutorService service) {
this.service = service;
@@ -82,22 +84,27 @@ public class XMPPServer implements AutoCloseable {
if (!disabled) {
router = new ConnectionRouter(this, componentName, componentPort, env.getProperty("xmpp_password"));
service.submit(router);
+ }
- service.submit(() -> {
- final ServerSocket listener = new ServerSocket(s2sPort);
+ service.submit(() -> {
+ try {
+ listener = new ServerSocket(s2sPort);
logger.info("s2s listener ready");
while (true) {
- try {
- Socket socket = listener.accept();
- ConnectionIn client = new ConnectionIn(this, socket);
- addConnectionIn(client);
- service.submit(client);
- } catch (Exception e) {
- logger.error("s2s error", e);
- }
+ if (Thread.currentThread().isInterrupted()) break;
+ Socket socket = listener.accept();
+ ConnectionIn client = new ConnectionIn(this, socket);
+ addConnectionIn(client);
+ service.submit(client);
}
- });
- }
+ } catch (IOException e) {
+ logger.warn("io exception", e);
+ Thread.currentThread().interrupt();
+ } catch (Exception ex) {
+ logger.warn("s2s error", ex);
+ }
+ logger.info("s2s interrupted");
+ });
} catch (Exception e) {
logger.error("XMPPComponent error", e);
@@ -121,7 +128,10 @@ public class XMPPServer implements AutoCloseable {
i.remove();
}
}
- logger.info("Xmpp server destroyed");
+ if (!listener.isClosed()) {
+ listener.close();
+ }
+ logger.info("XMPP server destroyed");
}
public void addConnectionIn(ConnectionIn c) {
diff --git a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java
index 6453ef01..28dab1ef 100644
--- a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppInitializer.java
@@ -34,6 +34,6 @@ public class XmppInitializer extends AbstractAnnotationConfigDispatcherServletIn
@Override
protected String getServletName() {
- return "Xmpp dispatcher servlet";
+ return "XMPP dispatcher servlet";
}
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
index b28298a2..37d3c59f 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
@@ -29,13 +29,14 @@ import java.util.concurrent.TimeUnit;
/**
* @author ugnich
*/
-public class ConnectionRouter extends Connection implements Runnable {
+public class ConnectionRouter extends Connection implements Runnable, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class);
private String componentName;
private int componentPort;
private String password;
+ private Execution execution;
public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception {
super(s2s);
@@ -51,7 +52,7 @@ public class ConnectionRouter extends Connection implements Runnable {
.withBackoff(1, 30, TimeUnit.SECONDS)
.withJitter(0.1)
.retryOn(IOException.class, XmlPullParserException.class);
- Execution execution = new Execution(retryPolicy);
+ execution = new Execution(retryPolicy);
xmpp.service.submit(() -> {
while (!execution.isComplete()) {
try {
@@ -109,6 +110,9 @@ public class ConnectionRouter extends Connection implements Runnable {
}
}
}
+ } else if (tag.equals("iq") || tag.equals("presence")) {
+ String xml = XmlUtils.parseToString(parser, true);
+ logger.info("stream router (stanza): {}", xml);
}
} else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) {
String xml = XmlUtils.parseToString(parser, true);
@@ -129,6 +133,9 @@ public class ConnectionRouter extends Connection implements Runnable {
}
logger.warn("stream router finished");
+ } catch (InterruptedException ex) {
+ logger.info("shutting down");
+ execution.complete();
} catch (Exception e) {
logger.warn("router error, reconnection ", e);
execution.recordFailure(e);
@@ -260,4 +267,9 @@ public class ConnectionRouter extends Connection implements Runnable {
}
}
}
+
+ @Override
+ public void close() throws Exception {
+ execution.complete();
+ }
}