diff options
author | Vitaly Takmazov | 2016-02-03 12:35:59 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2016-02-03 12:35:59 +0300 |
commit | 380018da475ff41d3375e7f2bea0a192a4d9b178 (patch) | |
tree | 6e6e38e109c73ff5bc233681143ee4ac2bff9a96 /src | |
parent | 36f542dad713d173102a60a1aa7e336e6db31200 (diff) |
single xmpp component, WIP
Diffstat (limited to 'src')
16 files changed, 607 insertions, 779 deletions
diff --git a/src/main/java/com/juick/CrosspostComponent.java b/src/main/java/com/juick/CrosspostComponent.java index b188fa16..6166f899 100644 --- a/src/main/java/com/juick/CrosspostComponent.java +++ b/src/main/java/com/juick/CrosspostComponent.java @@ -18,10 +18,6 @@ package com.juick; import com.juick.server.CrosspostQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Stream; -import com.juick.xmpp.StreamComponent; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.utils.Base64; import org.apache.commons.lang3.tuple.Pair; @@ -30,94 +26,39 @@ import org.springframework.jdbc.core.JdbcTemplate; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import javax.net.ssl.HttpsURLConnection; -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; import java.io.*; -import java.net.Socket; import java.net.URL; import java.net.URLEncoder; import java.security.Key; -import java.sql.Connection; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.Enumeration; import java.util.Properties; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; -import java.util.logging.LogManager; import java.util.logging.Logger; /** * * @author Ugnich Anton */ -public class CrosspostComponent implements ServletContextListener, Stream.StreamListener, Message.MessageListener { +public class CrosspostComponent implements JuickComponent { private static Logger logger = Logger.getLogger(CrosspostComponent.class.getName()); - private ExecutorService executorService; - public final static String TWITTERURL = "https://api.twitter.com/1.1/statuses/update.json"; public final static String FBURL = "https://graph.facebook.com/me/feed"; public final static String VKURL = "https://api.vk.com/method/wall.post"; + JdbcTemplate sql; - Stream xmpp; String twitter_consumer_key; String twitter_consumer_secret; - @Override - public void contextInitialized(final ServletContextEvent sce) { + public CrosspostComponent(JdbcTemplate sql, Properties conf) { logger.info("component initialized"); - executorService = Executors.newSingleThreadExecutor(); - executorService.submit((Runnable) () -> { - try { - Properties conf = new Properties(); - conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/juick.conf")); - - LogManager.getLogManager().readConfiguration( - sce.getServletContext().getResourceAsStream("WEB-INF/logging.properties")); - twitter_consumer_key = conf.getProperty("twitter_consumer_key", ""); - twitter_consumer_secret = conf.getProperty("twitter_consumer_secret", ""); - - sql = (JdbcTemplate) sce.getServletContext().getAttribute("sql"); - setupXmppComponent(conf.getProperty("xmpp_password", "")); - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); - } - }); - } - - @Override - public void contextDestroyed(ServletContextEvent sce) { - executorService.shutdown(); - logger.info("component destroyed"); - } - - public void setupXmppComponent(String password) { - try { - Socket socket = new Socket("localhost", 5347); - xmpp = new StreamComponent(new JID("", "crosspost.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), password); - xmpp.addChildParser(new JuickMessage()); - xmpp.addListener((Stream.StreamListener) this); - xmpp.addListener((Message.MessageListener) this); - xmpp.startParsing(); - } catch (IOException e) { - logger.log(Level.SEVERE, e.getMessage(), e); - } - } - - @Override - public void onStreamReady() { - logger.info("XMPP STREAM READY"); + twitter_consumer_key = conf.getProperty("twitter_consumer_key", ""); + twitter_consumer_secret = conf.getProperty("twitter_consumer_secret", ""); + this.sql = sql; } - - @Override - public void onStreamFail(Exception e) {logger.log(Level.SEVERE, "XMPP STREAM FAIL", e);} @Override - public void onMessage(com.juick.xmpp.Message msg) { + public void messageReceived(com.juick.xmpp.Message msg) { JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); if (msg.to != null && msg.to.Username != null && jmsg != null && jmsg.getRID() == 0) { if (msg.to.Username.equals("twitter")) { diff --git a/src/main/java/com/juick/JuickComponent.java b/src/main/java/com/juick/JuickComponent.java new file mode 100644 index 00000000..24ee9717 --- /dev/null +++ b/src/main/java/com/juick/JuickComponent.java @@ -0,0 +1,8 @@ +package com.juick; + +/** + * Created by vt on 03/02/16. + */ +public interface JuickComponent { + void messageReceived(com.juick.xmpp.Message msg); +} diff --git a/src/main/java/com/juick/JuickNotificator.java b/src/main/java/com/juick/JuickNotificator.java new file mode 100644 index 00000000..c8fb949f --- /dev/null +++ b/src/main/java/com/juick/JuickNotificator.java @@ -0,0 +1,8 @@ +package com.juick; + +/** + * Created by vt on 03/02/16. + */ +public interface JuickNotificator { + void push (com.juick.xmpp.Message msg); +} diff --git a/src/main/java/com/juick/PushComponent.java b/src/main/java/com/juick/PushComponent.java index eb2230cb..3a96ff16 100644 --- a/src/main/java/com/juick/PushComponent.java +++ b/src/main/java/com/juick/PushComponent.java @@ -24,10 +24,6 @@ import com.google.android.gcm.server.Sender; import com.juick.json.MessageSerializer; import com.juick.server.PushQueries; import com.juick.server.SubscriptionsQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message.MessageListener; -import com.juick.xmpp.Stream; -import com.juick.xmpp.StreamComponent; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.utils.XmlUtils; import com.notnoop.apns.APNS; @@ -47,18 +43,12 @@ import org.apache.http.util.TextUtils; import org.json.JSONObject; import org.springframework.jdbc.core.JdbcTemplate; -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; import java.io.IOException; -import java.net.Socket; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; -import java.util.logging.LogManager; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -66,69 +56,24 @@ import java.util.stream.Collectors; * * @author Ugnich Anton */ -public class PushComponent implements ServletContextListener, Stream.StreamListener, MessageListener { +public class PushComponent implements JuickComponent { private static Logger logger = Logger.getLogger(PushComponent.class.getName()); - private ExecutorService executorService; String wns_application_sip; String wns_client_secret; JdbcTemplate sql; - Socket socket; - Stream xmpp; Sender GCMSender; - @Override - public void contextInitialized(final ServletContextEvent sce) { + public PushComponent(JdbcTemplate sql, Properties conf) { logger.info("component initialized"); - executorService = Executors.newSingleThreadExecutor(); - executorService.submit((Runnable) () -> { - Properties conf = new Properties(); - try { - conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/juick.conf")); - LogManager.getLogManager().readConfiguration( - sce.getServletContext().getResourceAsStream("WEB-INF/logging.properties")); - wns_application_sip = conf.getProperty("wns_application_sip", ""); - wns_client_secret = conf.getProperty("wns_client_secret", ""); - GCMSender = new Sender(conf.getProperty("gcm_key")); - sql = (JdbcTemplate) sce.getServletContext().getAttribute("sql"); - setupXmppComponent(new JID("", conf.getProperty("push_jid"), ""), conf.getProperty("xmpp_host", "localhost"), - Integer.parseInt(conf.getProperty("xmpp_port", "5347")), conf.getProperty("push_xmpp_password", "")); - } catch (IOException e) { - logger.log(Level.SEVERE, e.getMessage(), e); - } - }); + wns_application_sip = conf.getProperty("wns_application_sip", ""); + wns_client_secret = conf.getProperty("wns_client_secret", ""); + GCMSender = new Sender(conf.getProperty("gcm_key")); + this.sql = sql; } - - @Override - public void contextDestroyed(ServletContextEvent sce) { - executorService.shutdown(); - logger.info("component destroyed"); - } - - public void setupXmppComponent(JID jid, String host, int port, String password) { - try { - socket = new Socket(host, port); - xmpp = new StreamComponent(jid, socket.getInputStream(), socket.getOutputStream(), password); - xmpp.addChildParser(new JuickMessage()); - xmpp.addListener((Stream.StreamListener) this); - xmpp.addListener((MessageListener) this); - xmpp.startParsing(); - } catch (IOException e) { - logger.log(Level.SEVERE, e.getMessage(), e); - } - } - - @Override - public void onStreamReady() { - logger.info("XMPP STREAM READY"); - } - - @Override - public void onStreamFail(Exception e) {logger.log(Level.SEVERE, "XMPP STREAM FAIL", e);} - @Override - public void onMessage(com.juick.xmpp.Message msg) { + public void messageReceived(com.juick.xmpp.Message msg) { JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); if (jmsg == null) { return; diff --git a/src/main/java/com/juick/http/www/Main.java b/src/main/java/com/juick/http/www/Main.java index 85abed00..36376d3c 100644 --- a/src/main/java/com/juick/http/www/Main.java +++ b/src/main/java/com/juick/http/www/Main.java @@ -17,10 +17,15 @@ */ package com.juick.http.www; +import com.juick.CrosspostComponent; +import com.juick.JuickComponent; +import com.juick.JuickNotificator; +import com.juick.PushComponent; import com.juick.server.UserQueries; import com.juick.xmpp.JID; import com.juick.xmpp.Stream; import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.s2s.S2SComponent; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DriverManagerDataSource; import ru.sape.Sape; @@ -32,12 +37,20 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; -import java.net.Socket; +import java.net.InetSocketAddress; import java.net.URLEncoder; -import java.sql.Connection; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.Channels; +import java.nio.channels.CompletionHandler; +import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * @@ -45,8 +58,8 @@ import java.util.Properties; */ @WebServlet(name = "Main", urlPatterns = {"/"}) @MultipartConfig(fileSizeThreshold = 1024 * 1024, maxRequestSize = 1024 * 1024 * 10) -public class Main extends HttpServlet implements Stream.StreamListener { - +public class Main extends HttpServlet implements JuickNotificator, Stream.StreamListener { + static ExecutorService executorService; JdbcTemplate sql; JdbcTemplate sqlSearch; Stream xmpp; @@ -64,6 +77,7 @@ public class Main extends HttpServlet implements Stream.StreamListener { SignUp signup = new SignUp(); Settings settings = new Settings(); RSS rss = new RSS(); + static List<JuickComponent> components = new ArrayList<>(); @Override public void init() throws ServletException { @@ -73,6 +87,9 @@ public class Main extends HttpServlet implements Stream.StreamListener { Properties conf = new Properties(); conf.load(getServletContext().getResourceAsStream("WEB-INF/juick.conf")); + executorService = Executors.newWorkStealingPool(); + getServletContext().setAttribute("es", executorService); + DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName(conf.getProperty("datasource_driver", "com.mysql.jdbc.Driver")); dataSource.setUrl(conf.getProperty("datasource_url")); @@ -87,24 +104,43 @@ public class Main extends HttpServlet implements Stream.StreamListener { twitterAuth = new TwitterAuth(conf.getProperty("twitter_consumer_key"), conf.getProperty("twitter_consumer_secret")); PageTemplates.sape = new Sape(conf.getProperty("sape_user"), "juick.com", 2000, 3600); + components.add(new S2SComponent(sql, executorService, conf)); + components.add(new CrosspostComponent(sql, conf)); + components.add(new PushComponent(sql, conf)); } catch (Exception e) { - log(null, e); + log("www failed", e); + } + } + + @Override + public void push(com.juick.xmpp.Message msg) { + for (JuickComponent c : components) { + c.messageReceived(msg); } } public void setupXmppComponent(final String password) { - Thread thr = new Thread(() -> { + executorService.submit(() -> { try { - Socket socket = new Socket("localhost", 5347); - xmpp = new StreamComponent(new JID("", "www.juick.com", ""), socket.getInputStream(), - socket.getOutputStream(), password); - xmpp.addListener(Main.this); - xmpp.startParsing(); + AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); + socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { + @Override + public void completed(Void result, AsynchronousSocketChannel attachment) { + xmpp = new StreamComponent(new JID("", "www.juick.com", ""), Channels.newInputStream(socket), + Channels.newOutputStream(socket), password); + xmpp.addListener(Main.this); + xmpp.startParsing(); + } + + @Override + public void failed(Throwable exc, AsynchronousSocketChannel attachment) { + log("www xmpp failed"); + } + }); } catch (IOException e) { log("xmpp exception", e); } }); - thr.start(); } @Override @@ -311,4 +347,30 @@ public class Main extends HttpServlet implements Stream.StreamListener { response.sendError(405); } } + + @Override + public void destroy() { + super.destroy(); + executorService.shutdown(); + // Now deregister JDBC drivers in this context's ClassLoader: + // Get the webapp's ClassLoader + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + // Loop through all drivers + Enumeration<Driver> drivers = DriverManager.getDrivers(); + while (drivers.hasMoreElements()) { + Driver driver = drivers.nextElement(); + if (driver.getClass().getClassLoader() == cl) { + // This driver was registered by the webapp's ClassLoader, so deregister it: + try { + log(String.format("Deregistering JDBC driver %s", driver.toString())); + DriverManager.deregisterDriver(driver); + } catch (SQLException ex) { + log(String.format("Error deregistering JDBC driver %s", driver), ex); + } + } else { + // driver was not registered by the webapp's ClassLoader and may be in use elsewhere + log(String.format("Not deregistering JDBC driver %s as it does not belong to this webapp's ClassLoader", driver)); + } + } + } } diff --git a/src/main/java/com/juick/http/www/NewMessage.java b/src/main/java/com/juick/http/www/NewMessage.java index 4d4ffde6..b3b63b17 100644 --- a/src/main/java/com/juick/http/www/NewMessage.java +++ b/src/main/java/com/juick/http/www/NewMessage.java @@ -31,6 +31,7 @@ import com.juick.xmpp.extensions.JuickUser; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.namedparam.MapSqlParameterSource; import java.io.IOException; import java.io.PrintWriter; @@ -214,33 +215,32 @@ public class NewMessage { xoob.URL = attachmentURL; xmsg.addChild(xoob); } - if (xmpp != null) { - String tagsStr2 = ""; - for (String tag : tagsArr) { - tagsStr2 += " *" + tag; - } - xmsg.body = "@" + jmsg.getUser().getUName() + ":" + tagsStr2 + "\n" + body + "\n\n#" + mid + " http://juick.com/" + mid; - - xmsg.to = new JID("juick", "s2s.juick.com", null); - xmpp.send(xmsg); + String tagsStr2 = ""; + for (String tag : tagsArr) { + tagsStr2 += " *" + tag; + } + xmsg.body = "@" + jmsg.getUser().getUName() + ":" + tagsStr2 + "\n" + body + "\n\n#" + mid + " http://juick.com/" + mid; - xmsg.to.Host = "ws.juick.com"; - xmpp.send(xmsg); + xmsg.to = new JID("juick", "s2s.juick.com", null); + Main.push(xmsg); - xmsg.to.Host = "push.juick.com"; - xmpp.send(xmsg); + xmsg.to.Host = "push.juick.com"; + Main.push(xmsg); - xmsg.to.Host = "crosspost.juick.com"; - xmsg.to.Username = "twitter"; - xmpp.send(xmsg); - xmsg.to.Username = "fb"; - xmpp.send(xmsg); + xmsg.to.Host = "crosspost.juick.com"; + xmsg.to.Username = "twitter"; + Main.push(xmsg); + xmsg.to.Username = "fb"; + Main.push(xmsg); - xmsg.to.Host = "nologin.ru"; - xmsg.to.Username = "jubo"; + xmsg.to.Host = "nologin.ru"; + xmsg.to.Username = "jubo"; + Main.push(xmsg); + if (xmpp != null) { + xmsg.to.Host = "ws.juick.com"; xmpp.send(xmsg); } else { - logger.warning("XMPP is not available, users will not be notified"); + logger.warning("XMPP is not available, websockets will not be notified"); } response.setContentType("text/html; charset=UTF-8"); @@ -375,19 +375,19 @@ public class NewMessage { xmsg.addChild(xoob); } - if (xmpp != null) { - xmsg.body = "Reply by @" + jmsg.getUser().getUName() + ":\n>" + quote + "\n" + body + "\n\n#" + mid + "/" + ridnew + " http://juick.com/" + mid + "#" + ridnew; - xmsg.to = new JID("juick", "s2s.juick.com", null); - xmpp.send(xmsg); + xmsg.body = "Reply by @" + jmsg.getUser().getUName() + ":\n>" + quote + "\n" + body + "\n\n#" + mid + "/" + ridnew + " http://juick.com/" + mid + "#" + ridnew; - xmsg.to.Host = "ws.juick.com"; - xmpp.send(xmsg); + xmsg.to = new JID("juick", "s2s.juick.com", null); + Main.push(xmsg); + xmsg.to.Host = "push.juick.com"; + Main.push(xmsg); - xmsg.to.Host = "push.juick.com"; + if (xmpp != null) { + xmsg.to.Host = "ws.juick.com"; xmpp.send(xmsg); } else { - logger.warning("XMPP is not available, users will not be notified"); + logger.warning("XMPP is not available, websockets will not be notified"); } Utils.sendTemporaryRedirect(response, "/" + msg.getUser().getUName() + "/" + mid + "#" + ridnew); @@ -413,13 +413,12 @@ public class NewMessage { if (res) { Message xmsg = new Message(); - xmsg.from = new JID("juick", "juick.com", null); xmsg.to = new JID("recomm", "s2s.juick.com", null); JuickMessage jmsg = new JuickMessage(); jmsg.setMID(mid); jmsg.setUser(new JuickUser(visitor)); xmsg.addChild(jmsg); - xmpp.send(xmsg); + Main.push(xmsg); Utils.replyJSON(request, response, "{\"status\":\"ok\"}"); } else { diff --git a/src/main/java/com/juick/http/www/PM.java b/src/main/java/com/juick/http/www/PM.java index 1ee0b112..a6dfcec2 100644 --- a/src/main/java/com/juick/http/www/PM.java +++ b/src/main/java/com/juick/http/www/PM.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.text.SimpleDateFormat; import java.util.List; +import java.util.logging.Logger; /** * @@ -39,6 +40,8 @@ import java.util.List; */ public class PM { + private static final Logger logger = Logger.getLogger(PM.class.getName()); + private static final SimpleDateFormat sdfSQL = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); protected void doGetInbox(JdbcTemplate sql, HttpServletRequest request, HttpServletResponse response, com.juick.User visitor) throws ServletException, IOException { @@ -194,31 +197,35 @@ public class PM { } if (PMQueries.createPM(sql, visitor.getUID(), uid, body)) { - Message msg = new Message(); - msg.from = new JID("juick", "juick.com", null); - msg.to = new JID(Integer.toString(uid), "push.juick.com", null); - JuickMessage jmsg = new JuickMessage(); - jmsg.setUser(UserQueries.getUserByUID(sql, visitor.getUID())); - jmsg.setText(body); - msg.childs.add(jmsg); - xmpp.send(msg); - - msg.to.Host = "ws.juick.com"; - xmpp.send(msg); - - String jid = UserQueries.getJIDbyUID(sql, uid); - if (jid != null) { - Message mm = new Message(); - mm.to = new JID(jid); - mm.type = Message.Type.chat; - if (PMQueries.havePMinRoster(sql, visitor.getUID(), jid)) { - mm.from = new JID(jmsg.getUser().getUName(), "juick.com", "Juick"); - mm.body = body; - } else { - mm.from = new JID("juick", "juick.com", "Juick"); - mm.body = "Private message from @" + jmsg.getUser().getUName() + ":\n" + body; + if (xmpp != null) { + Message msg = new Message(); + msg.from = new JID("juick", "juick.com", null); + msg.to = new JID(Integer.toString(uid), "push.juick.com", null); + JuickMessage jmsg = new JuickMessage(); + jmsg.setUser(UserQueries.getUserByUID(sql, visitor.getUID())); + jmsg.setText(body); + msg.childs.add(jmsg); + xmpp.send(msg); + + msg.to.Host = "ws.juick.com"; + xmpp.send(msg); + + String jid = UserQueries.getJIDbyUID(sql, uid); + if (jid != null) { + Message mm = new Message(); + mm.to = new JID(jid); + mm.type = Message.Type.chat; + if (PMQueries.havePMinRoster(sql, visitor.getUID(), jid)) { + mm.from = new JID(jmsg.getUser().getUName(), "juick.com", "Juick"); + mm.body = body; + } else { + mm.from = new JID("juick", "juick.com", "Juick"); + mm.body = "Private message from @" + jmsg.getUser().getUName() + ":\n" + body; + } + xmpp.send(mm); } - xmpp.send(mm); + } else { + logger.warning("XMPP is not available"); } Utils.sendTemporaryRedirect(response, "/pm/sent"); diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/JuickBot.java index d82ed939..6104d19d 100644 --- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java +++ b/src/main/java/com/juick/xmpp/JuickBot.java @@ -1,14 +1,14 @@ -package com.juick.xmpp.s2s; +package com.juick.xmpp; +import com.juick.JuickNotificator; import com.juick.User; import com.juick.server.PMQueries; import com.juick.server.TagQueries; import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; import com.juick.xmpp.extensions.Error; import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.s2s.S2SComponent; +import org.springframework.jdbc.core.JdbcTemplate; import java.util.List; import java.util.regex.Matcher; @@ -20,6 +20,16 @@ import java.util.regex.Pattern; */ public class JuickBot { + JdbcTemplate sql; + Stream xmpp; + JuickNotificator notificator; + + public JuickBot(JuickNotificator notificator, JdbcTemplate sql, Stream xmpp) { + this.notificator = notificator; + this.sql = sql; + this.xmpp = xmpp; + } + public static final JID JuickJID = new JID("juick", "juick.com", "Juick"); private static final String HELPTEXT = "@username text - Send private message\n" @@ -55,7 +65,7 @@ public class JuickBot { + "\n" + "Read more: http://juick.com/help/"; - public static boolean incomingPresence(Presence p) { + public boolean incomingPresence(Presence p) { final String username = p.to.Username.toLowerCase(); final boolean toJuick = username.equals("juick"); @@ -64,12 +74,12 @@ public class JuickBot { reply.from = new JID(p.to.Username, p.to.Host, null); reply.to = new JID(p.from.Username, p.from.Host, null); reply.type = Presence.Type.unsubscribe; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } else if (p.type.equals(Presence.Type.probe)) { int uid_to = 0; if (!toJuick) { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + uid_to = UserQueries.getUIDbyName(sql, username); } if (toJuick || uid_to > 0) { @@ -78,12 +88,12 @@ public class JuickBot { reply.from.Resource = "Juick"; reply.to = p.from; reply.priority = 10; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } else { Presence reply = new Presence(p.to, p.from, Presence.Type.error); reply.id = p.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } return true; @@ -92,46 +102,46 @@ public class JuickBot { if (toJuick) { canSubscribe = true; } else { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to > 0) { - PMQueries.addPMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); + PMQueries.addPMinRoster(sql, uid_to, p.from.Bare()); canSubscribe = true; } } if (canSubscribe) { Presence reply = new Presence(p.to, p.from, Presence.Type.subscribed); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); reply.from.Resource = "Juick"; reply.priority = 10; reply.type = null; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } else { Presence reply = new Presence(p.to, p.from, Presence.Type.error); reply.id = p.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } } else if (p.type.equals(Presence.Type.unsubscribe)) { if (!toJuick) { - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to > 0) { - PMQueries.removePMinRoster(XMPPComponent.sql, uid_to, p.from.Bare()); + PMQueries.removePMinRoster(sql, uid_to, p.from.Bare()); } } Presence reply = new Presence(p.to, p.from, Presence.Type.unsubscribed); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } return false; } - public static boolean incomingMessage(Message msg) { + public boolean incomingMessage(Message msg) { if (msg.body == null || msg.body.isEmpty()) { return true; } @@ -140,9 +150,9 @@ public class JuickBot { User user_from = null; String signuphash = ""; - user_from = UserQueries.getUserByJID(XMPPComponent.sql, msg.from.Bare()); + user_from = UserQueries.getUserByJID(sql, msg.from.Bare()); if (user_from == null) { - signuphash = UserQueries.getSignUpHashByJID(XMPPComponent.sql, msg.from.Bare()); + signuphash = UserQueries.getSignUpHashByJID(sql, msg.from.Bare()); } if (user_from == null) { @@ -152,7 +162,7 @@ public class JuickBot { } else { reply.body = "Внимание, системное сообщение!\nВаш JabberID не обнаружен в списке доверенных. Для того, чтобы отправить сообщение пользователю " + username + "@juick.com, пожалуйста зарегистрируйте свой JabberID в системе: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nЕсли у вас уже есть учетная запись на Juick, вы сможете присоединить этот JabberID к ней.\n\nWarning, system message!\nYour JabberID is not found in our server's white list. To send a message to " + username + "@juick.com, please sign up: http://juick.com/signup?type=xmpp&hash=" + signuphash + "\nIf you already have an account on Juick, you will be proposed to attach this JabberID to your existing account."; } - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } @@ -160,41 +170,40 @@ public class JuickBot { return incomingMessageJuick(user_from, msg); } - int uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, username); + int uid_to = UserQueries.getUIDbyName(sql, username); if (uid_to == 0) { Message reply = new Message(msg.to, msg.from, Message.Type.error); reply.id = msg.id; reply.addChild(new Error(Error.Type.cancel, "item-not-found")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); return true; } boolean success = false; - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - success = PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, msg.body); + if (!UserQueries.isInBLAny(sql, uid_to, user_from.getUID())) { + success = PMQueries.createPM(sql, user_from.getUID(), uid_to, msg.body); } if (success) { Message m = new Message(); - m.from = new JID("juick", "juick.com", null); m.to = new JID(Integer.toString(uid_to), "push.juick.com", null); JuickMessage jmsg = new JuickMessage(); - jmsg.setUser(UserQueries.getUserByUID(XMPPComponent.sql, user_from.getUID())); + jmsg.setUser(UserQueries.getUserByUID(sql, user_from.getUID())); jmsg.setText(msg.body); m.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(m.toString()); + notificator.push(m); m.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(m.toString()); + xmpp.send(m); String jid; boolean inroster = false; - jid = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); + jid = UserQueries.getJIDbyUID(sql, uid_to); if (jid != null) { - inroster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid); + inroster = PMQueries.havePMinRoster(sql, user_from.getUID(), jid); } if (jid != null) { @@ -208,21 +217,21 @@ public class JuickBot { mm.from = new JID("juick", "juick.com", "Juick"); mm.body = "Private message from @" + jmsg.getUser().getUName() + ":\n" + msg.body; } - XMPPComponent.sendOut(mm); + S2SComponent.sendOut(mm); } } else { Message reply = new Message(msg.to, msg.from, Message.Type.error); reply.id = msg.id; reply.addChild(new Error(Error.Type.cancel, "not-allowed")); - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } return true; } private static Pattern regexPM = Pattern.compile("^\\@(\\S+)\\s+([\\s\\S]+)$"); - public static boolean incomingMessageJuick(User user_from, Message msg) { + public boolean incomingMessageJuick(User user_from, Message msg) { String command = msg.body.trim(); int commandlen = command.length(); @@ -262,26 +271,26 @@ public class JuickBot { private static void commandPing(Message m) { Presence p = new Presence(JuickJID, m.from); p.priority = 10; - XMPPComponent.sendOut(p); + S2SComponent.sendOut(p); Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = "PONG"; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } private static void commandHelp(Message m) { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = HELPTEXT; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } - private static void commandLogin(Message m, User user_from) { + private void commandLogin(Message m, User user_from) { Message reply = new Message(JuickJID, m.from, Message.Type.chat); - reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(XMPPComponent.sql, user_from.getUID()); - XMPPComponent.sendOut(reply); + reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(sql, user_from.getUID()); + S2SComponent.sendOut(reply); } - private static void commandPM(Message m, User user_from, String user_to, String body) { + private void commandPM(Message m, User user_from, String user_to, String body) { int ret = 0; int uid_to = 0; @@ -289,17 +298,17 @@ public class JuickBot { boolean haveInRoster = false; if (user_to.indexOf('@') > 0) { - uid_to = UserQueries.getUIDbyJID(XMPPComponent.sql, user_to); + uid_to = UserQueries.getUIDbyJID(sql, user_to); } else { - uid_to = UserQueries.getUIDbyName(XMPPComponent.sql, user_to); + uid_to = UserQueries.getUIDbyName(sql, user_to); } if (uid_to > 0) { - if (!UserQueries.isInBLAny(XMPPComponent.sql, uid_to, user_from.getUID())) { - if (PMQueries.createPM(XMPPComponent.sql, user_from.getUID(), uid_to, body)) { - jid_to = UserQueries.getJIDbyUID(XMPPComponent.sql, uid_to); + if (!UserQueries.isInBLAny(sql, uid_to, user_from.getUID())) { + if (PMQueries.createPM(sql, user_from.getUID(), uid_to, body)) { + jid_to = UserQueries.getJIDbyUID(sql, uid_to); if (jid_to != null) { - haveInRoster = PMQueries.havePMinRoster(XMPPComponent.sql, user_from.getUID(), jid_to); + haveInRoster = PMQueries.havePMinRoster(sql, user_from.getUID(), jid_to); } ret = 200; } else { @@ -321,10 +330,10 @@ public class JuickBot { jmsg.setUser(user_from); jmsg.setText(body); msg.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(msg.toString()); + notificator.push(msg); msg.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(msg.toString()); + xmpp.send(msg); if (jid_to != null) { Message mm = new Message(); @@ -337,7 +346,7 @@ public class JuickBot { mm.from = new JID("juick", "juick.com", "Juick"); mm.body = "Private message from @" + user_from.getUName() + ":\n" + body; } - XMPPComponent.sendOut(mm); + S2SComponent.sendOut(mm); } } @@ -349,15 +358,15 @@ public class JuickBot { reply.type = Message.Type.error; reply.body = "Error " + ret; } - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } - private static void commandBLShow(Message m, User user_from) { + private void commandBLShow(Message m, User user_from) { List<User> blusers; List<String> bltags; - blusers = UserQueries.getUserBLUsers(XMPPComponent.sql, user_from.getUID()); - bltags = TagQueries.getUserBLTags(XMPPComponent.sql, user_from.getUID()); + blusers = UserQueries.getUserBLUsers(sql, user_from.getUID()); + bltags = TagQueries.getUserBLTags(sql, user_from.getUID()); String txt = ""; @@ -381,6 +390,6 @@ public class JuickBot { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = txt; - XMPPComponent.sendOut(reply); + S2SComponent.sendOut(reply); } } diff --git a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java index 48771580..12ac7449 100644 --- a/src/main/java/com/juick/xmpp/s2s/CleaningUp.java +++ b/src/main/java/com/juick/xmpp/s2s/CleaningUp.java @@ -15,14 +15,14 @@ public class CleaningUp implements Runnable { public void run() { while (true) { try { - PrintWriter statsFile = new PrintWriter(XMPPComponent.STATSFILE, "UTF-8"); + PrintWriter statsFile = new PrintWriter(S2SComponent.STATSFILE, "UTF-8"); statsFile.write("<html><body><h2>Threads: " + Thread.activeCount() + "</h2>"); - statsFile.write("<h2>Out (" + XMPPComponent.outConnections.size() + ")</h2><table border=1><tr><th>to</th><th>sid</th><th>inactive</th><th>out packets</th><th>out bytes</th></tr>"); + statsFile.write("<h2>Out (" + S2SComponent.outConnections.size() + ")</h2><table border=1><tr><th>to</th><th>sid</th><th>inactive</th><th>out packets</th><th>out bytes</th></tr>"); long now = System.currentTimeMillis(); - synchronized (XMPPComponent.outConnections) { - for (Iterator<ConnectionOut> i = XMPPComponent.outConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.outConnections) { + for (Iterator<ConnectionOut> i = S2SComponent.outConnections.iterator(); i.hasNext();) { ConnectionOut c = i.next(); int inactive = (int) ((double) (now - c.tsLocalData) / 1000.0); if (inactive > 900) { @@ -40,10 +40,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("</table><h2>In (" + XMPPComponent.inConnections.size() + ")</h2><table border=1><tr><th>from</th><th>sid</th><th>inactive</th><th>in packets</th></tr>"); + statsFile.write("</table><h2>In (" + S2SComponent.inConnections.size() + ")</h2><table border=1><tr><th>from</th><th>sid</th><th>inactive</th><th>in packets</th></tr>"); - synchronized (XMPPComponent.inConnections) { - for (Iterator<ConnectionIn> i = XMPPComponent.inConnections.iterator(); i.hasNext();) { + synchronized (S2SComponent.inConnections) { + for (Iterator<ConnectionIn> i = S2SComponent.inConnections.iterator(); i.hasNext();) { ConnectionIn c = i.next(); int inactive = (int) ((double) (now - c.tsRemoteData) / 1000.0); if (inactive > 900) { @@ -73,10 +73,10 @@ public class CleaningUp implements Runnable { } } - statsFile.write("</table><h2>Cache (" + XMPPComponent.outCache.size() + ")</h2><table border=1><tr><th>host</th><th>live</th><th>size</th></tr>"); + statsFile.write("</table><h2>Cache (" + S2SComponent.outCache.size() + ")</h2><table border=1><tr><th>host</th><th>live</th><th>size</th></tr>"); - synchronized (XMPPComponent.outCache) { - for (Iterator<CacheEntry> i = XMPPComponent.outCache.iterator(); i.hasNext();) { + synchronized (S2SComponent.outCache) { + for (Iterator<CacheEntry> i = S2SComponent.outCache.iterator(); i.hasNext();) { CacheEntry c = i.next(); int inactive = (int) ((double) (now - c.tsCreated) / 1000.0); if (inactive > 600) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index c215e375..a8572ddc 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -1,9 +1,6 @@ package com.juick.xmpp.s2s; -import com.juick.xmpp.Iq; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Presence; +import com.juick.xmpp.*; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; @@ -31,9 +28,14 @@ public class ConnectionIn extends Connection { public long tsRemoteData = 0; public long packetsRemote = 0; - public ConnectionIn(AsynchronousSocketChannel socket) { + JuickBot bot; + Stream xmpp; + + public ConnectionIn(AsynchronousSocketChannel socket, JuickBot bot, Stream xmpp) { super(); this.socket = socket; + this.bot = bot; + this.xmpp = xmpp; streamID = UUID.randomUUID().toString(); } @@ -57,7 +59,7 @@ public class ConnectionIn extends Connection { String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; + S2SComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; if (xmppversionnew) { openStream += "<stream:features></stream:features>"; } @@ -77,14 +79,14 @@ public class ConnectionIn extends Connection { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); LOGGER.info("STREAM FROM " + dfrom + " TO " + to + " " + streamID + " ASKING FOR DIALBACK"); - if (dfrom.endsWith(XMPPComponent.HOSTNAME) && (dfrom.equals(XMPPComponent.HOSTNAME) || dfrom.endsWith("." + XMPPComponent.HOSTNAME))) { + if (dfrom.endsWith(S2SComponent.HOSTNAME) && (dfrom.equals(S2SComponent.HOSTNAME) || dfrom.endsWith("." + S2SComponent.HOSTNAME))) { break; } - if (dfrom != null && to != null && to.equals(XMPPComponent.HOSTNAME)) { + if (dfrom != null && to != null && to.equals(S2SComponent.HOSTNAME)) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - ConnectionOut c = XMPPComponent.getConnectionOut(dfrom, false); + ConnectionOut c = S2SComponent.getConnectionOut(dfrom, false); if (c != null) { c.sendDialbackVerify(streamID, dbKey); } else { @@ -115,15 +117,15 @@ public class ConnectionIn extends Connection { } else if (tag.equals("presence") && checkFromTo(parser)) { Presence p = Presence.parse(parser, null); if (p != null && (p.type == null || !p.type.equals(Presence.Type.error))) { - JuickBot.incomingPresence(p); + bot.incomingPresence(p); } } else if (tag.equals("message") && checkFromTo(parser)) { updateTsRemoteData(); - Message msg = Message.parse(parser, XMPPComponent.childParsers); + Message msg = Message.parse(parser, S2SComponent.childParsers); if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { LOGGER.info("STREAM " + streamID + ": " + msg.toString()); - if (!JuickBot.incomingMessage(msg)) { - XMPPComponent.connRouter.sendStanza(msg.toString()); + if (!bot.incomingMessage(msg)) { + xmpp.send(msg); } } } else if (tag.equals("iq") && checkFromTo(parser)) { @@ -132,22 +134,22 @@ public class ConnectionIn extends Connection { String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { LOGGER.info("STREAM " + streamID + ": " + xml); - XMPPComponent.connRouter.sendStanza(xml); + xmpp.send(xml); } } else { LOGGER.info("STREAM " + streamID + ": " + XmlUtils.parseToString(parser, true)); } } LOGGER.warning("STREAM " + streamID + " FINISHED"); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (EOFException ex) { LOGGER.info(String.format("STREAM %s CLOSED (dirty)", streamID)); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.WARNING, "STREAM " + streamID + " ERROR", e); - XMPPComponent.removeConnectionIn(this); + S2SComponent.removeConnectionIn(this); closeConnection(); } } @@ -158,7 +160,7 @@ public class ConnectionIn extends Connection { public void sendDialbackResult(String sfrom, String type) { try { - sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); + sendStanza("<db:result from='" + S2SComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); if (type.equals("valid")) { from.add(sfrom); LOGGER.info("STREAM FROM " + sfrom + " " + streamID + " READY"); @@ -173,7 +175,7 @@ public class ConnectionIn extends Connection { String cto = parser.getAttributeValue(null, "to"); if (cfrom != null && cto != null && !cfrom.isEmpty() && !cto.isEmpty()) { JID jidto = new JID(cto); - if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(XMPPComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { + if (jidto.Host != null && jidto.Username != null && jidto.Host.equals(S2SComponent.HOSTNAME) && jidto.Username.matches("^[a-zA-Z0-9\\-]{2,16}$")) { JID jidfrom = new JID(cfrom); int size = from.size(); for (int i = 0; i < size; i++) { diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java index 02a2be39..28ff48f9 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java @@ -1,11 +1,13 @@ package com.juick.xmpp.s2s; +import com.juick.xmpp.JuickBot; +import com.juick.xmpp.Stream; + import java.net.InetSocketAddress; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -17,6 +19,16 @@ public class ConnectionListener implements Runnable { private static final Logger logger = Logger.getLogger(ConnectionListener.class.getName()); + ExecutorService executorService; + JuickBot bot; + Stream xmpp; + + public ConnectionListener(ExecutorService executorService, JuickBot bot, Stream xmpp) { + this.executorService = executorService; + this.bot = bot; + this.xmpp = xmpp; + } + @Override public void run() { try { @@ -26,9 +38,9 @@ public class ConnectionListener implements Runnable { listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { @Override public void completed(AsynchronousSocketChannel result, Object attachment) { - listener.accept(XMPPComponent.executorService, this); - ConnectionIn client = new ConnectionIn(result); - XMPPComponent.addConnectionIn(client); + listener.accept(executorService, this); + ConnectionIn client = new ConnectionIn(result, bot, xmpp); + S2SComponent.addConnectionIn(client); client.parseStream(); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 59fdfb60..d4018ae1 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -7,14 +7,12 @@ import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.InetSocketAddress; -import java.net.Socket; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.util.logging.Level; import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; /** * @@ -55,7 +53,7 @@ public class ConnectionOut extends Connection { sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); + S2SComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); @@ -64,13 +62,13 @@ public class ConnectionOut extends Connection { } LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); - XMPPComponent.addConnectionOut(ConnectionOut.this); + S2SComponent.addConnectionOut(ConnectionOut.this); if (checkSID != null) { sendDialbackVerify(checkSID, dbKey); } - sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>"); + sendStanza("<db:result from='" + S2SComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, S2SComponent.HOSTNAME, streamID) + "</db:result>"); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { @@ -85,7 +83,7 @@ public class ConnectionOut extends Connection { streamReady = true; LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); - String cache = XMPPComponent.getFromCache(to); + String cache = S2SComponent.getFromCache(to); if (cache != null) { LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); sendStanza(cache); @@ -100,7 +98,7 @@ public class ConnectionOut extends Connection { String type = parser.getAttributeValue(null, "type"); String sid = parser.getAttributeValue(null, "id"); if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - ConnectionIn c = XMPPComponent.getConnectionIn(sid); + ConnectionIn c = S2SComponent.getConnectionIn(sid); if (c != null) { c.sendDialbackResult(from, type); } @@ -112,15 +110,15 @@ public class ConnectionOut extends Connection { } LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (EOFException eofex) { LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID)); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (Exception e) { LOGGER.log(Level.SEVERE, "s2s out exception", e); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } } @@ -128,20 +126,20 @@ public class ConnectionOut extends Connection { @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { LOGGER.log(Level.WARNING, "s2s out failed", exc); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + S2SComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } }); } catch (Exception e) { LOGGER.warning(e.toString()); - XMPPComponent.removeConnectionOut(this); + S2SComponent.removeConnectionOut(this); closeConnection(); } } public void sendDialbackVerify(String sid, String key) { try { - sendStanza("<db:verify from='" + XMPPComponent.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" + key + "</db:verify>"); + sendStanza("<db:verify from='" + S2SComponent.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" + key + "</db:verify>"); } catch (IOException e) { LOGGER.log(Level.WARNING, "STREAM TO " + to + " " + streamID + " ERROR", e); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java deleted file mode 100644 index c96a5cce..00000000 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ /dev/null @@ -1,283 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.User; -import com.juick.server.MessagesQueries; -import com.juick.server.SubscriptionsQueries; -import com.juick.server.UserQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.extensions.JuickMessage; -import com.juick.xmpp.extensions.Nickname; -import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.SHA1; -import com.juick.xmpp.utils.XmlUtils; -import org.springframework.jdbc.core.JdbcTemplate; -import org.xmlpull.v1.XmlPullParser; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; -import java.nio.channels.CompletionHandler; -import java.util.List; -import java.util.logging.Level; - -/** - * - * @author ugnich - */ -public class ConnectionRouter extends Connection implements Runnable { - - private String componentName; - - ConnectionRouter(String componentName) { - this.componentName = componentName; - } - - @Override - public void run() { - LOGGER.info("STREAM ROUTER START"); - - try { - socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { - @Override - public void completed(Void result, AsynchronousSocketChannel client) { - try { - parser.setInput(new InputStreamReader(Channels.newInputStream(client))); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(client)); - - String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } - - msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>"; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); - } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); - } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); - } - } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); - } - } - - @Override - public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); - } - }); - Thread.currentThread().join(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "NIO2 error", e); - } - - } - - @Override - synchronized public void sendStanza(String xml) { - try { - writer.write(xml); - writer.flush(); - } catch (IOException e) { - LOGGER.warning("STREAM ROUTER ERROR: " + xml); - LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); - System.exit(0); - } - } - - public void sendJuickMessage(JuickMessage jmsg) { - List<User> users; - - - if (jmsg.FriendsOnly) { - users = SubscriptionsQueries.getUsersSubscribedToUser(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); - } else { - users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - } - - - String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - public void sendJuickComment(JuickMessage jmsg) { - String replyQuote; - - List<User> users = SubscriptionsQueries.getSubscribedUsers(XMPPComponent.sql, jmsg.getUser().getUID(), jmsg.getMID()); - replyQuote = getReplyQuote(XMPPComponent.sql, jmsg.getMID(), jmsg.ReplyTo); - - String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.addChild(jmsg); - for (User user : users) { - // TODO: make single query - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } - - private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { - String quote = ""; - if (ReplyTo > 0) { - com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); - if (q != null) { - quote = q.getText(); - } - } else { - com.juick.Message q = MessagesQueries.getMessage(sql, MID); - if (q != null) { - quote = q.getText(); - } - } - if (quote.length() > 50) { - quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; - } else if (quote.length() > 0) { - quote = ">" + quote.replace('\n', ' ') + "\n"; - } - return quote; - } - - public void sendJuickRecommendation(JuickMessage recomm) { - JuickMessage jmsg; - jmsg = new JuickMessage(MessagesQueries.getMessage(XMPPComponent.sql, recomm.getMID())); - List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(XMPPComponent.sql, - recomm.getMID(), jmsg.getUser().getUID()); - - String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; - txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; - String attachment = jmsg.getAttachmentURL(); - if (attachment != null) { - txt += attachment + "\n"; - } - txt += jmsg.getText() + "\n\n"; - txt += "#" + jmsg.getMID(); - if (jmsg.Replies > 0) { - if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { - txt += " (" + jmsg.Replies + " reply)"; - } else { - txt += " (" + jmsg.Replies + " replies)"; - } - } - txt += " http://juick.com/" + jmsg.getMID(); - - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getUName(); - - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = JuickBot.JuickJID; - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMID(); - msg.addChild(jmsg); - msg.addChild(nick); - if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); - } - for (User user : users) { - for (String jid : UserQueries.getActiveJIDs(XMPPComponent.sql, user)) { - msg.to = new JID(jid); - XMPPComponent.sendOut(msg); - } - } - } -} diff --git a/src/main/java/com/juick/xmpp/s2s/S2SComponent.java b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java new file mode 100644 index 00000000..dcb547fb --- /dev/null +++ b/src/main/java/com/juick/xmpp/s2s/S2SComponent.java @@ -0,0 +1,316 @@ +package com.juick.xmpp.s2s; + +import com.juick.JuickComponent; +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.SubscriptionsQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.Nickname; +import com.juick.xmpp.extensions.XOOB; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.logging.Logger; + +/** + * + * @author ugnich + */ +public class S2SComponent implements JuickComponent { + + private static final Logger LOGGER = Logger.getLogger(S2SComponent.class.getName()); + + ExecutorService executorService; + + public static String HOSTNAME = null; + public static String componentName = null; + public static String STATSFILE = null; + static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); + static final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>()); + JdbcTemplate sql; + final public static HashMap<String, StanzaChild> childParsers = new HashMap<>(); + + public static void addConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.add(c); + } + } + + public static void addConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.add(c); + } + } + + public static void removeConnectionIn(ConnectionIn c) { + synchronized (inConnections) { + inConnections.remove(c); + } + } + + public static void removeConnectionOut(ConnectionOut c) { + synchronized (outConnections) { + outConnections.remove(c); + } + } + + public static String getFromCache(String hostname) { + CacheEntry ret = null; + synchronized (outCache) { + for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) { + CacheEntry c = i.next(); + if (c.hostname != null && c.hostname.equals(hostname)) { + ret = c; + i.remove(); + break; + } + } + } + return (ret != null) ? ret.xml : null; + } + + public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { + return c; + } + } + } + return null; + } + + public static ConnectionIn getConnectionIn(String streamID) { + synchronized (inConnections) { + for (ConnectionIn c : inConnections) { + if (c.streamID != null && c.streamID.equals(streamID)) { + return c; + } + } + } + return null; + } + + public void sendOut(Stanza s) { + sendOut(s.to.Host, s.toString()); + } + + public void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + ConnectionOut connOut = null; + synchronized (outConnections) { + for (ConnectionOut c : outConnections) { + if (c.to != null && c.to.equals(hostname)) { + if (c.streamReady) { + connOut = c; + break; + } else { + haveAnyConn = true; + break; + } + } + } + } + if (connOut != null) { + try { + connOut.sendStanza(xml); + } catch (IOException e) { + LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); + } + return; + } + + boolean haveCache = false; + synchronized (outCache) { + for (CacheEntry c : outCache) { + if (c.hostname != null && c.hostname.equals(hostname)) { + c.xml += xml; + c.tsUpdated = System.currentTimeMillis(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(hostname, xml)); + } + } + + if (!haveAnyConn) { + ConnectionOut connectionOut = new ConnectionOut(hostname); + connectionOut.parseStream(); + } + } + + public S2SComponent(JdbcTemplate sql, ExecutorService executorService, Properties conf) { + LOGGER.info("component initialized"); + HOSTNAME = conf.getProperty("hostname"); + componentName = conf.getProperty("componentname"); + STATSFILE = conf.getProperty("statsfile"); + this.sql = sql; + this.executorService = executorService; + executorService.submit(new ConnectionListener(executorService)); + executorService.submit(new CleaningUp()); + } + @Override + public void messageReceived(Message xmsg) { + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } + } + + public void sendJuickMessage(JuickMessage jmsg) { + List<User> users; + + + if (jmsg.FriendsOnly) { + users = SubscriptionsQueries.getUsersSubscribedToUser(sql, jmsg.getUser().getUID(), jmsg.FriendsOnly); + } else { + users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + } + + + String txt = "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID() + " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + public void sendJuickComment(JuickMessage jmsg) { + String replyQuote; + + List<User> users = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()); + replyQuote = getReplyQuote(sql, jmsg.getMID(), jmsg.ReplyTo); + + String txt = "Reply by @" + jmsg.getUser().getUName() + ":\n" + replyQuote + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMID() + "/" + jmsg.getRID() + " http://juick.com/" + jmsg.getMID() + "#" + jmsg.getRID(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.addChild(jmsg); + for (User user : users) { + // TODO: make single query + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } + + private String getReplyQuote(JdbcTemplate sql, int MID, int ReplyTo) { + String quote = ""; + if (ReplyTo > 0) { + com.juick.Message q = MessagesQueries.getReply(sql, MID, ReplyTo); + if (q != null) { + quote = q.getText(); + } + } else { + com.juick.Message q = MessagesQueries.getMessage(sql, MID); + if (q != null) { + quote = q.getText(); + } + } + if (quote.length() > 50) { + quote = ">" + quote.substring(0, 47).replace('\n', ' ') + "...\n"; + } else if (quote.length() > 0) { + quote = ">" + quote.replace('\n', ' ') + "\n"; + } + return quote; + } + + public void sendJuickRecommendation(JuickMessage recomm) { + JuickMessage jmsg; + jmsg = new JuickMessage(MessagesQueries.getMessage(sql, recomm.getMID())); + List<User> users = SubscriptionsQueries.getUsersSubscribedToComments(sql, + recomm.getMID(), jmsg.getUser().getUID()); + + String txt = "Recommended by @" + recomm.getUser().getUName() + ":\n"; + txt += "@" + jmsg.getUser().getUName() + ":" + jmsg.getTagsString() + "\n"; + String attachment = jmsg.getAttachmentURL(); + if (attachment != null) { + txt += attachment + "\n"; + } + txt += jmsg.getText() + "\n\n"; + txt += "#" + jmsg.getMID(); + if (jmsg.Replies > 0) { + if (jmsg.Replies % 10 == 1 && jmsg.Replies % 100 != 11) { + txt += " (" + jmsg.Replies + " reply)"; + } else { + txt += " (" + jmsg.Replies + " replies)"; + } + } + txt += " http://juick.com/" + jmsg.getMID(); + + Nickname nick = new Nickname(); + nick.Nickname = "@" + jmsg.getUser().getUName(); + + com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); + msg.from = JuickBot.JuickJID; + msg.body = txt; + msg.type = Message.Type.chat; + msg.thread = "juick-" + jmsg.getMID(); + msg.addChild(jmsg); + msg.addChild(nick); + if (attachment != null) { + XOOB oob = new XOOB(); + oob.URL = attachment; + msg.addChild(oob); + } + for (User user : users) { + for (String jid : UserQueries.getActiveJIDs(sql, user)) { + msg.to = new JID(jid); + sendOut(msg); + } + } + } +} diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java deleted file mode 100644 index 1dfdd38d..00000000 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ /dev/null @@ -1,199 +0,0 @@ -package com.juick.xmpp.s2s; - -import com.juick.xmpp.Stanza; -import com.juick.xmpp.StanzaChild; -import com.juick.xmpp.extensions.JuickMessage; -import org.springframework.jdbc.core.JdbcTemplate; - -import javax.servlet.ServletContextEvent; -import javax.servlet.ServletContextListener; -import java.io.IOException; -import java.sql.Driver; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * - * @author ugnich - */ -public class XMPPComponent implements ServletContextListener { - - private static final Logger LOGGER = Logger.getLogger(XMPPComponent.class.getName()); - - public static final ExecutorService executorService = Executors.newWorkStealingPool(); - - public static String HOSTNAME = null; - public static String STATSFILE = null; - public static ConnectionRouter connRouter; - static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); - static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); - static final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>()); - static JdbcTemplate sql; - final public static HashMap<String, StanzaChild> childParsers = new HashMap<>(); - - public static void addConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.add(c); - } - } - - public static void addConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.add(c); - } - } - - public static void removeConnectionIn(ConnectionIn c) { - synchronized (inConnections) { - inConnections.remove(c); - } - } - - public static void removeConnectionOut(ConnectionOut c) { - synchronized (outConnections) { - outConnections.remove(c); - } - } - - public static String getFromCache(String hostname) { - CacheEntry ret = null; - synchronized (outCache) { - for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext();) { - CacheEntry c = i.next(); - if (c.hostname != null && c.hostname.equals(hostname)) { - ret = c; - i.remove(); - break; - } - } - } - return (ret != null) ? ret.xml : null; - } - - public static ConnectionOut getConnectionOut(String hostname, boolean needReady) { - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { - return c; - } - } - } - return null; - } - - public static ConnectionIn getConnectionIn(String streamID) { - synchronized (inConnections) { - for (ConnectionIn c : inConnections) { - if (c.streamID != null && c.streamID.equals(streamID)) { - return c; - } - } - } - return null; - } - - public static void sendOut(Stanza s) { - sendOut(s.to.Host, s.toString()); - } - - public static void sendOut(String hostname, String xml) { - boolean haveAnyConn = false; - - ConnectionOut connOut = null; - synchronized (outConnections) { - for (ConnectionOut c : outConnections) { - if (c.to != null && c.to.equals(hostname)) { - if (c.streamReady) { - connOut = c; - break; - } else { - haveAnyConn = true; - break; - } - } - } - } - if (connOut != null) { - try { - connOut.sendStanza(xml); - } catch (IOException e) { - LOGGER.warning("STREAM TO " + connOut.to + " " + connOut.streamID + " ERROR: " + e.toString()); - } - return; - } - - boolean haveCache = false; - synchronized (outCache) { - for (CacheEntry c : outCache) { - if (c.hostname != null && c.hostname.equals(hostname)) { - c.xml += xml; - c.tsUpdated = System.currentTimeMillis(); - haveCache = true; - break; - } - } - if (!haveCache) { - outCache.add(new CacheEntry(hostname, xml)); - } - } - - if (!haveAnyConn) { - ConnectionOut connectionOut = new ConnectionOut(hostname); - connectionOut.parseStream(); - } - } - - @Override - public void contextInitialized(ServletContextEvent sce) { - - LOGGER.info("component initialized"); - executorService.submit(() -> { - Properties conf = new Properties(); - try { - conf.load(sce.getServletContext().getResourceAsStream("WEB-INF/s2s.conf")); - HOSTNAME = conf.getProperty("hostname"); - String componentName = conf.getProperty("componentname"); - STATSFILE = conf.getProperty("statsfile"); - sql = (JdbcTemplate) sce.getServletContext().getAttribute("sql"); - - childParsers.put(JuickMessage.XMLNS, new JuickMessage()); - - connRouter = new ConnectionRouter(componentName); - executorService.submit(connRouter); - executorService.submit(new ConnectionListener()); - executorService.submit(new CleaningUp()); - } catch (IOException e) { - LOGGER.log(Level.SEVERE, "XMPPComponent error", e); - } - }); - } - - - - @Override - public void contextDestroyed(ServletContextEvent sce) { - synchronized (XMPPComponent.outConnections) { - for (Iterator<ConnectionOut> i = XMPPComponent.outConnections.iterator(); i.hasNext();) { - ConnectionOut c = i.next(); - c.closeConnection(); - i.remove(); - } - } - - synchronized (XMPPComponent.inConnections) { - for (Iterator<ConnectionIn> i = XMPPComponent.inConnections.iterator(); i.hasNext();) { - ConnectionIn c = i.next(); - c.closeConnection(); - i.remove(); - } - } - XMPPComponent.connRouter.closeConnection(); - executorService.shutdown(); - LOGGER.info("component destroyed"); - } -} diff --git a/src/main/webapp/WEB-INF/web.xml b/src/main/webapp/WEB-INF/web.xml index b4fbc03d..1b70543b 100644 --- a/src/main/webapp/WEB-INF/web.xml +++ b/src/main/webapp/WEB-INF/web.xml @@ -36,15 +36,18 @@ <description>APNS/GCM/MPNS module</description> <display-name>PushComponent</display-name> <listener-class>com.juick.PushComponent</listener-class> + <load-on-startup>2</load-on-startup> </listener> <listener> <description>Crossposting module</description> <display-name>CrosspostComponent</display-name> <listener-class>com.juick.CrosspostComponent</listener-class> + <load-on-startup>2</load-on-startup> </listener> <listener> <description>XMPP module</description> <display-name>XMPPComponent</display-name> - <listener-class>com.juick.xmpp.s2s.XMPPComponent</listener-class> + <listener-class>com.juick.xmpp.s2s.S2SComponent</listener-class> + <load-on-startup>2</load-on-startup> </listener> </web-app> |