From 48c1dd8e27ccca932f482766e0747dd3a61b1e98 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Fri, 11 Nov 2016 11:48:40 +0300 Subject: xmpp: reconnect ConnectionRouter using failsafe library --- .../com/juick/components/s2s/ConnectionRouter.java | 139 +++++++++++---------- 1 file changed, 76 insertions(+), 63 deletions(-) (limited to 'juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java') diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java index 86f12b82..b10d2540 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -11,11 +11,15 @@ import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; import com.juick.xmpp.utils.XmlUtils; +import net.jodah.failsafe.Execution; +import net.jodah.failsafe.RetryPolicy; import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; @@ -23,6 +27,7 @@ import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; /** * @@ -46,83 +51,91 @@ public class ConnectionRouter extends Connection implements Runnable { @Override public void run() { logger.info("STREAM ROUTER START"); + @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy() + .withBackoff(1, 30, TimeUnit.SECONDS) + .withJitter(0.1) + .retryOn(IOException.class, XmlPullParserException.class); + Execution execution = new Execution(retryPolicy); + xmpp.service.submit(() -> { + while (!execution.isComplete()) { + try { + AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); + socket.connect(new InetSocketAddress(componentPort)); + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); - try { - AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress(componentPort)); - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); - - String msg = String.format("", componentName); - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } + String msg = String.format("", componentName); + writer.write(msg); + writer.flush(); - msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; - writer.write(msg); - writer.flush(); + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("FAIL ON FIRST PACKET"); + } - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - logger.info("STREAM ROUTER OPEN"); + msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; + writer.write(msg); + writer.flush(); - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } + parser.next(); + if (!parser.getName().equals("handshake")) { + throw new Exception("NO HANDSHAKE"); + } + XmlUtils.skip(parser); + logger.info("STREAM ROUTER OPEN"); - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, xmpp.childParsers); - logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + String to = parser.getAttributeValue(null, "to"); + if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { + JID jid = new JID(to); + if (jid.Host != null) { + if (jid.Host.equals(componentName)) { + if (tag.equals("message")) { + Message xmsg = Message.parse(parser, xmpp.childParsers); + logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } } } + } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER: " + xml); + } else { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER (OUT): " + xml); + xmpp.sendOut(jid.Host, xml); } + } else { + logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } - } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER: " + xml); } else { - String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER (OUT): " + xml); - xmpp.sendOut(jid.Host, xml); + logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } - } else { - logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } - } else { - logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); + + logger.warn("STREAM ROUTER FINISHED"); + } catch (Exception e) { + logger.warn("router error, reconnection " + e.toString()); } } - - logger.warn("STREAM ROUTER FINISHED"); - } catch (Exception e) { - logger.warn("STREAM ROUTER PARSE ERROR: " + e.toString()); - } + }); } public void sendJuickMessage(JuickMessage jmsg) { -- cgit v1.2.3