From 3d977963dfe55c0f14720da8c671f77bf210229d Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Fri, 11 Nov 2016 11:39:56 +0300 Subject: xmpp: revert to Connection-based router because Stream-based router can not route full stanzas --- .../main/java/com/juick/components/XMPPServer.java | 226 +------------------ .../com/juick/components/s2s/ConnectionIn.java | 4 +- .../com/juick/components/s2s/ConnectionRouter.java | 250 +++++++++++++++++++++ .../java/com/juick/components/s2s/JuickBot.java | 28 ++- 4 files changed, 274 insertions(+), 234 deletions(-) create mode 100644 juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java (limited to 'juick-xmpp/src/main/java/com') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index 2c962a51..463d8c2a 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -1,24 +1,16 @@ package com.juick.components; -import com.juick.User; -import com.juick.server.MessagesQueries; -import com.juick.server.SubscriptionsQueries; -import com.juick.server.UserQueries; import com.juick.components.s2s.*; import com.juick.xmpp.*; import com.juick.xmpp.extensions.JuickMessage; -import com.juick.xmpp.extensions.Nickname; -import com.juick.xmpp.extensions.XOOB; import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.env.Environment; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; -import javax.inject.Inject; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; @@ -34,14 +26,13 @@ import java.util.concurrent.ExecutorService; * * @author ugnich */ -public class XMPPServer implements DisposableBean, Stream.StreamListener, - Message.MessageListener, Iq.IqListener, Presence.PresenceListener { +public class XMPPServer implements DisposableBean { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); public ExecutorService service; - private StreamComponent router; - JuickBot bot; + private ConnectionRouter router; + public JuickBot bot; public String HOSTNAME, componentName; public String keystore; @@ -72,19 +63,8 @@ public class XMPPServer implements DisposableBean, Stream.StreamListener, childParsers.put(JuickMessage.XMLNS, new JuickMessage()); - service.submit(() -> { - try { - Socket routerSocket = new Socket("localhost", componentPort); - router = new StreamComponent(new JID(componentName), routerSocket.getInputStream(), routerSocket.getOutputStream(), env.getProperty("xmpp_password")); - router.addChildParser(new JuickMessage()); - router.addListener((Stream.StreamListener) this); - router.addListener((Message.MessageListener) this); - router.addListener((Iq.IqListener) this); - router.startParsing(); - } catch (IOException e) { - logger.error("router error", e); - } - }); + router = new ConnectionRouter(this, componentName, componentPort, env.getProperty("xmpp_password")); + service.submit(router); service.submit(() -> { final ServerSocket listener = new ServerSocket(s2sPort); logger.info("s2s listener ready"); @@ -222,7 +202,6 @@ public class XMPPServer implements DisposableBean, Stream.StreamListener, } } - @Override public void destroy() { synchronized (getOutConnections()) { @@ -240,204 +219,11 @@ public class XMPPServer implements DisposableBean, Stream.StreamListener, i.remove(); } } - - try { - closeRouterConnection(); - } catch (IOException e) { - logger.error("router warning", e); - } service.shutdown(); logger.info("component destroyed"); } - public void closeRouterConnection() throws IOException { - getRouter().logoff(); - } - - public void sendJuickMessage(JuickMessage jmsg) { - List jids = new ArrayList<>(); - - if (jmsg.FriendsOnly) { - jids = SubscriptionsQueries.getJIDSubscribedToUser(jdbc, jmsg.getUser().getUID(), jmsg.FriendsOnly); - } else { - List users = SubscriptionsQueries.getSubscribedUsers(jdbc, jmsg.getUser().getUID(), jmsg.getMID()); - for (User user : users) { - for (String jid : UserQueries.getJIDsbyUID(jdbc, user.getUID())) { - jids.add(jid); - } - } - } - - String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = bot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - - for (String jid : jids) { - msg.to = new JID(jid); - sendOut(msg); - } - } - - public void sendJuickComment(JuickMessage jmsg) { - List users; - String replyQuote; - String replyTo; - - users = SubscriptionsQueries.getUsersSubscribedToComments(jdbc, jmsg.getMID(), jmsg.getUser().getUID()); - com.juick.Message replyMessage = jmsg.ReplyTo > 0 ? MessagesQueries.getReply(jdbc, jmsg.getMID(), jmsg.ReplyTo) - : MessagesQueries.getMessage(jdbc, jmsg.getMID()); - replyTo = replyMessage.getUser().getUName(); - replyQuote = getReplyQuote(replyMessage); - - String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n@" + replyTo + " "; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = bot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.addChild(jmsg); - for (User user : users) { - for (String jid : UserQueries.getJIDsbyUID(jdbc, user.getUID())) { - msg.to = new JID(jid); - sendOut(msg); - } - } - } - - private String getReplyQuote(com.juick.Message q) { - String quote = q.getText(); - if (quote.length() > 50) { - quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; - } else if (quote.length() > 0) { - quote = ">" + quote.replace('\n', ' ') + "\n"; - } - return quote; - } - - public void sendJuickRecommendation(JuickMessage recomm) { - List users; - JuickMessage jmsg; - jmsg = new JuickMessage(MessagesQueries.getMessage(jdbc, recomm.getMID())); - users = SubscriptionsQueries.getUsersSubscribedToUserRecommendations(jdbc, - recomm.getUser().getUID(), recomm.getMID(), jmsg.getUser().getUID()); - - String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; - txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID(); - if (jmsg.Replies > 0) { - if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { - txt += " (" + jmsg.Replies + " reply)"; - } else { - txt += " (" + jmsg.Replies + " replies)"; - } - } - txt += " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = bot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - - for (User user : users) { - for (String jid : UserQueries.getJIDsbyUID(jdbc, user.getUID())) { - msg.to = new JID(jid); - sendOut(msg); - } - } - } - - @Override - public boolean onIq(Iq iq) { - JID jid = iq.to; - if (!jid.Host.equals(componentName)) { - logger.info("STREAM ROUTER (IQ): " + iq.toString()); - sendOut(iq); - } - return false; - } - - @Override - public void onMessage(Message xmsg) { - logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - JID jid = xmsg.to; - if (jid.Host.equals(componentName)) { - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } else { - sendOut(xmsg); - } - } - - @Override - public void onPresence(Presence presence) { - JID jid = presence.to; - if (!jid.Host.equals(componentName)) { - logger.info("STREAM ROUTER (PRESENCE): " + presence.toString()); - sendOut(presence); - } - } - - @Override - public void onStreamReady() { - logger.info("STREAM ROUTER (READY)"); - } - - @Override - public void onStreamFail(Exception ex) { - logger.error("STREAM ROUTER (FAIL)", ex); - } - public StreamComponent getRouter() { + public ConnectionRouter getRouter() { return router; } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java index 8cc61e26..3220b074 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java @@ -122,7 +122,7 @@ public class ConnectionIn extends Connection implements Runnable { if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { LOGGER.info("STREAM " + streamID + ": " + msg.toString()); if (!bot.incomingMessage(msg)) { - xmpp.getRouter().send(msg.toString()); + xmpp.getRouter().sendStanza(msg.toString()); } } } else if (tag.equals("iq") && checkFromTo(parser)) { @@ -131,7 +131,7 @@ public class ConnectionIn extends Connection implements Runnable { String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { LOGGER.info("STREAM " + streamID + ": " + xml); - xmpp.getRouter().send(xml); + xmpp.getRouter().sendStanza(xml); } } else if (sc != null && !isSecured() && tag.equals("starttls")) { LOGGER.info("STREAM " + streamID + " SECURING"); diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java new file mode 100644 index 00000000..86f12b82 --- /dev/null +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -0,0 +1,250 @@ +package com.juick.components.s2s; + +import com.juick.User; +import com.juick.components.XMPPServer; +import com.juick.server.MessagesQueries; +import com.juick.server.SubscriptionsQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.Nickname; +import com.juick.xmpp.extensions.XOOB; +import com.juick.xmpp.utils.XmlUtils; +import org.apache.commons.codec.digest.DigestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xmlpull.v1.XmlPullParser; + +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.Channels; +import java.util.ArrayList; +import java.util.List; + +/** + * + * @author ugnich + */ +public class ConnectionRouter extends Connection implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class); + + private String componentName; + private int componentPort; + private String password; + + public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception { + super(s2s); + this.componentName = componentName; + this.componentPort = componentPort; + this.password = password; + } + + @Override + public void run() { + logger.info("STREAM ROUTER START"); + + try { + AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); + socket.connect(new InetSocketAddress(componentPort)); + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + + String msg = String.format("", componentName); + writer.write(msg); + writer.flush(); + + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("FAIL ON FIRST PACKET"); + } + + msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; + writer.write(msg); + writer.flush(); + + parser.next(); + if (!parser.getName().equals("handshake")) { + throw new Exception("NO HANDSHAKE"); + } + XmlUtils.skip(parser); + logger.info("STREAM ROUTER OPEN"); + + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + String to = parser.getAttributeValue(null, "to"); + if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { + JID jid = new JID(to); + if (jid.Host != null) { + if (jid.Host.equals(componentName)) { + if (tag.equals("message")) { + Message xmsg = Message.parse(parser, xmpp.childParsers); + logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } + } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER: " + xml); + } else { + String xml = XmlUtils.parseToString(parser, true); + logger.info("STREAM ROUTER (OUT): " + xml); + xmpp.sendOut(jid.Host, xml); + } + } else { + logger.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); + } + } else { + logger.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); + } + } + + logger.warn("STREAM ROUTER FINISHED"); + } catch (Exception e) { + logger.warn("STREAM ROUTER PARSE ERROR: " + e.toString()); + } + } + + public void sendJuickMessage(JuickMessage jmsg) { + List jids = new ArrayList<>(); + + if (jmsg.FriendsOnly) { + jids = SubscriptionsQueries.getJIDSubscribedToUser(xmpp.jdbc, jmsg.getUser().getUID(), jmsg.FriendsOnly); + } else { + List users = SubscriptionsQueries.getSubscribedUsers(xmpp.jdbc, jmsg.getUser().getUID(), jmsg.getMID()); + for (User user : users) { + for (String jid : UserQueries.getJIDsbyUID(xmpp.jdbc, user.getUID())) { + jids.add(jid); + } + } + } + + String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = xmpp.bot.getJid(); + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + + for (String jid : jids) { + msg.to = new JID(jid); + xmpp.sendOut(msg); + } + } + + public void sendJuickComment(JuickMessage jmsg) { + List users; + String replyQuote; + String replyTo; + + users = SubscriptionsQueries.getUsersSubscribedToComments(xmpp.jdbc, jmsg.getMID(), jmsg.getUser().getUID()); + com.juick.Message replyMessage = jmsg.ReplyTo > 0 ? MessagesQueries.getReply(xmpp.jdbc, jmsg.getMID(), jmsg.ReplyTo) + : MessagesQueries.getMessage(xmpp.jdbc, jmsg.getMID()); + replyTo = replyMessage.getUser().getUName(); + replyQuote = replyMessage.getReplyQuote(); + + String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n@" + replyTo + " "; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = xmpp.bot.getJid(); + msg.body = txt; + msg.type = Message.Type.chat; + msg.addChild(jmsg); + for (User user : users) { + for (String jid : UserQueries.getJIDsbyUID(xmpp.jdbc, user.getUID())) { + msg.to = new JID(jid); + xmpp.sendOut(msg); + } + } + } + + public void sendJuickRecommendation(JuickMessage recomm) { + List users; + JuickMessage jmsg; + jmsg = new JuickMessage(MessagesQueries.getMessage(xmpp.jdbc, recomm.getMID())); + users = SubscriptionsQueries.getUsersSubscribedToUserRecommendations(xmpp.jdbc, + recomm.getUser().getUID(), recomm.getMID(), jmsg.getUser().getUID()); + + String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; + txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID(); + if (jmsg.Replies > 0) { + if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { + txt += " (" + jmsg.Replies + " reply)"; + } else { + txt += " (" + jmsg.Replies + " replies)"; + } + } + txt += " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = xmpp.bot.getJid(); + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + + for (User user : users) { + for (String jid : UserQueries.getJIDsbyUID(xmpp.jdbc, user.getUID())) { + msg.to = new JID(jid); + xmpp.sendOut(msg); + } + } + } +} \ No newline at end of file diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/JuickBot.java b/juick-xmpp/src/main/java/com/juick/components/s2s/JuickBot.java index 737fbdc2..7458d892 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/JuickBot.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/JuickBot.java @@ -21,12 +21,12 @@ import java.util.regex.Pattern; */ public class JuickBot { XMPPServer xmpp; - public JuickBot(XMPPServer xmpp, JID JuickJID) { + public JuickBot(XMPPServer xmpp, JID jid) { this.xmpp = xmpp; - this.JuickJID = JuickJID; + this.jid = jid; } - public final JID JuickJID; + private final JID jid; private static final String HELPTEXT = "@username text - Send private message\n" + "*tagname Blah-blah-blah - Post a message with tag 'tagname'\n" @@ -189,10 +189,10 @@ public class JuickBot { jmsg.setUser(user_from); jmsg.setText(msg.body); m.childs.add(jmsg); - xmpp.getRouter().send(m.toString()); + xmpp.getRouter().sendStanza(m.toString()); m.to.Host = "ws.juick.com"; - xmpp.getRouter().send(m.toString()); + xmpp.getRouter().sendStanza(m.toString()); List jids; boolean inroster = false; @@ -260,23 +260,23 @@ public class JuickBot { } private void commandPing(Message m) throws Exception { - Presence p = new Presence(JuickJID, m.from); + Presence p = new Presence(jid, m.from); p.priority = 10; xmpp.sendOut(p); - Message reply = new Message(JuickJID, m.from, Message.Type.chat); + Message reply = new Message(jid, m.from, Message.Type.chat); reply.body = "PONG"; xmpp.sendOut(reply); } private void commandHelp(Message m) throws Exception { - Message reply = new Message(JuickJID, m.from, Message.Type.chat); + Message reply = new Message(jid, m.from, Message.Type.chat); reply.body = HELPTEXT; xmpp.sendOut(reply); } private void commandLogin(Message m, User user_from) throws Exception { - Message reply = new Message(JuickJID, m.from, Message.Type.chat); + Message reply = new Message(jid, m.from, Message.Type.chat); reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(xmpp.jdbc, user_from.getUID()); xmpp.sendOut(reply); } @@ -317,10 +317,10 @@ public class JuickBot { jmsg.setUser(user_from); jmsg.setText(body); msg.childs.add(jmsg); - xmpp.getRouter().send(msg.toString()); + xmpp.getRouter().sendStanza(msg.toString()); msg.to.Host = "ws.juick.com"; - xmpp.getRouter().send(msg.toString()); + xmpp.getRouter().sendStanza(msg.toString()); for (String jid : jids_to) { Message mm = new Message(); @@ -372,8 +372,12 @@ public class JuickBot { txt = "You don't have any users or tags in your blacklist."; } - Message reply = new Message(JuickJID, m.from, Message.Type.chat); + Message reply = new Message(jid, m.from, Message.Type.chat); reply.body = txt; xmpp.sendOut(reply); } + + public JID getJid() { + return jid; + } } -- cgit v1.2.3