package com.juick.ws.components;

import com.juick.User;
import com.juick.server.MessagesQueries;
import com.juick.server.SubscriptionsQueries;
import com.juick.server.UserQueries;
import com.juick.ws.s2s.*;
import com.juick.xmpp.*;
import com.juick.xmpp.extensions.JuickMessage;
import com.juick.xmpp.extensions.Nickname;
import com.juick.xmpp.extensions.XOOB;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.math.NumberUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.xmlpull.v1.XmlPullParserException;

import javax.inject.Inject;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * XMPP component that connects to a router on localhost through a
 * {@link StreamComponent}, accepts inbound server-to-server (s2s) sockets and
 * opens outbound s2s connections on demand.
 *
 * <p>Stanzas received from the router that are addressed to
 * {@link #componentName} and carry a {@link JuickMessage} are fanned out to
 * subscribers; everything else is forwarded to the host of the recipient JID.
 *
 * @author ugnich
 */
@Component
public class XMPPComponent implements JuickComponent, DisposableBean, Stream.StreamListener,
        Message.MessageListener, Iq.IqListener, Presence.PresenceListener {

    private static final Logger logger = Logger.getLogger(XMPPComponent.class.getName());

    /** Runs the s2s accept loop, the cleanup task and every in/out connection. */
    public final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Stream to the router; stays {@code null} when the router socket could not be opened. */
    private StreamComponent router;
    JuickBot bot;
    public String HOSTNAME, componentName;
    public String STATSFILE = null;
    public String keystore;
    public String keystorePassword;
    /** Entries of the comma-separated {@code broken_ssl_hosts} property. */
    public List<String> brokenSSLhosts;
    /** Entries of the comma-separated {@code banned_hosts} property. */
    public List<String> bannedHosts;

    private final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>());
    private final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>());
    /** Stanzas waiting for an outbound stream to become ready, one entry per hostname. */
    private final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>());

    private JdbcTemplate sql;
    /** Pool created by the constructor; kept so that {@link #destroy()} can close it. */
    private BasicDataSource dataSource;

    /** Bound s2s listening socket, published so that {@link #destroy()} can close it. */
    private volatile ServerSocket s2sListener;
    /** Set by {@link #destroy()}; tells the accept loop to stop. */
    private volatile boolean destroyed;

    // NOTE(review): the value type of this map is not visible in this file, so the
    // raw type is kept as found; parameterize it once the type is confirmed.
    final public HashMap childParsers = new HashMap<>();

    /**
     * Reads the configuration, opens the router connection and starts the s2s
     * accept loop and the cleanup task.
     *
     * <p>Any failure is logged and swallowed, so the instance may be left
     * partially initialised (for example with no router).
     *
     * @param env source of the configuration properties
     */
    @Inject
    public XMPPComponent(Environment env) {
        logger.info("component initialized");
        try {
            HOSTNAME = env.getProperty("hostname");
            componentName = env.getProperty("componentname");
            int componentPort = NumberUtils.toInt(env.getProperty("component_port"), 5347);
            int s2sPort = NumberUtils.toInt(env.getProperty("s2s_port"), 5269);
            JID botJid = new JID(env.getProperty("xmppbot_jid"));
            STATSFILE = env.getProperty("statsfile");
            keystore = env.getProperty("keystore");
            keystorePassword = env.getProperty("keystore_password");
            brokenSSLhosts = Arrays.asList(env.getProperty("broken_ssl_hosts", "").split(","));
            bannedHosts = Arrays.asList(env.getProperty("banned_hosts", "").split(","));

            dataSource = new BasicDataSource();
            dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver"));
            dataSource.setUrl(env.getProperty("datasource_url"));
            // Fields are assigned directly: calling overridable accessors from a
            // constructor would run subclass code on a half-built object.
            sql = new JdbcTemplate(dataSource);

            bot = new JuickBot(this, botJid);
            childParsers.put(JuickMessage.XMLNS, new JuickMessage());

            Socket routerSocket = null;
            try {
                routerSocket = new Socket("localhost", componentPort);
                router = new StreamComponent(new JID("s2s"), routerSocket.getInputStream(),
                        routerSocket.getOutputStream(), env.getProperty("xmpp_password"));
                router.addChildParser(new JuickMessage());
                // The casts select the addListener overload for each listener interface.
                router.addListener((Stream.StreamListener) this);
                router.addListener((Message.MessageListener) this);
                router.addListener((Iq.IqListener) this);
            } catch (IOException e) {
                logger.log(Level.SEVERE, "router error", e);
            }

            executorService.submit(() -> acceptS2sConnections(s2sPort));
            executorService.submit(new CleaningUp(this));
        } catch (Exception e) {
            logger.log(Level.SEVERE, "XMPPComponent error", e);
        }
    }

    /**
     * Binds the s2s port and hands every accepted socket to a new
     * {@link ConnectionIn} until {@link #destroy()} is called.
     *
     * <p>A bind failure is logged here; previously it was captured by the
     * discarded {@code Future} and never reported.
     *
     * @param port TCP port to listen on
     */
    private void acceptS2sConnections(int port) {
        try (ServerSocket listener = new ServerSocket(port)) {
            // Publish before checking the flag: either this loop observes
            // destroyed == true, or destroy() observes the socket and closes it.
            s2sListener = listener;
            logger.info("s2s listener ready");
            while (!destroyed && !listener.isClosed()) {
                try {
                    Socket socket = listener.accept();
                    ConnectionIn client = new ConnectionIn(this, bot, socket);
                    addConnectionIn(client);
                    executorService.submit(client);
                } catch (Exception e) {
                    if (destroyed || listener.isClosed()) {
                        // accept() was interrupted by shutdown; not an error.
                        break;
                    }
                    logger.log(Level.SEVERE, "s2s error", e);
                }
            }
        } catch (IOException e) {
            logger.log(Level.SEVERE, "s2s listener error", e);
        }
    }

    /** Registers an inbound connection. */
    public void addConnectionIn(ConnectionIn c) {
        inConnections.add(c);
    }

    /** Registers an outbound connection. */
    public void addConnectionOut(ConnectionOut c) {
        outConnections.add(c);
    }

    /** Unregisters an inbound connection. */
    public void removeConnectionIn(ConnectionIn c) {
        inConnections.remove(c);
    }

    /** Unregisters an outbound connection. */
    public void removeConnectionOut(ConnectionOut c) {
        outConnections.remove(c);
    }

    /**
     * Removes and returns the XML cached for a host.
     *
     * @param hostname host whose pending stanzas are requested
     * @return the cached XML, or {@code null} when nothing is cached for the host
     */
    public String getFromCache(String hostname) {
        synchronized (outCache) {
            for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) {
                CacheEntry entry = i.next();
                if (entry.hostname != null && entry.hostname.equals(hostname)) {
                    i.remove();
                    return entry.xml;
                }
            }
        }
        return null;
    }

    /**
     * Finds an outbound connection to a host.
     *
     * @param hostname  remote host
     * @param needReady when {@code true}, only a connection whose stream is ready matches
     * @return the first matching connection, or {@code null}
     */
    public ConnectionOut getConnectionOut(String hostname, boolean needReady) {
        synchronized (outConnections) {
            for (ConnectionOut c : outConnections) {
                if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) {
                    return c;
                }
            }
        }
        return null;
    }

    /**
     * Finds an inbound connection by stream id.
     *
     * @param streamID stream id to look for
     * @return the first matching connection, or {@code null}
     */
    public ConnectionIn getConnectionIn(String streamID) {
        synchronized (inConnections) {
            for (ConnectionIn c : inConnections) {
                if (c.streamID != null && c.streamID.equals(streamID)) {
                    return c;
                }
            }
        }
        return null;
    }

    /** Sends a stanza to the host of its recipient JID. */
    public void sendOut(Stanza s) {
        sendOut(s.to.Host, s.toString());
    }

    /**
     * Sends XML to a host, or queues it until an outbound stream is ready.
     *
     * <p>If a ready connection to the host exists the XML is written to it.
     * Otherwise the XML is appended to the host's cache entry, and a new
     * {@link ConnectionOut} is started unless one is already registered for
     * the host.
     *
     * @param hostname remote host
     * @param xml      serialized stanza(s)
     */
    public void sendOut(String hostname, String xml) {
        boolean haveAnyConn = false;
        ConnectionOut connOut = null;
        synchronized (outConnections) {
            for (ConnectionOut c : outConnections) {
                if (c.to != null && c.to.equals(hostname)) {
                    if (c.streamReady) {
                        connOut = c;
                        break;
                    }
                    // Keep scanning: a later entry for the same host may already
                    // be ready, as getConnectionOut(hostname, true) would find.
                    haveAnyConn = true;
                }
            }
        }

        if (connOut != null) {
            try {
                connOut.sendStanza(xml);
            } catch (IOException e) {
                logger.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString());
            }
            return;
        }

        synchronized (outCache) {
            boolean haveCache = false;
            for (CacheEntry c : outCache) {
                if (c.hostname != null && c.hostname.equals(hostname)) {
                    c.xml += xml;
                    c.tsUpdated = System.currentTimeMillis();
                    haveCache = true;
                    break;
                }
            }
            if (!haveCache) {
                outCache.add(new CacheEntry(hostname, xml));
            }
        }

        if (!haveAnyConn) {
            try {
                ConnectionOut connectionOut = new ConnectionOut(this, hostname);
                executorService.submit(connectionOut);
            } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException
                    | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) {
                logger.log(Level.SEVERE, "s2s out error", e);
            }
        }
    }

    /**
     * Starts parsing the router stream. Does nothing except log when the router
     * connection was never established.
     */
    @Async
    @Override
    public void init() {
        StreamComponent current = getRouter();
        if (current == null) {
            logger.severe("router is not connected, parsing not started");
            return;
        }
        current.startParsing();
    }

    /**
     * Stops the s2s listener, closes all registered connections and the router
     * stream, shuts the executor down and closes the connection pool.
     */
    @Override
    public void destroy() {
        destroyed = true;
        ServerSocket listener = s2sListener;
        if (listener != null) {
            try {
                // Closing the socket makes a blocked accept() throw, which ends the loop.
                listener.close();
            } catch (IOException e) {
                logger.log(Level.WARNING, "s2s listener warning", e);
            }
        }

        synchronized (outConnections) {
            for (Iterator<ConnectionOut> i = outConnections.iterator(); i.hasNext();) {
                ConnectionOut c = i.next();
                c.closeConnection();
                i.remove();
            }
        }
        synchronized (inConnections) {
            for (Iterator<ConnectionIn> i = inConnections.iterator(); i.hasNext();) {
                ConnectionIn c = i.next();
                c.closeConnection();
                i.remove();
            }
        }

        try {
            closeRouterConnection();
        } catch (IOException e) {
            logger.log(Level.WARNING, "router warning", e);
        }

        executorService.shutdown();

        if (dataSource != null) {
            try {
                dataSource.close();
            } catch (SQLException e) {
                logger.log(Level.WARNING, "datasource warning", e);
            }
        }
        logger.info("component destroyed");
    }

    /**
     * Logs off from the router; a no-op when no router is connected.
     *
     * @throws IOException if the logoff fails
     */
    public void closeRouterConnection() throws IOException {
        StreamComponent current = getRouter();
        if (current != null) {
            current.logoff();
        }
    }

    /**
     * Delivers a new message to the JIDs subscribed to its author.
     *
     * @param jmsg message to deliver
     */
    public void sendJuickMessage(JuickMessage jmsg) {
        List<String> jids;
        if (jmsg.FriendsOnly) {
            jids = SubscriptionsQueries.getJIDSubscribedToUser(getSql(), jmsg.getUser().getUID(), jmsg.FriendsOnly);
        } else {
            List<User> users = SubscriptionsQueries.getSubscribedUsers(getSql(), jmsg.getUser().getUID(), jmsg.getMID());
            jids = collectJids(users);
        }

        String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n";
        String attachment = jmsg.getAttachmentURL();
        if (attachment != null) {
            txt += attachment + "\n";
        }
        txt += jmsg.getText() + "\n\n";
        txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID();

        Nickname nick = new Nickname();
        nick.Nickname = "@" + jmsg.getUser().getUName();

        com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
        msg.from = bot.JuickJID;
        msg.body = txt;
        msg.type = Message.Type.chat;
        msg.thread = "juick-" + jmsg.getMID();
        msg.addChild(jmsg);
        msg.addChild(nick);
        if (attachment != null) {
            XOOB oob = new XOOB();
            oob.URL = attachment;
            msg.addChild(oob);
        }

        deliver(msg, jids);
    }

    /**
     * Delivers a comment to the users subscribed to the comments of its thread.
     *
     * @param jmsg comment to deliver
     */
    public void sendJuickComment(JuickMessage jmsg) {
        List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(getSql(), jmsg.getMID(), jmsg.getUser().getUID());

        // Quote the reply being answered, or the root message when ReplyTo is not set.
        com.juick.Message replyMessage = jmsg.ReplyTo > 0
                ? MessagesQueries.getReply(getSql(), jmsg.getMID(), jmsg.ReplyTo)
                : MessagesQueries.getMessage(getSql(), jmsg.getMID());
        String replyTo = replyMessage.getUser().getUName();
        String replyQuote = getReplyQuote(replyMessage);

        String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n@" + replyTo + " ";
        String attachment = jmsg.getAttachmentURL();
        if (attachment != null) {
            txt += attachment + "\n";
        }
        txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID()
                + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID();

        com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
        msg.from = bot.JuickJID;
        msg.body = txt;
        msg.type = Message.Type.chat;
        msg.addChild(jmsg);

        deliver(msg, collectJids(users));
    }

    /**
     * Builds the quoted line shown above a reply: text longer than 50 characters
     * is cut to 47 characters plus {@code "..."}, and newlines become spaces.
     *
     * @param q message being replied to
     * @return the quote, or the unchanged (empty) text when there is nothing to quote
     */
    private String getReplyQuote(com.juick.Message q) {
        String quote = q.getText();
        if (quote.length() > 50) {
            quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n";
        } else if (quote.length() > 0) {
            quote = ">" + quote.replace('\n', ' ') + "\n";
        }
        return quote;
    }

    /**
     * Delivers a recommendation of an existing message to the users subscribed
     * to the recommender's recommendations.
     *
     * @param recomm recommendation; its MID identifies the recommended message
     */
    public void sendJuickRecommendation(JuickMessage recomm) {
        JuickMessage jmsg = new JuickMessage(MessagesQueries.getMessage(getSql(), recomm.getMID()));
        List<User> users = SubscriptionsQueries.getUsersSubscribedToUserRecommendations(getSql(),
                recomm.getUser().getUID(), recomm.getMID(), jmsg.getUser().getUID());

        String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n";
        txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n";
        String attachment = jmsg.getAttachmentURL();
        if (attachment != null) {
            txt += attachment + "\n";
        }
        txt += jmsg.getText() + "\n\n";
        txt += "#" + jmsg.getMID();
        if (jmsg.Replies > 0) {
            // NOTE(review): singular is used for every count ending in 1 except
            // those ending in 11 (e.g. "21 reply"); confirm this is intended.
            if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) {
                txt += " (" + jmsg.Replies + " reply)";
            } else {
                txt += " (" + jmsg.Replies + " replies)";
            }
        }
        txt += " http://juick.com/" + jmsg.getMID();

        Nickname nick = new Nickname();
        nick.Nickname = "@" + jmsg.getUser().getUName();

        com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
        msg.from = bot.JuickJID;
        msg.body = txt;
        msg.type = Message.Type.chat;
        msg.thread = "juick-" + jmsg.getMID();
        msg.addChild(jmsg);
        msg.addChild(nick);
        if (attachment != null) {
            XOOB oob = new XOOB();
            oob.URL = attachment;
            msg.addChild(oob);
        }

        deliver(msg, collectJids(users));
    }

    /** Looks up the JIDs of every given user, in user order. */
    private List<String> collectJids(List<User> users) {
        List<String> jids = new ArrayList<>();
        for (User user : users) {
            for (String jid : UserQueries.getJIDsbyUID(getSql(), user.getUID())) {
                jids.add(jid);
            }
        }
        return jids;
    }

    /** Sends the message once per JID, rewriting its recipient each time. */
    private void deliver(com.juick.xmpp.Message msg, List<String> jids) {
        for (String jid : jids) {
            msg.to = new JID(jid);
            sendOut(msg);
        }
    }

    /**
     * Forwards IQ stanzas that are not addressed to this component.
     *
     * @return always {@code false}
     */
    @Override
    public boolean onIq(Iq iq) {
        JID jid = iq.to;
        if (!jid.Host.equals(componentName)) {
            logger.info("STREAM ROUTER (IQ): " + iq.toString());
            sendOut(iq);
        }
        return false;
    }

    /**
     * Routes a message from the router: stanzas for this component that carry a
     * {@link JuickMessage} are delivered as recommendation, comment or message;
     * stanzas for any other host are forwarded.
     */
    @Override
    public void onMessage(Message xmsg) {
        logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString());
        JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
        JID jid = xmsg.to;
        if (!jid.Host.equals(componentName)) {
            sendOut(xmsg);
            return;
        }
        if (jmsg == null) {
            return;
        }
        if (jid.Username != null && jid.Username.equals("recomm")) {
            sendJuickRecommendation(jmsg);
        } else if (jmsg.getRID() > 0) {
            sendJuickComment(jmsg);
        } else if (jmsg.getMID() > 0) {
            sendJuickMessage(jmsg);
        }
    }

    /** Forwards presence stanzas that are not addressed to this component. */
    @Override
    public void onPresence(Presence presence) {
        JID jid = presence.to;
        if (!jid.Host.equals(componentName)) {
            logger.info("STREAM ROUTER (PRESENCE): " + presence.toString());
            sendOut(presence);
        }
    }

    @Override
    public void onStreamReady() {
        logger.info("STREAM ROUTER (READY)");
    }

    @Override
    public void onStreamFail(Exception ex) {
        logger.log(Level.SEVERE, "STREAM ROUTER (FAIL)", ex);
    }

    /** @return the router stream, or {@code null} when it was never connected */
    public StreamComponent getRouter() {
        return router;
    }

    public void setRouter(StreamComponent router) {
        this.router = router;
    }

    /** @return the live synchronized list; callers must synchronize on it while iterating */
    public List<ConnectionIn> getInConnections() {
        return inConnections;
    }

    /** @return the live synchronized list; callers must synchronize on it while iterating */
    public List<ConnectionOut> getOutConnections() {
        return outConnections;
    }

    /** @return the live synchronized list; callers must synchronize on it while iterating */
    public List<CacheEntry> getOutCache() {
        return outCache;
    }

    public JdbcTemplate getSql() {
        return sql;
    }

    public void setSql(JdbcTemplate sql) {
        this.sql = sql;
    }
}