aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp/src/main/java/com/juick/components
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2016-11-11 11:48:40 +0300
committerGravatar Vitaly Takmazov2016-11-11 11:48:40 +0300
commit48c1dd8e27ccca932f482766e0747dd3a61b1e98 (patch)
tree13197ccd1a3b7551e3de1c14110f43610d8c0bab /juick-xmpp/src/main/java/com/juick/components
parent3d977963dfe55c0f14720da8c671f77bf210229d (diff)
xmpp: reconnect ConnectionRouter using failsafe library
Diffstat (limited to 'juick-xmpp/src/main/java/com/juick/components')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java139
1 files changed, 76 insertions, 63 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
index 86f12b82..b10d2540 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
@@ -11,11 +11,15 @@ import com.juick.xmpp.extensions.JuickMessage;
import com.juick.xmpp.extensions.Nickname;
import com.juick.xmpp.extensions.XOOB;
import com.juick.xmpp.utils.XmlUtils;
+import net.jodah.failsafe.Execution;
+import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;
+import org.xmlpull.v1.XmlPullParserException;
+import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
@@ -23,6 +27,7 @@ import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
*
@@ -46,83 +51,91 @@ public class ConnectionRouter extends Connection implements Runnable {
@Override
public void run() {
logger.info("STREAM ROUTER START");
+ @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy()
+ .withBackoff(1, 30, TimeUnit.SECONDS)
+ .withJitter(0.1)
+ .retryOn(IOException.class, XmlPullParserException.class);
+ Execution execution = new Execution(retryPolicy);
+ xmpp.service.submit(() -> {
+ while (!execution.isComplete()) {
+ try {
+ AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
+ socket.connect(new InetSocketAddress(componentPort));
+ parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ writer = new OutputStreamWriter(Channels.newOutputStream(socket));
- try {
- AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
- socket.connect(new InetSocketAddress(componentPort));
- parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
- parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
- writer = new OutputStreamWriter(Channels.newOutputStream(socket));
-
- String msg = String.format("<stream:stream xmlns='jabber:component:accept' " +
- "xmlns:stream='http://etherx.jabber.org/streams' to='%s'>", componentName);
- writer.write(msg);
- writer.flush();
-
- parser.next(); // stream:stream
- streamID = parser.getAttributeValue(null, "id");
- if (streamID == null || streamID.isEmpty()) {
- throw new Exception("FAIL ON FIRST PACKET");
- }
+ String msg = String.format("<stream:stream xmlns='jabber:component:accept' " +
+ "xmlns:stream='http://etherx.jabber.org/streams' to='%s'>", componentName);
+ writer.write(msg);
+ writer.flush();
- msg = "<handshake>" + DigestUtils.sha1Hex(streamID + password) + "</handshake>";
- writer.write(msg);
- writer.flush();
+ parser.next(); // stream:stream
+ streamID = parser.getAttributeValue(null, "id");
+ if (streamID == null || streamID.isEmpty()) {
+ throw new Exception("FAIL ON FIRST PACKET");
+ }
- parser.next();
- if (!parser.getName().equals("handshake")) {
- throw new Exception("NO HANDSHAKE");
- }
- XmlUtils.skip(parser);
- logger.info("STREAM ROUTER OPEN");
+ msg = "<handshake>" + DigestUtils.sha1Hex(streamID + password) + "</handshake>";
+ writer.write(msg);
+ writer.flush();
- while (parser.next() != XmlPullParser.END_DOCUMENT) {
- if (parser.getEventType() != XmlPullParser.START_TAG) {
- continue;
- }
+ parser.next();
+ if (!parser.getName().equals("handshake")) {
+ throw new Exception("NO HANDSHAKE");
+ }
+ XmlUtils.skip(parser);
+ logger.info("STREAM ROUTER OPEN");
- String tag = parser.getName();
- String to = parser.getAttributeValue(null, "to");
- if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) {
- JID jid = new JID(to);
- if (jid.Host != null) {
- if (jid.Host.equals(componentName)) {
- if (tag.equals("message")) {
- Message xmsg = Message.parse(parser, xmpp.childParsers);
- logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString());
- JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
- if (jmsg != null) {
- if (jid.Username != null && jid.Username.equals("recomm")) {
- sendJuickRecommendation(jmsg);
- } else {
- if (jmsg.getRID() > 0) {
- sendJuickComment(jmsg);
- } else if (jmsg.getMID() > 0) {
- sendJuickMessage(jmsg);
+ while (parser.next() != XmlPullParser.END_DOCUMENT) {
+ if (parser.getEventType() != XmlPullParser.START_TAG) {
+ continue;
+ }
+
+ String tag = parser.getName();
+ String to = parser.getAttributeValue(null, "to");
+ if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) {
+ JID jid = new JID(to);
+ if (jid.Host != null) {
+ if (jid.Host.equals(componentName)) {
+ if (tag.equals("message")) {
+ Message xmsg = Message.parse(parser, xmpp.childParsers);
+ logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString());
+ JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ if (jid.Username != null && jid.Username.equals("recomm")) {
+ sendJuickRecommendation(jmsg);
+ } else {
+ if (jmsg.getRID() > 0) {
+ sendJuickComment(jmsg);
+ } else if (jmsg.getMID() > 0) {
+ sendJuickMessage(jmsg);
+ }
+ }
}
}
+ } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) {
+ String xml = XmlUtils.parseToString(parser, true);
+ logger.info("STREAM ROUTER: " + xml);
+ } else {
+ String xml = XmlUtils.parseToString(parser, true);
+ logger.info("STREAM ROUTER (OUT): " + xml);
+ xmpp.sendOut(jid.Host, xml);
}
+ } else {
+ logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true));
}
- } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) {
- String xml = XmlUtils.parseToString(parser, true);
- logger.info("STREAM ROUTER: " + xml);
} else {
- String xml = XmlUtils.parseToString(parser, true);
- logger.info("STREAM ROUTER (OUT): " + xml);
- xmpp.sendOut(jid.Host, xml);
+ logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true));
}
- } else {
- logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true));
}
- } else {
- logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true));
+
+ logger.warn("STREAM ROUTER FINISHED");
+ } catch (Exception e) {
+ logger.warn("router error, reconnection " + e.toString());
}
}
-
- logger.warn("STREAM ROUTER FINISHED");
- } catch (Exception e) {
- logger.warn("STREAM ROUTER PARSE ERROR: " + e.toString());
- }
+ });
}
public void sendJuickMessage(JuickMessage jmsg) {