From 380018da475ff41d3375e7f2bea0a192a4d9b178 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Wed, 3 Feb 2016 12:35:59 +0300 Subject: single xmpp component, WIP --- src/main/java/com/juick/xmpp/s2s/CleaningUp.java | 20 +- src/main/java/com/juick/xmpp/s2s/ConnectionIn.java | 40 ++- .../com/juick/xmpp/s2s/ConnectionListener.java | 20 +- .../java/com/juick/xmpp/s2s/ConnectionOut.java | 24 +- .../java/com/juick/xmpp/s2s/ConnectionRouter.java | 283 --------------- src/main/java/com/juick/xmpp/s2s/JuickBot.java | 386 --------------------- src/main/java/com/juick/xmpp/s2s/S2SComponent.java | 316 +++++++++++++++++ .../java/com/juick/xmpp/s2s/XMPPComponent.java | 199 ----------- 8 files changed, 374 insertions(+), 914 deletions(-) delete mode 100644 src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java delete mode 100644 src/main/java/com/juick/xmpp/s2s/JuickBot.java create mode 100644 src/main/java/com/juick/xmpp/s2s/S2SComponent.java delete mode 100644 src/main/java/com/juick/xmpp/s2s/XMPPComponent.java (limited to 'src/main/java/com/juick/xmpp/s2s') diff --git a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java index 48771580..12ac7449 100644 --- a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java +++ b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java @@ -15,14 +15,14 @@ public class CleaningUp implements Runnable { public void run() { while (true) { try { - PrintWriter statsFile = new PrintWriter(XMPPComponent.STATSFILE, "UTF-8"); + PrintWriter statsFile = new PrintWriter(S2SComponent.STATSFILE, "UTF-8"); statsFile.write("

Threads: " + Thread.activeCount() + "

"); - statsFile.write("

Out (" + XMPPComponent.outConnections.size() + ")

"); + statsFile.write("

Out (" + S2SComponent.outConnections.size() + ")

tosidinactiveout packetsout bytes
"); long now = System.currentTimeMillis(); - synchronized (XMPPComponent.outConnections) { - for (Iterator i = XMPPComponent.outConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.outConnections) { + for (Iterator i = S2SComponent.outConnections.iterator(); i.hasNext();) { ConnectionOut c = i.next(); int inactive = (int) ((double) (now - c.tsLocalData) / 1000.0); if (inactive > 900) { @@ -40,10 +40,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("
tosidinactiveout packetsout bytes

In (" + XMPPComponent.inConnections.size() + ")

"); + statsFile.write("
fromsidinactivein packets

In (" + S2SComponent.inConnections.size() + ")

"); - synchronized (XMPPComponent.inConnections) { - for (Iterator i = XMPPComponent.inConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.inConnections) { + for (Iterator i = S2SComponent.inConnections.iterator(); i.hasNext();) { ConnectionIn c = i.next(); int inactive = (int) ((double) (now - c.tsRemoteData) / 1000.0); if (inactive > 900) { @@ -73,10 +73,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("
fromsidinactivein packets

Cache (" + XMPPComponent.outCache.size() + ")

"); + statsFile.write("
hostlivesize

Cache (" + S2SComponent.outCache.size() + ")

"); - synchronized (XMPPComponent.outCache) { - for (Iterator i = XMPPComponent.outCache.iterator(); i.hasNext();) { + synchronized (S2SComponent.outCache) { + for (Iterator i = S2SComponent.outCache.iterator(); i.hasNext();) { CacheEntry c = i.next(); int inactive = (int) ((double) (now - c.tsCreated) / 1000.0); if (inactive > 600) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index c215e375..a8572ddc 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -1,9 +1,6 @@ package com.juick.xmpp.s2s; -import com.juick.xmpp.Iq; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; +import com.juick.xmpp.*; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; @@ -31,9 +28,14 @@ public class ConnectionIn extends Connection { public long tsRemoteData = 0; public long packetsRemote = 0; - public ConnectionIn(AsynchronousSocketChannel socket) { + JuickBot bot; + Stream xmpp; + + public ConnectionIn(AsynchronousSocketChannel socket, JuickBot bot, Stream xmpp) { super(); this.socket = socket; + this.bot = bot; + this.xmpp = xmpp; streamID = UUID.randomUUID().toString(); } @@ -57,7 +59,7 @@ public class ConnectionIn extends Connection { String openStream = ""; + S2SComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; if (xmppversionnew) { openStream += ""; } @@ -77,14 +79,14 @@ public class ConnectionIn extends Connection { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); LOGGER.info("STREAM FROM " + dfrom + " TO " + to + " " + streamID + " ASKING FOR DIALBACK"); - if (dfrom.endsWith(XMPPComponent.HOSTNAME) && (dfrom.equals(XMPPComponent.HOSTNAME) || dfrom.endsWith("." + XMPPComponent.HOSTNAME))) { + if (dfrom.endsWith(S2SComponent.HOSTNAME) && (dfrom.equals(S2SComponent.HOSTNAME) || dfrom.endsWith("." + S2SComponent.HOSTNAME))) { break; } - if (dfrom != null && to != null && to.equals(XMPPComponent.HOSTNAME)) { + if (dfrom != null && to != null && to.equals(S2SComponent.HOSTNAME)) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - ConnectionOut c = XMPPComponent.getConnectionOut(dfrom, false); + ConnectionOut c = S2SComponent.getConnectionOut(dfrom, false); if (c != null) { c.sendDialbackVerify(streamID, dbKey); } else { @@ -115,15 +117,15 @@ public class ConnectionIn extends Connection { } else if (tag.equals("presence") && checkFromTo(parser)) { Presence p = Presence.parse(parser, null); if (p != null && (p.type == null || !p.type.equals(Presence.Type.error))) { - JuickBot.incomingPresence(p); + bot.incomingPresence(p); } } else if (tag.equals("message") && checkFromTo(parser)) { updateTsRemoteData(); - Message msg = Message.parse(parser, XMPPComponent.childParsers); + Message msg = Message.parse(parser, S2SComponent.childParsers); if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { LOGGER.info("STREAM " + streamID + ": " + msg.toString()); - if (!JuickBot.incomingMessage(msg)) { - XMPPComponent.connRouter.sendStanza(msg.toString()); + if (!bot.incomingMessage(msg)) { + xmpp.send(msg); } } } else if (tag.equals("iq") && checkFromTo(parser)) { @@ -132,22 +134,22 @@ public class ConnectionIn extends Connection { String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { LOGGER.info("STREAM " + streamID + ": " + xml); - XMPPComponent.connRouter.sendStanza(xml); + xmpp.send(xml); } } else { LOGGER.info("STREAM " + streamID + ": " + XmlUtils.parseToString(parser, true)); } } LOGGER.warning("STREAM " + streamID + " FINISHED"); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (EOFException ex) { LOGGER.info(String.format("STREAM %s CLOSED (dirty)", streamID)); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.WARNING, "STREAM " + streamID + " ERROR", e); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } } @@ -158,7 +160,7 @@ public class ConnectionIn extends Connection { public void sendDialbackResult(String sfrom, String type) { try { - sendStanza(""); + sendStanza(""); if (type.equals("valid")) { from.add(sfrom); LOGGER.info("STREAM FROM " + sfrom + " " + streamID + " READY"); @@ -173,7 +175,7 @@ public class ConnectionIn extends Connection { String cto = parser.getAttributeValue(null, "to"); if (cfrom != null && cto != null && !cfrom.isEmpty() && !cto.isEmpty()) { JID jidto = new JID(cto); - if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(XMPPComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { + if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(S2SComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { JID jidfrom = new JID(cfrom); int size = from.size(); for (int i = 0; i < size; i++) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java index 02a2be39..28ff48f9 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java @@ -1,11 +1,13 @@ package com.juick.xmpp.s2s; +import com.juick.xmpp.JuickBot; +import com.juick.xmpp.Stream; + import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,6 +19,16 @@ public class ConnectionListener implements Runnable { private static final Logger logger = Logger.getLogger(ConnectionListener.class.getName()); + ExecutorService executorService; + JuickBot bot; + Stream xmpp; + + public ConnectionListener(ExecutorService executorService, JuickBot bot, Stream xmpp) { + this.executorService = executorService; + this.bot = bot; + this.xmpp = xmpp; + } + @Override public void run() { try { @@ -26,9 +38,9 @@ public class ConnectionListener implements Runnable { listener.accept(null, new CompletionHandler() { @Override public void completed(AsynchronousSocketChannel result, Object attachment) { - listener.accept(XMPPComponent.executorService, this); - ConnectionIn client = new ConnectionIn(result); - XMPPComponent.addConnectionIn(client); + listener.accept(executorService, this); + ConnectionIn client = new ConnectionIn(result, bot, xmpp); + S2SComponent.addConnectionIn(client); client.parseStream(); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 59fdfb60..d4018ae1 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -7,14 +7,12 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.util.logging.Level; import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; /** * @@ -55,7 +53,7 @@ public class ConnectionOut extends Connection { sendStanza(""); + S2SComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); @@ -64,13 +62,13 @@ public class ConnectionOut extends Connection { } LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); - XMPPComponent.addConnectionOut(ConnectionOut.this); + S2SComponent.addConnectionOut(ConnectionOut.this); if (checkSID != null) { sendDialbackVerify(checkSID, dbKey); } - sendStanza("" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + ""); + sendStanza("" + generateDialbackKey(to, S2SComponent.HOSTNAME, streamID) + ""); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { @@ -85,7 +83,7 @@ public class ConnectionOut extends Connection { streamReady = true; LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); - String cache = XMPPComponent.getFromCache(to); + String cache = S2SComponent.getFromCache(to); if (cache != null) { LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); sendStanza(cache); @@ -100,7 +98,7 @@ public class ConnectionOut extends Connection { String type = parser.getAttributeValue(null, "type"); String sid = parser.getAttributeValue(null, "id"); if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - ConnectionIn c = XMPPComponent.getConnectionIn(sid); + ConnectionIn c = S2SComponent.getConnectionIn(sid); if (c != null) { c.sendDialbackResult(from, type); } @@ -112,15 +110,15 @@ public class ConnectionOut extends Connection { } LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (EOFException eofex) { LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID)); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.SEVERE, "s2s out exception", e); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } } @@ -128,20 +126,20 @@ public class ConnectionOut extends Connection { @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { LOGGER.log(Level.WARNING, "s2s out failed", exc); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } }); } catch (Exception e) { LOGGER.warning(e.toString()); - XMPPComponent.removeConnectionOut(this); + S2SComponent.removeConnectionOut(this); closeConnection(); } } public void sendDialbackVerify(String sid, String key) { try { - sendStanza("" + key + ""); + sendStanza("" + key + ""); } catch (IOException e) { LOGGER.log(Level.WARNING, "STREAM TO " + to + " " + streamID + " ERROR", e); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java deleted file mode 100644 index c96a5cce..00000000 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ /dev/null @@ -1,283 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.User; -import com.juick.server.MessagesQueries; -import com.juick.server.SubscriptionsQueries; -import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.extensions.JuickMessage; -import com.juick.xmpp.extensions.Nickname; -import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.SHA1; -import com.juick.xmpp.utils.XmlUtils; -import org.springframework.jdbc.core.JdbcTemplate; -import org.xmlpull.v1.XmlPullParser; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; -import java.nio.channels.CompletionHandler; -import java.util.List; -import java.util.logging.Level; - -/** - * - * @author ugnich - */ -public class ConnectionRouter extends Connection implements Runnable { - - private String componentName; - - ConnectionRouter(String componentName) { - this.componentName = componentName; - } - - @Override - public void run() { - LOGGER.info("STREAM ROUTER START"); - - try { - socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler() { - @Override - public void completed(Void result, AsynchronousSocketChannel client) { - try { - parser.setInput(new InputStreamReader(Channels.newInputStream(client))); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(client)); - - String msg = ""; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } - - msg = "" + SHA1.encode(streamID + "secret") + ""; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); - } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); - } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); - } - } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); - } - } - - @Override - public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); - } - }); - Thread.currentThread().join(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "NIO2 error", e); - } - - } - - @Override - synchronized public void sendStanza(String xml) { - try { - writer.write(xml); - writer.flush(); - } catch (IOException e) { - LOGGER.warning("STREAM ROUTER ERROR: " + xml); - LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); - System.exit(0); - } - } - - public void sendJuickMessage(JuickMessage jmsg) { - List users; - - - if (jmsg.FriendsOnly) { - users = SubscriptionsQueries.getUsersSubscribedToUser(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); - } else { - users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - } - - - String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - public void sendJuickComment(JuickMessage jmsg) { - String replyQuote; - - List users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - replyQuote = getReplyQuote(XMPPComponent.sql, jmsg.getMID(), jmsg.ReplyTo); - - String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.addChild(jmsg); - for (User user : users) { - // TODO: make single query - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { - String quote = ""; - if (ReplyTo > 0) { - com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); - if (q != null) { - quote = q.getText(); - } - } else { - com.juick.Message q = MessagesQueries.getMessage(sql, MID); - if (q != null) { - quote = q.getText(); - } - } - if (quote.length() > 50) { - quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; - } else if (quote.length() > 0) { - quote = ">" + quote.replace('\n', ' ') + "\n"; - } - return quote; - } - - public void sendJuickRecommendation(JuickMessage recomm) { - JuickMessage jmsg; - jmsg = new JuickMessage(MessagesQueries.getMessage(XMPPComponent.sql, recomm.getMID())); - List users = SubscriptionsQueries.getUsersSubscribedToComments(XMPPComponent.sql, - recomm.getMID(), jmsg.getUser().getUID()); - - String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; - txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID(); - if (jmsg.Replies > 0) { - if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { - txt += " (" + jmsg.Replies + " reply)"; - } else { - txt += " (" + jmsg.Replies + " replies)"; - } - } - txt += " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } -} diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/s2s/JuickBot.java deleted file mode 100644 index d82ed939..00000000 --- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java +++ /dev/null @@ -1,386 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.User; -import com.juick.server.PMQueries; -import com.juick.server.TagQueries; -import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; -import com.juick.xmpp.extensions.Error; -import com.juick.xmpp.extensions.JuickMessage; - -import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * - * @author ugnich - */ -public class JuickBot { - - public static final JID JuickJID = new JID("juick", "juick.com", "Juick"); - private static final String HELPTEXT = - "@username text - Send private message\n" - + "*tagname Blah-blah-blah - Post a message with tag 'tagname'\n" - + "#1234 Blah-blah-blah - Answer to message #1234\n" - + "#1234/5 Blah - Answer to reply #1234/5\n" - + "! #1234 - Recommend post\n" - + "\n" - + "# - Show last messages from your feed (## - second page, ...)\n" - + "@ - Show recomendations and popular personal blogs\n" - + "* - Show your tags\n" - + "#1234 - Show message\n" - + "#1234+ - Show message with replies\n" - + "@username - Show user's info\n" - + "@username+ - Show user's info and last 10 messages\n" - + "@username *tag - User's messages with this tag\n" - + "*tag - Show last 10 messages with this tag\n" - + "? blah - Search posts for 'blah'\n" - + "? @username blah - Searching among user\'s posts for 'blah'\n" - + "D #123 - Delete message\n" - + "D #123/45 - Delete reply\n" - + "DL - Delete last message/reply\n" - + "S - Show your subscriptions\n" - + "S #123 - Subscribe to message replies\n" - + "S @username - Subscribe to user's blog\n" - + "U #123 - Unsubscribe from comments\n" - + "U @username - Unsubscribe from user's blog\n" - + "BL - Show your blacklist\n" - + "BL @username - Add/delete user to/from your blacklist\n" - + "BL *tag - Add/delete tag to/from your blacklist\n" - + "ON / OFF - Enable/disable subscriptions delivery\n" - + "PING - Pong\n" - + "\n" - + "Read more: http://juick.com/help/"; - - public static boolean incomingPresence(Presence p) { - final String username = p.to.Username.toLowerCase(); - final boolean toJuick = username.equals("juick"); - - if (p.type == null) { - Presence reply = new Presence(); - reply.from = new JID(p.to.Username, p.to.Host, null); - reply.to = new JID(p.from.Username, p.from.Host, null); - reply.type = Presence.Type.unsubscribe; - XMPPComponent.sendOut(reply); - return true; - } else if (p.type.equals(Presence.Type.probe)) { - int uid_to = 0; - if (!toJuick) { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); - } - - if (toJuick || uid_to > 0) { - Presence reply = new Presence(); - reply.from = p.to; - reply.from.Resource = "Juick"; - reply.to = p.from; - reply.priority = 10; - XMPPComponent.sendOut(reply); - } else { - Presence reply = new Presence(p.to, p.from, Presence.Type.error); - reply.id = p.id; - reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); - return true; - } - return true; - } else if (p.type.equals(Presence.Type.subscribe)) { - boolean canSubscribe = false; - if (toJuick) { - canSubscribe = true; - } else { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); - if (uid_to > 0) { - PMQueries.addPMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); - canSubscribe = true; - } - } - - if (canSubscribe) { - Presence reply = new Presence(p.to, p.from, Presence.Type.subscribed); - XMPPComponent.sendOut(reply); - - reply.from.Resource = "Juick"; - reply.priority = 10; - reply.type = null; - XMPPComponent.sendOut(reply); - - return true; - } else { - Presence reply = new Presence(p.to, p.from, Presence.Type.error); - reply.id = p.id; - reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); - return true; - } - } else if (p.type.equals(Presence.Type.unsubscribe)) { - if (!toJuick) { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); - if (uid_to > 0) { - PMQueries.removePMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); - } - } - - Presence reply = new Presence(p.to, p.from, Presence.Type.unsubscribed); - XMPPComponent.sendOut(reply); - } - - return false; - } - - public static boolean incomingMessage(Message msg) { - if (msg.body == null || msg.body.isEmpty()) { - return true; - } - - String username = msg.to.Username.toLowerCase(); - - User user_from = null; - String signuphash = ""; - user_from = UserQueries.getUserByJID(XMPPComponent.sql, msg.from.Bare()); - if (user_from == null) { - signuphash = UserQueries.getSignUpHashByJID(XMPPComponent.sql, msg.from.Bare()); - } - - if (user_from == null) { - Message reply = new Message(msg.to, msg.from, Message.Type.chat); - if (username.equals("juick")) { - reply.body = "Для того, чтобы начать пользоваться сервисом, пожалуйста пройдите быструю регистрацию: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nЕсли у вас уже есть учетная запись на Juick, вы сможете присоединить этот JabberID к ней.\n\nTo start using Juick, please sign up: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nIf you already have an account on Juick, you will be proposed to attach this JabberID to your existing account."; - } else { - reply.body = "Внимание, системное сообщение!\nВаш JabberID не обнаружен в списке доверенных. Для того, чтобы отправить сообщение пользователю " + username + "@juick.com, пожалуйста зарегистрируйте свой JabberID в системе: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nЕсли у вас уже есть учетная запись на Juick, вы сможете присоединить этот JabberID к ней.\n\nWarning, system message!\nYour JabberID is not found in our server's white list. To send a message to " + username + "@juick.com, please sign up: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nIf you already have an account on Juick, you will be proposed to attach this JabberID to your existing account."; - } - XMPPComponent.sendOut(reply); - return true; - } - - if (username.equals("juick")) { - return incomingMessageJuick(user_from, msg); - } - - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); - - if (uid_to == 0) { - Message reply = new Message(msg.to, msg.from, Message.Type.error); - reply.id = msg.id; - reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); - return true; - } - - boolean success = false; - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - success = PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, msg.body); - } - - - if (success) { - Message m = new Message(); - m.from = new JID("juick", "juick.com", null); - m.to = new JID(Integer.toString(uid_to), "push.juick.com", null); - JuickMessage jmsg = new JuickMessage(); - jmsg.setUser(UserQueries.getUserByUID(XMPPComponent.sql, user_from.getUID())); - - jmsg.setText(msg.body); - m.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(m.toString()); - - m.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(m.toString()); - - String jid; - boolean inroster = false; - jid = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); - if (jid != null) { - inroster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid); - } - - if (jid != null) { - Message mm = new Message(); - mm.to = new JID(jid); - mm.type = Message.Type.chat; - if (inroster) { - mm.from = new JID(jmsg.getUser().getUName(), "juick.com", "Juick"); - mm.body = msg.body; - } else { - mm.from = new JID("juick", "juick.com", "Juick"); - mm.body = "Private message from @" + jmsg.getUser().getUName() + ":\n" + msg.body; - } - XMPPComponent.sendOut(mm); - } - - } else { - Message reply = new Message(msg.to, msg.from, Message.Type.error); - reply.id = msg.id; - reply.addChild(new Error(Error.Type.cancel, "not-allowed")); - XMPPComponent.sendOut(reply); - } - - return true; - } - private static Pattern regexPM = Pattern.compile("^\\@(\\S+)\\s+([\\s\\S]+)$"); - - public static boolean incomingMessageJuick(User user_from, Message msg) { - String command = msg.body.trim(); - int commandlen = command.length(); - - // COMPATIBILITY - if (commandlen > 7 && command.substring(0, 3).equalsIgnoreCase("PM ")) { - command = command.substring(3).trim(); - commandlen = command.length(); - } - - if (commandlen == 4) { - if (command.equalsIgnoreCase("PING")) { - commandPing(msg); - return true; - } else if (command.equalsIgnoreCase("HELP")) { - commandHelp(msg); - return true; - } - } else if (commandlen == 5 && command.equalsIgnoreCase("LOGIN")) { - commandLogin(msg, user_from); - return true; - } else if (command.charAt(0) == '@') { - Matcher matchPM = regexPM.matcher(command); - if (matchPM.find()) { - String user_to = matchPM.group(1); - String msgtxt = matchPM.group(2); - commandPM(msg, user_from, user_to, msgtxt); - return true; - } - } else if (commandlen == 2 && command.equalsIgnoreCase("BL")) { - commandBLShow(msg, user_from); - return true; - } - - return false; - } - - private static void commandPing(Message m) { - Presence p = new Presence(JuickJID, m.from); - p.priority = 10; - XMPPComponent.sendOut(p); - - Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = "PONG"; - XMPPComponent.sendOut(reply); - } - - private static void commandHelp(Message m) { - Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = HELPTEXT; - XMPPComponent.sendOut(reply); - } - - private static void commandLogin(Message m, User user_from) { - Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(XMPPComponent.sql, user_from.getUID()); - XMPPComponent.sendOut(reply); - } - - private static void commandPM(Message m, User user_from, String user_to, String body) { - int ret = 0; - - int uid_to = 0; - String jid_to = null; - boolean haveInRoster = false; - - if (user_to.indexOf('@') > 0) { - uid_to = UserQueries.getUIDbyJID(XMPPComponent.sql, user_to); - } else { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, user_to); - } - - if (uid_to > 0) { - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - if (PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, body)) { - jid_to = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); - if (jid_to != null) { - haveInRoster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid_to); - } - ret = 200; - } else { - ret = 500; - } - } else { - ret = 403; - } - } else { - ret = 404; - } - - - if (ret == 200) { - Message msg = new Message(); - msg.from = new JID("juick", "juick.com", null); - msg.to = new JID(Integer.toString(uid_to), "push.juick.com", null); - JuickMessage jmsg = new JuickMessage(); - jmsg.setUser(user_from); - jmsg.setText(body); - msg.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(msg.toString()); - - msg.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(msg.toString()); - - if (jid_to != null) { - Message mm = new Message(); - mm.to = new JID(jid_to); - mm.type = Message.Type.chat; - if (haveInRoster) { - mm.from = new JID(user_from.getUName(), "juick.com", "Juick"); - mm.body = body; - } else { - mm.from = new JID("juick", "juick.com", "Juick"); - mm.body = "Private message from @" + user_from.getUName() + ":\n" + body; - } - XMPPComponent.sendOut(mm); - } - } - - Message reply = new Message(m.to, m.from); - if (ret == 200) { - reply.type = m.type; - reply.body = "Private message sent"; - } else { - reply.type = Message.Type.error; - reply.body = "Error " + ret; - } - XMPPComponent.sendOut(reply); - } - - private static void commandBLShow(Message m, User user_from) { - List blusers; - List bltags; - - blusers = UserQueries.getUserBLUsers(XMPPComponent.sql, user_from.getUID()); - bltags = TagQueries.getUserBLTags(XMPPComponent.sql, user_from.getUID()); - - - String txt = ""; - if (bltags.size() > 0) { - for (int i = 0; i < bltags.size(); i++) { - txt += "*" + bltags.get(i) + "\n"; - } - - if (blusers.size() > 0) { - txt += "\n"; - } - } - if (blusers.size() > 0) { - for (int i = 0; i < blusers.size(); i++) { - txt += "@" + blusers.get(i).getUName() + "\n"; - } - } - if (txt.isEmpty()) { - txt = "You don't have any users or tags in your blacklist."; - } - - Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = txt; - XMPPComponent.sendOut(reply); - } -} diff --git a/src/main/java/com/juick/xmpp/s2s/S2SComponent.java b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java new file mode 100644 index 00000000..dcb547fb --- /dev/null +++ b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java @@ -0,0 +1,316 @@ +package com.juick.xmpp.s2s; + +import com.juick.JuickComponent; +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.SubscriptionsQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.Nickname; +import com.juick.xmpp.extensions.XOOB; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +/** + * + * @author ugnich + */ +public class S2SComponent implements JuickComponent { + + private static final Logger LOGGER = Logger.getLogger(S2SComponent.class.getName()); + + ExecutorService executorService; + + public static String HOSTNAME = null; + public static String componentName = null; + public static String STATSFILE = null; + static final List inConnections = Collections.synchronizedList(new ArrayList<>()); + static final List outConnections = Collections.synchronizedList(new ArrayList<>()); + static final List outCache = Collections.synchronizedList(new ArrayList<>()); + JdbcTemplate sql; + final public static HashMap childParsers = new HashMap<>(); + + public static void addConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.add(c); + } + } + + public static void addConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.add(c); + } + } + + public static void removeConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.remove(c); + } + } + + public static void removeConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.remove(c); + } + } + + public static String getFromCache(String hostname) { + CacheEntry ret = null; + synchronized (outCache) { + for (Iterator i = outCache.iterator(); i.hasNext();) { + CacheEntry c = i.next(); + if (c.hostname != null && c.hostname.equals(hostname)) { + ret = c; + i.remove(); + break; + } + } + } + return (ret != null) ? ret.xml : null; + } + + public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { + return c; + } + } + } + return null; + } + + public static ConnectionIn getConnectionIn(String streamID) { + synchronized (inConnections) { + for (ConnectionIn c : inConnections) { + if (c.streamID != null && c.streamID.equals(streamID)) { + return c; + } + } + } + return null; + } + + public void sendOut(Stanza s) { + sendOut(s.to.Host, s.toString()); + } + + public void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + ConnectionOut connOut = null; + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname)) { + if (c.streamReady) { + connOut = c; + break; + } else { + haveAnyConn = true; + break; + } + } + } + } + if (connOut != null) { + try { + connOut.sendStanza(xml); + } catch (IOException e) { + LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); + } + return; + } + + boolean haveCache = false; + synchronized (outCache) { + for (CacheEntry c : outCache) { + if (c.hostname != null && c.hostname.equals(hostname)) { + c.xml += xml; + c.tsUpdated = System.currentTimeMillis(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(hostname, xml)); + } + } + + if (!haveAnyConn) { + ConnectionOut connectionOut = new ConnectionOut(hostname); + connectionOut.parseStream(); + } + } + + public S2SComponent(JdbcTemplate sql, ExecutorService executorService, Properties conf) { + LOGGER.info("component initialized"); + HOSTNAME = conf.getProperty("hostname"); + componentName = conf.getProperty("componentname"); + STATSFILE = conf.getProperty("statsfile"); + this.sql = sql; + this.executorService = executorService; + executorService.submit(new ConnectionListener(executorService)); + executorService.submit(new CleaningUp()); + } + @Override + public void messageReceived(Message xmsg) { + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } + } + + public void sendJuickMessage(JuickMessage jmsg) { + List users; + + + if (jmsg.FriendsOnly) { + users = SubscriptionsQueries.getUsersSubscribedToUser(sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); + } else { + users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + } + + + String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + public void sendJuickComment(JuickMessage jmsg) { + String replyQuote; + + List users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + replyQuote = getReplyQuote(sql, jmsg.getMID(), jmsg.ReplyTo); + + String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.addChild(jmsg); + for (User user : users) { + // TODO: make single query + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { + String quote = ""; + if (ReplyTo > 0) { + com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); + if (q != null) { + quote = q.getText(); + } + } else { + com.juick.Message q = MessagesQueries.getMessage(sql, MID); + if (q != null) { + quote = q.getText(); + } + } + if (quote.length() > 50) { + quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; + } else if (quote.length() > 0) { + quote = ">" + quote.replace('\n', ' ') + "\n"; + } + return quote; + } + + public void sendJuickRecommendation(JuickMessage recomm) { + JuickMessage jmsg; + jmsg = new JuickMessage(MessagesQueries.getMessage(sql, recomm.getMID())); + List users = SubscriptionsQueries.getUsersSubscribedToComments(sql, + recomm.getMID(), jmsg.getUser().getUID()); + + String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; + txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID(); + if (jmsg.Replies > 0) { + if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { + txt += " (" + jmsg.Replies + " reply)"; + } else { + txt += " (" + jmsg.Replies + " replies)"; + } + } + txt += " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } +} diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java deleted file mode 100644 index 1dfdd38d..00000000 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ /dev/null @@ -1,199 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.xmpp.Stanza; -import com.juick.xmpp.StanzaChild; -import com.juick.xmpp.extensions.JuickMessage; -import org.springframework.jdbc.core.JdbcTemplate; - -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; -import java.io.IOException; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * - * @author ugnich - */ -public class XMPPComponent implements ServletContextListener { - - private static final Logger LOGGER = Logger.getLogger(XMPPComponent.class.getName()); - - public static final ExecutorService executorService = Executors.newWorkStealingPool(); - - public static String HOSTNAME = null; - public static String STATSFILE = null; - public static ConnectionRouter connRouter; - static final List inConnections = Collections.synchronizedList(new ArrayList<>()); - static final List outConnections = Collections.synchronizedList(new ArrayList<>()); - static final List outCache = Collections.synchronizedList(new ArrayList<>()); - static JdbcTemplate sql; - final public static HashMap childParsers = new HashMap<>(); - - public static void addConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.add(c); - } - } - - public static void addConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.add(c); - } - } - - public static void removeConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.remove(c); - } - } - - public static void removeConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.remove(c); - } - } - - public static String getFromCache(String hostname) { - CacheEntry ret = null; - synchronized (outCache) { - for (Iterator i = outCache.iterator(); i.hasNext();) { - CacheEntry c = i.next(); - if (c.hostname != null && c.hostname.equals(hostname)) { - ret = c; - i.remove(); - break; - } - } - } - return (ret != null) ? ret.xml : null; - } - - public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { - return c; - } - } - } - return null; - } - - public static ConnectionIn getConnectionIn(String streamID) { - synchronized (inConnections) { - for (ConnectionIn c : inConnections) { - if (c.streamID != null && c.streamID.equals(streamID)) { - return c; - } - } - } - return null; - } - - public static void sendOut(Stanza s) { - sendOut(s.to.Host, s.toString()); - } - - public static void sendOut(String hostname, String xml) { - boolean haveAnyConn = false; - - ConnectionOut connOut = null; - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname)) { - if (c.streamReady) { - connOut = c; - break; - } else { - haveAnyConn = true; - break; - } - } - } - } - if (connOut != null) { - try { - connOut.sendStanza(xml); - } catch (IOException e) { - LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); - } - return; - } - - boolean haveCache = false; - synchronized (outCache) { - for (CacheEntry c : outCache) { - if (c.hostname != null && c.hostname.equals(hostname)) { - c.xml += xml; - c.tsUpdated = System.currentTimeMillis(); - haveCache = true; - break; - } - } - if (!haveCache) { - outCache.add(new CacheEntry(hostname, xml)); - } - } - - if (!haveAnyConn) { - ConnectionOut connectionOut = new ConnectionOut(hostname); - connectionOut.parseStream(); - } - } - - @Override - public void contextInitialized(ServletContextEvent sce) { - - LOGGER.info("component initialized"); - executorService.submit(() -> { - Properties conf = new Properties(); - try { - conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/s2s.conf")); - HOSTNAME = conf.getProperty("hostname"); - String componentName = conf.getProperty("componentname"); - STATSFILE = conf.getProperty("statsfile"); - sql = (JdbcTemplate) sce.getServletContext().getAttribute("sql"); - - childParsers.put(JuickMessage.XMLNS, new JuickMessage()); - - connRouter = new ConnectionRouter(componentName); - executorService.submit(connRouter); - executorService.submit(new ConnectionListener()); - executorService.submit(new CleaningUp()); - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "XMPPComponent error", e); - } - }); - } - - - - @Override - public void contextDestroyed(ServletContextEvent sce) { - synchronized (XMPPComponent.outConnections) { - for (Iterator i = XMPPComponent.outConnections.iterator(); i.hasNext();) { - ConnectionOut c = i.next(); - c.closeConnection(); - i.remove(); - } - } - - synchronized (XMPPComponent.inConnections) { - for (Iterator i = XMPPComponent.inConnections.iterator(); i.hasNext();) { - ConnectionIn c = i.next(); - c.closeConnection(); - i.remove(); - } - } - XMPPComponent.connRouter.closeConnection(); - executorService.shutdown(); - LOGGER.info("component destroyed"); - } -} -- cgit v1.2.3
hostlivesize