package com.juick.xmpp.s2s; import com.juick.User; import com.juick.server.MessagesQueries; import com.juick.server.SubscriptionsQueries; import com.juick.server.UserQueries; import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; import com.juick.xmpp.utils.SHA1; import com.juick.xmpp.utils.XmlUtils; import org.springframework.jdbc.core.JdbcTemplate; import org.xmlpull.v1.XmlPullParser; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.util.List; import java.util.logging.Level; /** * * @author ugnich */ public class ConnectionRouter extends Connection implements Runnable { private String componentName; ConnectionRouter(String componentName) { this.componentName = componentName; } @Override public void run() { LOGGER.info("STREAM ROUTER START"); try { socket = AsynchronousSocketChannel.open(); socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler() { @Override public void completed(Void result, AsynchronousSocketChannel client) { try { parser.setInput(new InputStreamReader(Channels.newInputStream(client))); parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); writer = new OutputStreamWriter(Channels.newOutputStream(client)); String msg = ""; writer.write(msg); writer.flush(); parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); if (streamID == null || streamID.isEmpty()) { throw new Exception("FAIL ON FIRST PACKET"); } msg = "" + SHA1.encode(streamID + "secret") + ""; writer.write(msg); writer.flush(); parser.next(); if (!parser.getName().equals("handshake")) { throw new Exception("NO HANDSHAKE"); } XmlUtils.skip(parser); LOGGER.info("STREAM ROUTER OPEN"); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { continue; } String tag = parser.getName(); String to = parser.getAttributeValue(null, "to"); if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { JID jid = new JID(to); if (jid.Host != null) { if (jid.Host.equals(componentName)) { if (tag.equals("message")) { Message xmsg = Message.parse(parser, XMPPComponent.childParsers); LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); if (jmsg != null) { if (jid.Username != null && jid.Username.equals("recomm")) { sendJuickRecommendation(jmsg); } else { if (jmsg.getRID() > 0) { sendJuickComment(jmsg); } else if (jmsg.getMID() > 0) { sendJuickMessage(jmsg); } } } } } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { String xml = XmlUtils.parseToString(parser, true); LOGGER.info("STREAM ROUTER: " + xml); } else { String xml = XmlUtils.parseToString(parser, true); LOGGER.info("STREAM ROUTER (OUT): " + xml); XMPPComponent.sendOut(jid.Host, xml); } } else { LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } } else { LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } } LOGGER.warning("STREAM ROUTER FINISHED"); } catch (Exception e) { LOGGER.log(Level.SEVERE, "xmpp router exception", e); } } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); } }); Thread.currentThread().join(); } catch (Exception e) { LOGGER.log(Level.SEVERE, "NIO2 error", e); } } @Override synchronized public void sendStanza(String xml) { try { writer.write(xml); writer.flush(); } catch (IOException e) { LOGGER.warning("STREAM ROUTER ERROR: " + xml); LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); System.exit(0); } } public void sendJuickMessage(JuickMessage jmsg) { List users; if (jmsg.FriendsOnly) { users = SubscriptionsQueries.getUsersSubscribedToUser(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); } else { users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); } String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n"; txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); Nickname nick = new Nickname(); nick.Nickname = "@" + jmsg.getUser().getUName(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = JuickBot.JuickJID; msg.body = txt; msg.type = Message.Type.chat; msg.thread = "juick-" + jmsg.getMID(); msg.addChild(jmsg); msg.addChild(nick); if (attachment != null) { XOOB oob = new XOOB(); oob.URL = attachment; msg.addChild(oob); } for (User user : users) { for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } } public void sendJuickComment(JuickMessage jmsg) { String replyQuote; List users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); replyQuote = getReplyQuote(XMPPComponent.sql, jmsg.getMID(), jmsg.ReplyTo); String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = JuickBot.JuickJID; msg.body = txt; msg.type = Message.Type.chat; msg.addChild(jmsg); for (User user : users) { // TODO: make single query for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } } private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { String quote = ""; if (ReplyTo > 0) { com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); if (q != null) { quote = q.getText(); } } else { com.juick.Message q = MessagesQueries.getMessage(sql, MID); if (q != null) { quote = q.getText(); } } if (quote.length() > 50) { quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; } else if (quote.length() > 0) { quote = ">" + quote.replace('\n', ' ') + "\n"; } return quote; } public void sendJuickRecommendation(JuickMessage recomm) { JuickMessage jmsg; jmsg = new JuickMessage(MessagesQueries.getMessage(XMPPComponent.sql, recomm.getMID())); List users = SubscriptionsQueries.getUsersSubscribedToComments(XMPPComponent.sql, recomm.getMID(), jmsg.getUser().getUID()); String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n"; txt += "#" + jmsg.getMID(); if (jmsg.Replies > 0) { if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { txt += " (" + jmsg.Replies + " reply)"; } else { txt += " (" + jmsg.Replies + " replies)"; } } txt += " http://juick.com/" + jmsg.getMID(); Nickname nick = new Nickname(); nick.Nickname = "@" + jmsg.getUser().getUName(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = JuickBot.JuickJID; msg.body = txt; msg.type = Message.Type.chat; msg.thread = "juick-" + jmsg.getMID(); msg.addChild(jmsg); msg.addChild(nick); if (attachment != null) { XOOB oob = new XOOB(); oob.URL = attachment; msg.addChild(oob); } for (User user : users) { for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } } }