diff options
author | Vitaly Takmazov | 2016-02-03 12:35:59 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2016-02-03 12:35:59 +0300 |
commit | 380018da475ff41d3375e7f2bea0a192a4d9b178 (patch) | |
tree | 6e6e38e109c73ff5bc233681143ee4ac2bff9a96 /src/main/java/com/juick/xmpp/s2s/S2SComponent.java | |
parent | 36f542dad713d173102a60a1aa7e336e6db31200 (diff) |
single xmpp component, WIP
Diffstat (limited to 'src/main/java/com/juick/xmpp/s2s/S2SComponent.java')
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/S2SComponent.java | 316 |
1 files changed, 316 insertions, 0 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/S2SComponent.java b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java new file mode 100644 index 00000000..dcb547fb --- /dev/null +++ b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java @@ -0,0 +1,316 @@ +package com.juick.xmpp.s2s; + +import com.juick.JuickComponent; +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.SubscriptionsQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.Nickname; +import com.juick.xmpp.extensions.XOOB; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +/** + * + * @author ugnich + */ +public class S2SComponent implements JuickComponent { + + private static final Logger LOGGER = Logger.getLogger(S2SComponent.class.getName()); + + ExecutorService executorService; + + public static String HOSTNAME = null; + public static String componentName = null; + public static String STATSFILE = null; + static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>()); + JdbcTemplate sql; + final public static HashMap<String, StanzaChild> childParsers = new HashMap<>(); + + public static void addConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.add(c); + } + } + + public static void addConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.add(c); + } + } + + public static void removeConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.remove(c); + } + } + + public static void removeConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.remove(c); + } + } + + public static String getFromCache(String hostname) { + CacheEntry ret = null; + synchronized (outCache) { + for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) { + CacheEntry c = i.next(); + if (c.hostname != null && c.hostname.equals(hostname)) { + ret = c; + i.remove(); + break; + } + } + } + return (ret != null) ? ret.xml : null; + } + + public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { + return c; + } + } + } + return null; + } + + public static ConnectionIn getConnectionIn(String streamID) { + synchronized (inConnections) { + for (ConnectionIn c : inConnections) { + if (c.streamID != null && c.streamID.equals(streamID)) { + return c; + } + } + } + return null; + } + + public void sendOut(Stanza s) { + sendOut(s.to.Host, s.toString()); + } + + public void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + ConnectionOut connOut = null; + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname)) { + if (c.streamReady) { + connOut = c; + break; + } else { + haveAnyConn = true; + break; + } + } + } + } + if (connOut != null) { + try { + connOut.sendStanza(xml); + } catch (IOException e) { + LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); + } + return; + } + + boolean haveCache = false; + synchronized (outCache) { + for (CacheEntry c : outCache) { + if (c.hostname != null && c.hostname.equals(hostname)) { + c.xml += xml; + c.tsUpdated = System.currentTimeMillis(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(hostname, xml)); + } + } + + if (!haveAnyConn) { + ConnectionOut connectionOut = new ConnectionOut(hostname); + connectionOut.parseStream(); + } + } + + public S2SComponent(JdbcTemplate sql, ExecutorService executorService, Properties conf) { + LOGGER.info("component initialized"); + HOSTNAME = conf.getProperty("hostname"); + componentName = conf.getProperty("componentname"); + STATSFILE = conf.getProperty("statsfile"); + this.sql = sql; + this.executorService = executorService; + executorService.submit(new ConnectionListener(executorService)); + executorService.submit(new CleaningUp()); + } + @Override + public void messageReceived(Message xmsg) { + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } + } + + public void sendJuickMessage(JuickMessage jmsg) { + List<User> users; + + + if (jmsg.FriendsOnly) { + users = SubscriptionsQueries.getUsersSubscribedToUser(sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); + } else { + users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + } + + + String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + public void sendJuickComment(JuickMessage jmsg) { + String replyQuote; + + List<User> users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + replyQuote = getReplyQuote(sql, jmsg.getMID(), jmsg.ReplyTo); + + String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.addChild(jmsg); + for (User user : users) { + // TODO: make single query + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { + String quote = ""; + if (ReplyTo > 0) { + com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); + if (q != null) { + quote = q.getText(); + } + } else { + com.juick.Message q = MessagesQueries.getMessage(sql, MID); + if (q != null) { + quote = q.getText(); + } + } + if (quote.length() > 50) { + quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; + } else if (quote.length() > 0) { + quote = ">" + quote.replace('\n', ' ') + "\n"; + } + return quote; + } + + public void sendJuickRecommendation(JuickMessage recomm) { + JuickMessage jmsg; + jmsg = new JuickMessage(MessagesQueries.getMessage(sql, recomm.getMID())); + List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(sql, + recomm.getMID(), jmsg.getUser().getUID()); + + String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; + txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID(); + if (jmsg.Replies > 0) { + if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { + txt += " (" + jmsg.Replies + " reply)"; + } else { + txt += " (" + jmsg.Replies + " replies)"; + } + } + txt += " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } +} |