diff options
author | Vitaly Takmazov | 2016-11-11 11:48:40 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2016-11-11 11:48:40 +0300 |
commit | 48c1dd8e27ccca932f482766e0747dd3a61b1e98 (patch) | |
tree | 13197ccd1a3b7551e3de1c14110f43610d8c0bab /juick-xmpp/src/main/java | |
parent | 3d977963dfe55c0f14720da8c671f77bf210229d (diff) |
xmpp: reconnect ConnectionRouter using failsafe library
Diffstat (limited to 'juick-xmpp/src/main/java')
-rw-r--r-- | juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java | 139 |
1 files changed, 76 insertions, 63 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java index 86f12b82..b10d2540 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -11,11 +11,15 @@ import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; import com.juick.xmpp.utils.XmlUtils; +import net.jodah.failsafe.Execution; +import net.jodah.failsafe.RetryPolicy; import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; @@ -23,6 +27,7 @@ import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @@ -46,83 +51,91 @@ public class ConnectionRouter extends Connection implements Runnable { @Override public void run() { logger.info("STREAM ROUTER START"); + @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy() + .withBackoff(1, 30, TimeUnit.SECONDS) + .withJitter(0.1) + .retryOn(IOException.class, XmlPullParserException.class); + Execution execution = new Execution(retryPolicy); + xmpp.service.submit(() -> { + while (!execution.isComplete()) { + try { + AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); + socket.connect(new InetSocketAddress(componentPort)); + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); - try { - AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress(componentPort)); - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); - - String msg = String.format("<stream:stream xmlns='jabber:component:accept' " + - "xmlns:stream='http://etherx.jabber.org/streams' to='%s'>", componentName); - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } + String msg = String.format("<stream:stream xmlns='jabber:component:accept' " + + "xmlns:stream='http://etherx.jabber.org/streams' to='%s'>", componentName); + writer.write(msg); + writer.flush(); - msg = "<handshake>" + DigestUtils.sha1Hex(streamID + password) + "</handshake>"; - writer.write(msg); - writer.flush(); + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("FAIL ON FIRST PACKET"); + } - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - logger.info("STREAM ROUTER OPEN"); + msg = "<handshake>" + DigestUtils.sha1Hex(streamID + password) + "</handshake>"; + writer.write(msg); + writer.flush(); - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } + parser.next(); + if (!parser.getName().equals("handshake")) { + throw new Exception("NO HANDSHAKE"); + } + XmlUtils.skip(parser); + logger.info("STREAM ROUTER OPEN"); - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, xmpp.childParsers); - logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + String to = parser.getAttributeValue(null, "to"); + if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { + JID jid = new JID(to); + if (jid.Host != null) { + if (jid.Host.equals(componentName)) { + if (tag.equals("message")) { + Message xmsg = Message.parse(parser, xmpp.childParsers); + logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } } } + } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER: " + xml); + } else { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER (OUT): " + xml); + xmpp.sendOut(jid.Host, xml); } + } else { + logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } - } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER: " + xml); } else { - String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER (OUT): " + xml); - xmpp.sendOut(jid.Host, xml); + logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } - } else { - logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } - } else { - logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); + + logger.warn("STREAM ROUTER FINISHED"); + } catch (Exception e) { + logger.warn("router error, reconnection " + e.toString()); } } - - logger.warn("STREAM ROUTER FINISHED"); - } catch (Exception e) { - logger.warn("STREAM ROUTER PARSE ERROR: " + e.toString()); - } + }); } public void sendJuickMessage(JuickMessage jmsg) { |