diff options
Diffstat (limited to 'src/main/java/com/juick/xmpp')
-rw-r--r-- | src/main/java/com/juick/xmpp/JuickBot.java (renamed from src/main/java/com/juick/xmpp/s2s/JuickBot.java) | 119 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/CleaningUp.java | 20 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionIn.java | 40 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionListener.java | 20 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionOut.java | 24 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java | 283 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/S2SComponent.java | 316 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/XMPPComponent.java | 199 |
8 files changed, 438 insertions, 583 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/JuickBot.java index d82ed939..6104d19d 100644 --- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java +++ b/src/main/java/com/juick/xmpp/JuickBot.java @@ -1,14 +1,14 @@ -package com.juick.xmpp.s2s; +package com.juick.xmpp; +import com.juick.JuickNotificator; import com.juick.User; import com.juick.server.PMQueries; import com.juick.server.TagQueries; import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; import com.juick.xmpp.extensions.Error; import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.s2s.S2SComponent; +import org.springframework.jdbc.core.JdbcTemplate; import java.util.List; import java.util.regex.Matcher; @@ -20,6 +20,16 @@ import java.util.regex.Pattern; */ public class JuickBot { + JdbcTemplate sql; + Stream xmpp; + JuickNotificator notificator; + + public JuickBot(JuickNotificator notificator, JdbcTemplate sql, Stream xmpp) { + this.notificator = notificator; + this.sql = sql; + this.xmpp = xmpp; + } + public static final JID JuickJID = new JID("juick", "juick.com", "Juick"); private static final String HELPTEXT = "@username text - Send private message\n" @@ -55,7 +65,7 @@ public class JuickBot { + "\n" + "Read more: http://juick.com/help/"; - public static boolean incomingPresence(Presence p) { + public boolean incomingPresence(Presence p) { final String username = p.to.Username.toLowerCase(); final boolean toJuick = username.equals("juick"); @@ -64,12 +74,12 @@ public class JuickBot { reply.from = new JID(p.to.Username, p.to.Host, null); reply.to = new JID(p.from.Username, p.from.Host, null); reply.type = Presence.Type.unsubscribe; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } else if (p.type.equals(Presence.Type.probe)) { int uid_to = 0; if (!toJuick) { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + uid_to = UserQueries.getUIDbyName(sql, username); } if (toJuick || uid_to > 0) { @@ -78,12 +88,12 @@ public class JuickBot { reply.from.Resource = "Juick"; reply.to = p.from; reply.priority = 10; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } else { Presence reply = new Presence(p.to, p.from, Presence.Type.error); reply.id = p.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } return true; @@ -92,46 +102,46 @@ public class JuickBot { if (toJuick) { canSubscribe = true; } else { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to > 0) { - PMQueries.addPMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); + PMQueries.addPMinRoster(sql, uid_to, p.from.Bare()); canSubscribe = true; } } if (canSubscribe) { Presence reply = new Presence(p.to, p.from, Presence.Type.subscribed); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); reply.from.Resource = "Juick"; reply.priority = 10; reply.type = null; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } else { Presence reply = new Presence(p.to, p.from, Presence.Type.error); reply.id = p.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } } else if (p.type.equals(Presence.Type.unsubscribe)) { if (!toJuick) { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to > 0) { - PMQueries.removePMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); + PMQueries.removePMinRoster(sql, uid_to, p.from.Bare()); } } Presence reply = new Presence(p.to, p.from, Presence.Type.unsubscribed); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } return false; } - public static boolean incomingMessage(Message msg) { + public boolean incomingMessage(Message msg) { if (msg.body == null || msg.body.isEmpty()) { return true; } @@ -140,9 +150,9 @@ public class JuickBot { User user_from = null; String signuphash = ""; - user_from = UserQueries.getUserByJID(XMPPComponent.sql, msg.from.Bare()); + user_from = UserQueries.getUserByJID(sql, msg.from.Bare()); if (user_from == null) { - signuphash = UserQueries.getSignUpHashByJID(XMPPComponent.sql, msg.from.Bare()); + signuphash = UserQueries.getSignUpHashByJID(sql, msg.from.Bare()); } if (user_from == null) { @@ -152,7 +162,7 @@ public class JuickBot { } else { reply.body = "Внимание, системное сообщение!\nВаш JabberID не обнаружен в списке доверенных. Для того, чтобы отправить сообщение пользователю " + username + "@juick.com, пожалуйста зарегистрируйте свой JabberID в системе: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nЕсли у вас уже есть учетная запись на Juick, вы сможете присоединить этот JabberID к ней.\n\nWarning, system message!\nYour JabberID is not found in our server's white list. To send a message to " + username + "@juick.com, please sign up: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nIf you already have an account on Juick, you will be proposed to attach this JabberID to your existing account."; } - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } @@ -160,41 +170,40 @@ public class JuickBot { return incomingMessageJuick(user_from, msg); } - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to == 0) { Message reply = new Message(msg.to, msg.from, Message.Type.error); reply.id = msg.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } boolean success = false; - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - success = PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, msg.body); + if (!UserQueries.isInBLAny(sql, uid_to, user_from.getUID())) { + success = PMQueries.createPM(sql, user_from.getUID(), uid_to, msg.body); } if (success) { Message m = new Message(); - m.from = new JID("juick", "juick.com", null); m.to = new JID(Integer.toString(uid_to), "push.juick.com", null); JuickMessage jmsg = new JuickMessage(); - jmsg.setUser(UserQueries.getUserByUID(XMPPComponent.sql, user_from.getUID())); + jmsg.setUser(UserQueries.getUserByUID(sql, user_from.getUID())); jmsg.setText(msg.body); m.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(m.toString()); + notificator.push(m); m.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(m.toString()); + xmpp.send(m); String jid; boolean inroster = false; - jid = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); + jid = UserQueries.getJIDbyUID(sql, uid_to); if (jid != null) { - inroster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid); + inroster = PMQueries.havePMinRoster(sql, user_from.getUID(), jid); } if (jid != null) { @@ -208,21 +217,21 @@ public class JuickBot { mm.from = new JID("juick", "juick.com", "Juick"); mm.body = "Private message from @" + jmsg.getUser().getUName() + ":\n" + msg.body; } - XMPPComponent.sendOut(mm); + S2SComponent.sendOut(mm); } } else { Message reply = new Message(msg.to, msg.from, Message.Type.error); reply.id = msg.id; reply.addChild(new Error(Error.Type.cancel, "not-allowed")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } return true; } private static Pattern regexPM = Pattern.compile("^\\@(\\S+)\\s+([\\s\\S]+)$"); - public static boolean incomingMessageJuick(User user_from, Message msg) { + public boolean incomingMessageJuick(User user_from, Message msg) { String command = msg.body.trim(); int commandlen = command.length(); @@ -262,26 +271,26 @@ public class JuickBot { private static void commandPing(Message m) { Presence p = new Presence(JuickJID, m.from); p.priority = 10; - XMPPComponent.sendOut(p); + S2SComponent.sendOut(p); Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = "PONG"; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } private static void commandHelp(Message m) { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = HELPTEXT; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } - private static void commandLogin(Message m, User user_from) { + private void commandLogin(Message m, User user_from) { Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(XMPPComponent.sql, user_from.getUID()); - XMPPComponent.sendOut(reply); + reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(sql, user_from.getUID()); + S2SComponent.sendOut(reply); } - private static void commandPM(Message m, User user_from, String user_to, String body) { + private void commandPM(Message m, User user_from, String user_to, String body) { int ret = 0; int uid_to = 0; @@ -289,17 +298,17 @@ public class JuickBot { boolean haveInRoster = false; if (user_to.indexOf('@') > 0) { - uid_to = UserQueries.getUIDbyJID(XMPPComponent.sql, user_to); + uid_to = UserQueries.getUIDbyJID(sql, user_to); } else { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, user_to); + uid_to = UserQueries.getUIDbyName(sql, user_to); } if (uid_to > 0) { - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - if (PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, body)) { - jid_to = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); + if (!UserQueries.isInBLAny(sql, uid_to, user_from.getUID())) { + if (PMQueries.createPM(sql, user_from.getUID(), uid_to, body)) { + jid_to = UserQueries.getJIDbyUID(sql, uid_to); if (jid_to != null) { - haveInRoster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid_to); + haveInRoster = PMQueries.havePMinRoster(sql, user_from.getUID(), jid_to); } ret = 200; } else { @@ -321,10 +330,10 @@ public class JuickBot { jmsg.setUser(user_from); jmsg.setText(body); msg.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(msg.toString()); + notificator.push(msg); msg.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(msg.toString()); + xmpp.send(msg); if (jid_to != null) { Message mm = new Message(); @@ -337,7 +346,7 @@ public class JuickBot { mm.from = new JID("juick", "juick.com", "Juick"); mm.body = "Private message from @" + user_from.getUName() + ":\n" + body; } - XMPPComponent.sendOut(mm); + S2SComponent.sendOut(mm); } } @@ -349,15 +358,15 @@ public class JuickBot { reply.type = Message.Type.error; reply.body = "Error " + ret; } - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } - private static void commandBLShow(Message m, User user_from) { + private void commandBLShow(Message m, User user_from) { List<User> blusers; List<String> bltags; - blusers = UserQueries.getUserBLUsers(XMPPComponent.sql, user_from.getUID()); - bltags = TagQueries.getUserBLTags(XMPPComponent.sql, user_from.getUID()); + blusers = UserQueries.getUserBLUsers(sql, user_from.getUID()); + bltags = TagQueries.getUserBLTags(sql, user_from.getUID()); String txt = ""; @@ -381,6 +390,6 @@ public class JuickBot { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = txt; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } } diff --git a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java index 48771580..12ac7449 100644 --- a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java +++ b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java @@ -15,14 +15,14 @@ public class CleaningUp implements Runnable { public void run() { while (true) { try { - PrintWriter statsFile = new PrintWriter(XMPPComponent.STATSFILE, "UTF-8"); + PrintWriter statsFile = new PrintWriter(S2SComponent.STATSFILE, "UTF-8"); statsFile.write("<html><body><h2>Threads: " + Thread.activeCount() + "</h2>"); - statsFile.write("<h2>Out (" + XMPPComponent.outConnections.size() + ")</h2><table border=1><tr><th>to</th><th>sid</th><th>inactive</th><th>out packets</th><th>out bytes</th></tr>"); + statsFile.write("<h2>Out (" + S2SComponent.outConnections.size() + ")</h2><table border=1><tr><th>to</th><th>sid</th><th>inactive</th><th>out packets</th><th>out bytes</th></tr>"); long now = System.currentTimeMillis(); - synchronized (XMPPComponent.outConnections) { - for (Iterator<ConnectionOut> i = XMPPComponent.outConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.outConnections) { + for (Iterator<ConnectionOut> i = S2SComponent.outConnections.iterator(); i.hasNext();) { ConnectionOut c = i.next(); int inactive = (int) ((double) (now - c.tsLocalData) / 1000.0); if (inactive > 900) { @@ -40,10 +40,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("</table><h2>In (" + XMPPComponent.inConnections.size() + ")</h2><table border=1><tr><th>from</th><th>sid</th><th>inactive</th><th>in packets</th></tr>"); + statsFile.write("</table><h2>In (" + S2SComponent.inConnections.size() + ")</h2><table border=1><tr><th>from</th><th>sid</th><th>inactive</th><th>in packets</th></tr>"); - synchronized (XMPPComponent.inConnections) { - for (Iterator<ConnectionIn> i = XMPPComponent.inConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.inConnections) { + for (Iterator<ConnectionIn> i = S2SComponent.inConnections.iterator(); i.hasNext();) { ConnectionIn c = i.next(); int inactive = (int) ((double) (now - c.tsRemoteData) / 1000.0); if (inactive > 900) { @@ -73,10 +73,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("</table><h2>Cache (" + XMPPComponent.outCache.size() + ")</h2><table border=1><tr><th>host</th><th>live</th><th>size</th></tr>"); + statsFile.write("</table><h2>Cache (" + S2SComponent.outCache.size() + ")</h2><table border=1><tr><th>host</th><th>live</th><th>size</th></tr>"); - synchronized (XMPPComponent.outCache) { - for (Iterator<CacheEntry> i = XMPPComponent.outCache.iterator(); i.hasNext();) { + synchronized (S2SComponent.outCache) { + for (Iterator<CacheEntry> i = S2SComponent.outCache.iterator(); i.hasNext();) { CacheEntry c = i.next(); int inactive = (int) ((double) (now - c.tsCreated) / 1000.0); if (inactive > 600) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index c215e375..a8572ddc 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -1,9 +1,6 @@ package com.juick.xmpp.s2s; -import com.juick.xmpp.Iq; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; +import com.juick.xmpp.*; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; @@ -31,9 +28,14 @@ public class ConnectionIn extends Connection { public long tsRemoteData = 0; public long packetsRemote = 0; - public ConnectionIn(AsynchronousSocketChannel socket) { + JuickBot bot; + Stream xmpp; + + public ConnectionIn(AsynchronousSocketChannel socket, JuickBot bot, Stream xmpp) { super(); this.socket = socket; + this.bot = bot; + this.xmpp = xmpp; streamID = UUID.randomUUID().toString(); } @@ -57,7 +59,7 @@ public class ConnectionIn extends Connection { String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; + S2SComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; if (xmppversionnew) { openStream += "<stream:features></stream:features>"; } @@ -77,14 +79,14 @@ public class ConnectionIn extends Connection { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); LOGGER.info("STREAM FROM " + dfrom + " TO " + to + " " + streamID + " ASKING FOR DIALBACK"); - if (dfrom.endsWith(XMPPComponent.HOSTNAME) && (dfrom.equals(XMPPComponent.HOSTNAME) || dfrom.endsWith("." + XMPPComponent.HOSTNAME))) { + if (dfrom.endsWith(S2SComponent.HOSTNAME) && (dfrom.equals(S2SComponent.HOSTNAME) || dfrom.endsWith("." + S2SComponent.HOSTNAME))) { break; } - if (dfrom != null && to != null && to.equals(XMPPComponent.HOSTNAME)) { + if (dfrom != null && to != null && to.equals(S2SComponent.HOSTNAME)) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - ConnectionOut c = XMPPComponent.getConnectionOut(dfrom, false); + ConnectionOut c = S2SComponent.getConnectionOut(dfrom, false); if (c != null) { c.sendDialbackVerify(streamID, dbKey); } else { @@ -115,15 +117,15 @@ public class ConnectionIn extends Connection { } else if (tag.equals("presence") && checkFromTo(parser)) { Presence p = Presence.parse(parser, null); if (p != null && (p.type == null || !p.type.equals(Presence.Type.error))) { - JuickBot.incomingPresence(p); + bot.incomingPresence(p); } } else if (tag.equals("message") && checkFromTo(parser)) { updateTsRemoteData(); - Message msg = Message.parse(parser, XMPPComponent.childParsers); + Message msg = Message.parse(parser, S2SComponent.childParsers); if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { LOGGER.info("STREAM " + streamID + ": " + msg.toString()); - if (!JuickBot.incomingMessage(msg)) { - XMPPComponent.connRouter.sendStanza(msg.toString()); + if (!bot.incomingMessage(msg)) { + xmpp.send(msg); } } } else if (tag.equals("iq") && checkFromTo(parser)) { @@ -132,22 +134,22 @@ public class ConnectionIn extends Connection { String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { LOGGER.info("STREAM " + streamID + ": " + xml); - XMPPComponent.connRouter.sendStanza(xml); + xmpp.send(xml); } } else { LOGGER.info("STREAM " + streamID + ": " + XmlUtils.parseToString(parser, true)); } } LOGGER.warning("STREAM " + streamID + " FINISHED"); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (EOFException ex) { LOGGER.info(String.format("STREAM %s CLOSED (dirty)", streamID)); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.WARNING, "STREAM " + streamID + " ERROR", e); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } } @@ -158,7 +160,7 @@ public class ConnectionIn extends Connection { public void sendDialbackResult(String sfrom, String type) { try { - sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); + sendStanza("<db:result from='" + S2SComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); if (type.equals("valid")) { from.add(sfrom); LOGGER.info("STREAM FROM " + sfrom + " " + streamID + " READY"); @@ -173,7 +175,7 @@ public class ConnectionIn extends Connection { String cto = parser.getAttributeValue(null, "to"); if (cfrom != null && cto != null && !cfrom.isEmpty() && !cto.isEmpty()) { JID jidto = new JID(cto); - if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(XMPPComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { + if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(S2SComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { JID jidfrom = new JID(cfrom); int size = from.size(); for (int i = 0; i < size; i++) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java index 02a2be39..28ff48f9 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java @@ -1,11 +1,13 @@ package com.juick.xmpp.s2s; +import com.juick.xmpp.JuickBot; +import com.juick.xmpp.Stream; + import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,6 +19,16 @@ public class ConnectionListener implements Runnable { private static final Logger logger = Logger.getLogger(ConnectionListener.class.getName()); + ExecutorService executorService; + JuickBot bot; + Stream xmpp; + + public ConnectionListener(ExecutorService executorService, JuickBot bot, Stream xmpp) { + this.executorService = executorService; + this.bot = bot; + this.xmpp = xmpp; + } + @Override public void run() { try { @@ -26,9 +38,9 @@ public class ConnectionListener implements Runnable { listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel result, Object attachment) { - listener.accept(XMPPComponent.executorService, this); - ConnectionIn client = new ConnectionIn(result); - XMPPComponent.addConnectionIn(client); + listener.accept(executorService, this); + ConnectionIn client = new ConnectionIn(result, bot, xmpp); + S2SComponent.addConnectionIn(client); client.parseStream(); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 59fdfb60..d4018ae1 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -7,14 +7,12 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.util.logging.Level; import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; /** * @@ -55,7 +53,7 @@ public class ConnectionOut extends Connection { sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); + S2SComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); @@ -64,13 +62,13 @@ public class ConnectionOut extends Connection { } LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); - XMPPComponent.addConnectionOut(ConnectionOut.this); + S2SComponent.addConnectionOut(ConnectionOut.this); if (checkSID != null) { sendDialbackVerify(checkSID, dbKey); } - sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>"); + sendStanza("<db:result from='" + S2SComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, S2SComponent.HOSTNAME, streamID) + "</db:result>"); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { @@ -85,7 +83,7 @@ public class ConnectionOut extends Connection { streamReady = true; LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); - String cache = XMPPComponent.getFromCache(to); + String cache = S2SComponent.getFromCache(to); if (cache != null) { LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); sendStanza(cache); @@ -100,7 +98,7 @@ public class ConnectionOut extends Connection { String type = parser.getAttributeValue(null, "type"); String sid = parser.getAttributeValue(null, "id"); if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - ConnectionIn c = XMPPComponent.getConnectionIn(sid); + ConnectionIn c = S2SComponent.getConnectionIn(sid); if (c != null) { c.sendDialbackResult(from, type); } @@ -112,15 +110,15 @@ public class ConnectionOut extends Connection { } LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (EOFException eofex) { LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID)); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.SEVERE, "s2s out exception", e); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } } @@ -128,20 +126,20 @@ public class ConnectionOut extends Connection { @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { LOGGER.log(Level.WARNING, "s2s out failed", exc); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } }); } catch (Exception e) { LOGGER.warning(e.toString()); - XMPPComponent.removeConnectionOut(this); + S2SComponent.removeConnectionOut(this); closeConnection(); } } public void sendDialbackVerify(String sid, String key) { try { - sendStanza("<db:verify from='" + XMPPComponent.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" + key + "</db:verify>"); + sendStanza("<db:verify from='" + S2SComponent.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" + key + "</db:verify>"); } catch (IOException e) { LOGGER.log(Level.WARNING, "STREAM TO " + to + " " + streamID + " ERROR", e); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java deleted file mode 100644 index c96a5cce..00000000 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ /dev/null @@ -1,283 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.User; -import com.juick.server.MessagesQueries; -import com.juick.server.SubscriptionsQueries; -import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.extensions.JuickMessage; -import com.juick.xmpp.extensions.Nickname; -import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.SHA1; -import com.juick.xmpp.utils.XmlUtils; -import org.springframework.jdbc.core.JdbcTemplate; -import org.xmlpull.v1.XmlPullParser; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; -import java.nio.channels.CompletionHandler; -import java.util.List; -import java.util.logging.Level; - -/** - * - * @author ugnich - */ -public class ConnectionRouter extends Connection implements Runnable { - - private String componentName; - - ConnectionRouter(String componentName) { - this.componentName = componentName; - } - - @Override - public void run() { - LOGGER.info("STREAM ROUTER START"); - - try { - socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { - @Override - public void completed(Void result, AsynchronousSocketChannel client) { - try { - parser.setInput(new InputStreamReader(Channels.newInputStream(client))); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(client)); - - String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } - - msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>"; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); - } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); - } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); - } - } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); - } - } - - @Override - public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); - } - }); - Thread.currentThread().join(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "NIO2 error", e); - } - - } - - @Override - synchronized public void sendStanza(String xml) { - try { - writer.write(xml); - writer.flush(); - } catch (IOException e) { - LOGGER.warning("STREAM ROUTER ERROR: " + xml); - LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); - System.exit(0); - } - } - - public void sendJuickMessage(JuickMessage jmsg) { - List<User> users; - - - if (jmsg.FriendsOnly) { - users = SubscriptionsQueries.getUsersSubscribedToUser(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); - } else { - users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - } - - - String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - public void sendJuickComment(JuickMessage jmsg) { - String replyQuote; - - List<User> users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - replyQuote = getReplyQuote(XMPPComponent.sql, jmsg.getMID(), jmsg.ReplyTo); - - String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.addChild(jmsg); - for (User user : users) { - // TODO: make single query - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { - String quote = ""; - if (ReplyTo > 0) { - com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); - if (q != null) { - quote = q.getText(); - } - } else { - com.juick.Message q = MessagesQueries.getMessage(sql, MID); - if (q != null) { - quote = q.getText(); - } - } - if (quote.length() > 50) { - quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; - } else if (quote.length() > 0) { - quote = ">" + quote.replace('\n', ' ') + "\n"; - } - return quote; - } - - public void sendJuickRecommendation(JuickMessage recomm) { - JuickMessage jmsg; - jmsg = new JuickMessage(MessagesQueries.getMessage(XMPPComponent.sql, recomm.getMID())); - List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(XMPPComponent.sql, - recomm.getMID(), jmsg.getUser().getUID()); - - String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; - txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID(); - if (jmsg.Replies > 0) { - if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { - txt += " (" + jmsg.Replies + " reply)"; - } else { - txt += " (" + jmsg.Replies + " replies)"; - } - } - txt += " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } -} diff --git a/src/main/java/com/juick/xmpp/s2s/S2SComponent.java b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java new file mode 100644 index 00000000..dcb547fb --- /dev/null +++ b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java @@ -0,0 +1,316 @@ +package com.juick.xmpp.s2s; + +import com.juick.JuickComponent; +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.SubscriptionsQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.Nickname; +import com.juick.xmpp.extensions.XOOB; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +/** + * + * @author ugnich + */ +public class S2SComponent implements JuickComponent { + + private static final Logger LOGGER = Logger.getLogger(S2SComponent.class.getName()); + + ExecutorService executorService; + + public static String HOSTNAME = null; + public static String componentName = null; + public static String STATSFILE = null; + static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>()); + JdbcTemplate sql; + final public static HashMap<String, StanzaChild> childParsers = new HashMap<>(); + + public static void addConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.add(c); + } + } + + public static void addConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.add(c); + } + } + + public static void removeConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.remove(c); + } + } + + public static void removeConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.remove(c); + } + } + + public static String getFromCache(String hostname) { + CacheEntry ret = null; + synchronized (outCache) { + for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) { + CacheEntry c = i.next(); + if (c.hostname != null && c.hostname.equals(hostname)) { + ret = c; + i.remove(); + break; + } + } + } + return (ret != null) ? ret.xml : null; + } + + public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { + return c; + } + } + } + return null; + } + + public static ConnectionIn getConnectionIn(String streamID) { + synchronized (inConnections) { + for (ConnectionIn c : inConnections) { + if (c.streamID != null && c.streamID.equals(streamID)) { + return c; + } + } + } + return null; + } + + public void sendOut(Stanza s) { + sendOut(s.to.Host, s.toString()); + } + + public void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + ConnectionOut connOut = null; + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname)) { + if (c.streamReady) { + connOut = c; + break; + } else { + haveAnyConn = true; + break; + } + } + } + } + if (connOut != null) { + try { + connOut.sendStanza(xml); + } catch (IOException e) { + LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); + } + return; + } + + boolean haveCache = false; + synchronized (outCache) { + for (CacheEntry c : outCache) { + if (c.hostname != null && c.hostname.equals(hostname)) { + c.xml += xml; + c.tsUpdated = System.currentTimeMillis(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(hostname, xml)); + } + } + + if (!haveAnyConn) { + ConnectionOut connectionOut = new ConnectionOut(hostname); + connectionOut.parseStream(); + } + } + + public S2SComponent(JdbcTemplate sql, ExecutorService executorService, Properties conf) { + LOGGER.info("component initialized"); + HOSTNAME = conf.getProperty("hostname"); + componentName = conf.getProperty("componentname"); + STATSFILE = conf.getProperty("statsfile"); + this.sql = sql; + this.executorService = executorService; + executorService.submit(new ConnectionListener(executorService)); + executorService.submit(new CleaningUp()); + } + @Override + public void messageReceived(Message xmsg) { + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } + } + + public void sendJuickMessage(JuickMessage jmsg) { + List<User> users; + + + if (jmsg.FriendsOnly) { + users = SubscriptionsQueries.getUsersSubscribedToUser(sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); + } else { + users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + } + + + String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + public void sendJuickComment(JuickMessage jmsg) { + String replyQuote; + + List<User> users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + replyQuote = getReplyQuote(sql, jmsg.getMID(), jmsg.ReplyTo); + + String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.addChild(jmsg); + for (User user : users) { + // TODO: make single query + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { + String quote = ""; + if (ReplyTo > 0) { + com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); + if (q != null) { + quote = q.getText(); + } + } else { + com.juick.Message q = MessagesQueries.getMessage(sql, MID); + if (q != null) { + quote = q.getText(); + } + } + if (quote.length() > 50) { + quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; + } else if (quote.length() > 0) { + quote = ">" + quote.replace('\n', ' ') + "\n"; + } + return quote; + } + + public void sendJuickRecommendation(JuickMessage recomm) { + JuickMessage jmsg; + jmsg = new JuickMessage(MessagesQueries.getMessage(sql, recomm.getMID())); + List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(sql, + recomm.getMID(), jmsg.getUser().getUID()); + + String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; + txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID(); + if (jmsg.Replies > 0) { + if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { + txt += " (" + jmsg.Replies + " reply)"; + } else { + txt += " (" + jmsg.Replies + " replies)"; + } + } + txt += " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } +} diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java deleted file mode 100644 index 1dfdd38d..00000000 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ /dev/null @@ -1,199 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.xmpp.Stanza; -import com.juick.xmpp.StanzaChild; -import com.juick.xmpp.extensions.JuickMessage; -import org.springframework.jdbc.core.JdbcTemplate; - -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; -import java.io.IOException; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * - * @author ugnich - */ -public class XMPPComponent implements ServletContextListener { - - private static final Logger LOGGER = Logger.getLogger(XMPPComponent.class.getName()); - - public static final ExecutorService executorService = Executors.newWorkStealingPool(); - - public static String HOSTNAME = null; - public static String STATSFILE = null; - public static ConnectionRouter connRouter; - static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); - static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); - static final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>()); - static JdbcTemplate sql; - final public static HashMap<String, StanzaChild> childParsers = new HashMap<>(); - - public static void addConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.add(c); - } - } - - public static void addConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.add(c); - } - } - - public static void removeConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.remove(c); - } - } - - public static void removeConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.remove(c); - } - } - - public static String getFromCache(String hostname) { - CacheEntry ret = null; - synchronized (outCache) { - for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) { - CacheEntry c = i.next(); - if (c.hostname != null && c.hostname.equals(hostname)) { - ret = c; - i.remove(); - break; - } - } - } - return (ret != null) ? ret.xml : null; - } - - public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { - return c; - } - } - } - return null; - } - - public static ConnectionIn getConnectionIn(String streamID) { - synchronized (inConnections) { - for (ConnectionIn c : inConnections) { - if (c.streamID != null && c.streamID.equals(streamID)) { - return c; - } - } - } - return null; - } - - public static void sendOut(Stanza s) { - sendOut(s.to.Host, s.toString()); - } - - public static void sendOut(String hostname, String xml) { - boolean haveAnyConn = false; - - ConnectionOut connOut = null; - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname)) { - if (c.streamReady) { - connOut = c; - break; - } else { - haveAnyConn = true; - break; - } - } - } - } - if (connOut != null) { - try { - connOut.sendStanza(xml); - } catch (IOException e) { - LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); - } - return; - } - - boolean haveCache = false; - synchronized (outCache) { - for (CacheEntry c : outCache) { - if (c.hostname != null && c.hostname.equals(hostname)) { - c.xml += xml; - c.tsUpdated = System.currentTimeMillis(); - haveCache = true; - break; - } - } - if (!haveCache) { - outCache.add(new CacheEntry(hostname, xml)); - } - } - - if (!haveAnyConn) { - ConnectionOut connectionOut = new ConnectionOut(hostname); - connectionOut.parseStream(); - } - } - - @Override - public void contextInitialized(ServletContextEvent sce) { - - LOGGER.info("component initialized"); - executorService.submit(() -> { - Properties conf = new Properties(); - try { - conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/s2s.conf")); - HOSTNAME = conf.getProperty("hostname"); - String componentName = conf.getProperty("componentname"); - STATSFILE = conf.getProperty("statsfile"); - sql = (JdbcTemplate) sce.getServletContext().getAttribute("sql"); - - childParsers.put(JuickMessage.XMLNS, new JuickMessage()); - - connRouter = new ConnectionRouter(componentName); - executorService.submit(connRouter); - executorService.submit(new ConnectionListener()); - executorService.submit(new CleaningUp()); - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "XMPPComponent error", e); - } - }); - } - - - - @Override - public void contextDestroyed(ServletContextEvent sce) { - synchronized (XMPPComponent.outConnections) { - for (Iterator<ConnectionOut> i = XMPPComponent.outConnections.iterator(); i.hasNext();) { - ConnectionOut c = i.next(); - c.closeConnection(); - i.remove(); - } - } - - synchronized (XMPPComponent.inConnections) { - for (Iterator<ConnectionIn> i = XMPPComponent.inConnections.iterator(); i.hasNext();) { - ConnectionIn c = i.next(); - c.closeConnection(); - i.remove(); - } - } - XMPPComponent.connRouter.closeConnection(); - executorService.shutdown(); - LOGGER.info("component destroyed"); - } -} |