package com.juick.components.s2s; import com.juick.User; import com.juick.components.XMPPServer; import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; import com.juick.xmpp.utils.XmlUtils; import net.jodah.failsafe.Execution; import net.jodah.failsafe.RetryPolicy; import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; /** * @author ugnich */ public class ConnectionRouter extends Connection implements Runnable { private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class); private String componentName; private int componentPort; private String password; public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception { super(s2s); this.componentName = componentName; this.componentPort = componentPort; this.password = password; } @Override public void run() { logger.info("stream router start"); @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy() .withBackoff(1, 30, TimeUnit.SECONDS) .withJitter(0.1) .retryOn(IOException.class, XmlPullParserException.class); Execution execution = new Execution(retryPolicy); xmpp.service.submit(() -> { while (!execution.isComplete()) { try { AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); socket.connect(new InetSocketAddress("localhost", componentPort)).get(); parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); writer = new OutputStreamWriter(Channels.newOutputStream(socket)); String msg = String.format("", componentName); writer.write(msg); writer.flush(); parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); if (streamID == null || streamID.isEmpty()) { throw new Exception("fail on first packet"); } msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; writer.write(msg); writer.flush(); parser.next(); if (!parser.getName().equals("handshake")) { throw new Exception("no handshake"); } XmlUtils.skip(parser); logger.info("stream router open"); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { continue; } String tag = parser.getName(); String to = parser.getAttributeValue(null, "to"); if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { JID jid = new JID(to); if (jid.Host != null) { if (jid.Host.equals(componentName)) { if (tag.equals("message")) { Message xmsg = Message.parse(parser, xmpp.childParsers); logger.info("stream router (process): {}", xmsg); JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); if (jmsg != null) { if (jid.Username != null && jid.Username.equals("recomm")) { sendJuickRecommendation(jmsg); } else { if (jmsg.getRid() > 0) { sendJuickComment(jmsg); } else if (jmsg.getMid() > 0) { sendJuickMessage(jmsg); } } } } } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { String xml = XmlUtils.parseToString(parser, true); logger.info("stream router: {}", xml); } else { String xml = XmlUtils.parseToString(parser, true); logger.info("stream router (out): {}", xml); xmpp.sendOut(jid.Host, xml); } } else { String invalidStanza = XmlUtils.parseToString(parser, true); logger.info("stream router (no to): {}", invalidStanza); } } else { String unknownStanza = XmlUtils.parseToString(parser, true); logger.info("stream router (unknown): {}", unknownStanza); } } logger.warn("stream router finished"); } catch (Exception e) { logger.warn("router error, reconnection ", e); execution.recordFailure(e); } } }); } public void sendJuickMessage(JuickMessage jmsg) { List jids = new ArrayList<>(); if (jmsg.FriendsOnly) { jids = xmpp.subscriptionService.getJIDSubscribedToUser(jmsg.getUser().getUid(), jmsg.FriendsOnly); } else { List users = xmpp.subscriptionService.getSubscribedUsers(jmsg.getUser().getUid(), jmsg.getMid()); for (User user : users) { for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) { jids.add(jid); } } } String txt = "@" + jmsg.getUser().getName() + ":" + jmsg.getTagsString() + "\n"; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n"; txt += "#" + jmsg.getMid() + " http://juick.com/" + jmsg.getMid(); Nickname nick = new Nickname(); nick.Nickname = "@" + jmsg.getUser().getName(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = xmpp.getJid(); msg.body = txt; msg.type = Message.Type.chat; msg.thread = "juick-" + jmsg.getMid(); msg.addChild(jmsg); msg.addChild(nick); if (attachment != null) { XOOB oob = new XOOB(); oob.URL = attachment; msg.addChild(oob); } for (String jid : jids) { msg.to = new JID(jid); xmpp.sendOut(msg); } } public void sendJuickComment(JuickMessage jmsg) { List users; String replyQuote; String replyTo; users = xmpp.subscriptionService.getUsersSubscribedToComments(jmsg.getMid(), jmsg.getUser().getUid()); com.juick.Message replyMessage = jmsg.getReplyto() > 0 ? xmpp.messagesService.getReply(jmsg.getMid(), jmsg.getReplyto()) : xmpp.messagesService.getMessage(jmsg.getMid()); replyTo = replyMessage.getUser().getName(); com.juick.Message fullReply = xmpp.messagesService.getReply(jmsg.getMid(), jmsg.getRid()); replyQuote = fullReply.getReplyQuote(); String txt = "Reply by @" + jmsg.getUser().getName() + ":\n" + replyQuote + "\n@" + replyTo + " "; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMid() + "/" + jmsg.getRid() + " http://juick.com/" + jmsg.getMid() + "#" + jmsg.getRid(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = xmpp.getJid(); msg.body = txt; msg.type = Message.Type.chat; msg.addChild(jmsg); for (User user : users) { for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) { msg.to = new JID(jid); xmpp.sendOut(msg); } } } public void sendJuickRecommendation(JuickMessage recomm) { List users; JuickMessage jmsg; jmsg = new JuickMessage(xmpp.messagesService.getMessage(recomm.getMid())); users = xmpp.subscriptionService.getUsersSubscribedToUserRecommendations(recomm.getUser().getUid(), recomm.getMid(), jmsg.getUser().getUid()); String txt = "Recommended by @" + recomm.getUser().getName() + ":\n"; txt += "@" + jmsg.getUser().getName() + ":" + jmsg.getTagsString() + "\n"; String attachment = jmsg.getAttachmentURL(); if (attachment != null) { txt += attachment + "\n"; } txt += jmsg.getText() + "\n\n"; txt += "#" + jmsg.getMid(); if (jmsg.getReplies() > 0) { if (jmsg.getReplies() % 10 == 1 && jmsg.getReplies() % 100 != 11) { txt += " (" + jmsg.getReplies() + " reply)"; } else { txt += " (" + jmsg.getReplies() + " replies)"; } } txt += " http://juick.com/" + jmsg.getMid(); Nickname nick = new Nickname(); nick.Nickname = "@" + jmsg.getUser().getName(); com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); msg.from = xmpp.getJid(); msg.body = txt; msg.type = Message.Type.chat; msg.thread = "juick-" + jmsg.getMid(); msg.addChild(jmsg); msg.addChild(nick); if (attachment != null) { XOOB oob = new XOOB(); oob.URL = attachment; msg.addChild(oob); } for (User user : users) { for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) { msg.to = new JID(jid); xmpp.sendOut(msg); } } } }