From 3304f60f3388b5eea587f3772058f945803d3d8d Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Mon, 1 Feb 2016 19:03:44 +0300 Subject: fix s2s router --- .../java/com/juick/xmpp/s2s/ConnectionRouter.java | 134 ++++++++++++--------- .../java/com/juick/xmpp/s2s/XMPPComponent.java | 3 +- 2 files changed, 77 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java index ccd6a335..272bc197 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java @@ -10,6 +10,7 @@ import com.juick.xmpp.extensions.XOOB; import com.juick.xmpp.utils.SHA1; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; import java.io.IOException; import java.io.InputStreamReader; @@ -17,7 +18,9 @@ import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; +import java.nio.channels.CompletionHandler; import java.util.List; +import java.util.logging.Level; /** * @@ -31,80 +34,95 @@ public class ConnectionRouter extends Connection implements Runnable { try { socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress(5347)); - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + socket.connect(new InetSocketAddress(5347), socket, new CompletionHandler() { + @Override + public void completed(Void result, AsynchronousSocketChannel client) { + try { + parser.setInput(new InputStreamReader(Channels.newInputStream(client))); - String msg = ""; - writer.write(msg); - writer.flush(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(Channels.newOutputStream(client)); - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } + String msg = ""; + writer.write(msg); + writer.flush(); - msg = "" + SHA1.encode(streamID + "secret") + ""; - writer.write(msg); - writer.flush(); + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("FAIL ON FIRST PACKET"); + } - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); + msg = "" + SHA1.encode(streamID + "secret") + ""; + writer.write(msg); + writer.flush(); - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } + parser.next(); + if (!parser.getName().equals("handshake")) { + throw new Exception("NO HANDSHAKE"); + } + XmlUtils.skip(parser); + LOGGER.info("STREAM ROUTER OPEN"); - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(XMPPComponent.COMPONENTNAME)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + String to = parser.getAttributeValue(null, "to"); + if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { + JID jid = new JID(to); + if (jid.Host != null) { + if (jid.Host.equals(XMPPComponent.COMPONENTNAME)) { + if (tag.equals("message")) { + Message xmsg = Message.parse(parser, XMPPComponent.childParsers); + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } } + } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { + String xml = XmlUtils.parseToString(parser, true); + LOGGER.info("STREAM ROUTER: " + xml); + } else { + String xml = XmlUtils.parseToString(parser, true); + LOGGER.info("STREAM ROUTER (OUT): " + xml); + XMPPComponent.sendOut(jid.Host, xml); } + } else { + LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } + } else { + LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); + + LOGGER.warning("STREAM ROUTER FINISHED"); + } catch (Exception e) { + LOGGER.log(Level.SEVERE, "xmpp router exception", e); } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } - } - LOGGER.warning("STREAM ROUTER FINISHED"); + @Override + public void failed(Throwable exc, AsynchronousSocketChannel attachment) { + LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); + } + }); + Thread.currentThread().join(); } catch (Exception e) { - LOGGER.warning("STREAM ROUTER PARSE ERROR: " + e.toString()); + LOGGER.log(Level.SEVERE, "NIO2 error", e); } - System.exit(0); + } @Override diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java index bb5f5ef7..95641f5e 100644 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java @@ -6,7 +6,6 @@ import com.juick.xmpp.extensions.JuickMessage; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; -import java.io.FileInputStream; import java.io.IOException; import java.sql.Driver; import java.sql.DriverManager; @@ -157,7 +156,7 @@ public class XMPPComponent implements ServletContextListener { executorService.submit(() -> { Properties conf = new Properties(); try { - conf.load(new FileInputStream("/etc/juick/s2s.conf")); + conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/s2s.conf")); HOSTNAME = conf.getProperty("hostname"); COMPONENTNAME = conf.getProperty("componentname"); STATSFILE = conf.getProperty("statsfile"); -- cgit v1.2.3