diff options
Diffstat (limited to 'src/main/java/com/juick/xmpp/s2s')
6 files changed, 118 insertions, 126 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/Connection.java b/src/main/java/com/juick/xmpp/s2s/Connection.java index c3e983b5..c1bbf22e 100644 --- a/src/main/java/com/juick/xmpp/s2s/Connection.java +++ b/src/main/java/com/juick/xmpp/s2s/Connection.java @@ -12,8 +12,8 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.*; import java.net.Socket; -import java.security.KeyStore; -import java.security.SecureRandom; +import java.security.*; +import java.security.cert.CertificateException; import java.util.UUID; import java.util.logging.Logger; @@ -52,7 +52,7 @@ public class Connection { }; - public Connection() throws Exception { + public Connection() throws XmlPullParserException, KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException, KeyManagementException { tsCreated = System.currentTimeMillis(); parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); KeyStore ks = KeyStore.getInstance("JKS"); diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index 554d3b05..93e761a0 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -115,7 +115,7 @@ public class ConnectionIn extends Connection implements Runnable { if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { LOGGER.info("STREAM " + streamID + ": " + msg.toString()); if (!JuickBot.incomingMessage(msg)) { - XMPPComponent.connRouter.sendStanza(msg.toString()); + XMPPComponent.connRouter.router.send(msg.toString()); } } } else if (tag.equals("iq") && checkFromTo(parser)) { @@ -124,7 +124,7 @@ public class ConnectionIn extends Connection implements Runnable { String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { LOGGER.info("STREAM " + streamID + ": " + xml); - XMPPComponent.connRouter.sendStanza(xml); + XMPPComponent.connRouter.router.send(xml); } } else if (!isSecured() && tag.equals("starttls")) { LOGGER.info("STREAM " + streamID + " SECURING"); diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 68851da1..d3a10406 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -3,6 +3,7 @@ package com.juick.xmpp.s2s; import com.juick.xmpp.extensions.StreamFeatures; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSocket; @@ -11,6 +12,11 @@ import java.io.IOException; import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.util.logging.Level; /** @@ -23,7 +29,7 @@ public class ConnectionOut extends Connection implements Runnable { String checkSID = null; String dbKey = null; - public ConnectionOut(String hostname) throws Exception { + public ConnectionOut(String hostname) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException { super(); to = hostname; } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java index 61ae89e0..443bfa82 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java @@ -2,127 +2,49 @@ package com.juick.xmpp.s2s; import com.juick.server.MessagesQueries; import com.juick.server.SubscriptionsQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; +import com.juick.xmpp.*; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.XmlUtils; -import org.apache.commons.codec.digest.DigestUtils; -import org.xmlpull.v1.XmlPullParser; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.net.Socket; import java.util.List; import java.util.logging.Level; +import java.util.logging.Logger; /** * @author ugnich */ -public class ConnectionRouter extends Connection implements Runnable { +public class ConnectionRouter implements Stream.StreamListener, + Message.MessageListener, Iq.IqListener, Presence.PresenceListener { + private static final Logger logger = Logger.getLogger(ConnectionRouter.class.getName()); private String componentName; + Stream router; + Socket socket; - ConnectionRouter(String componentName) throws Exception { + ConnectionRouter(String componentName, String password) { this.componentName = componentName; - } - - @Override - public void run() { - LOGGER.info("STREAM ROUTER START"); - + logger.info("STREAM ROUTER START"); try { socket = new Socket("localhost", 5347); - parser.setInput(new InputStreamReader(socket.getInputStream())); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(socket.getOutputStream()); - - String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } - - msg = "<handshake>" + DigestUtils.sha1Hex(streamID + "secret") + "</handshake>"; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); - } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); - } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); - } - } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); - } - } - - @Override - synchronized public void sendStanza(String xml) { - try { - writer.write(xml); - writer.flush(); + router = new StreamComponent(new JID(componentName), socket.getInputStream(), socket.getOutputStream(), password); + router.addChildParser(new JuickMessage()); + router.addListener((Stream.StreamListener) this); + router.addListener((Message.MessageListener) this); + router.addListener((Iq.IqListener) this); + router.startParsing(); } catch (IOException e) { - LOGGER.warning("STREAM ROUTER ERROR: " + xml); - LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); - System.exit(0); + logger.log(Level.SEVERE, "router failed", e); } } + public void closeConnection() throws IOException { + router.logoff(); + socket.close(); + } - public void sendJuickMessage(JuickMessage jmsg) throws Exception { + public void sendJuickMessage(JuickMessage jmsg) { List<String> jids; synchronized (XMPPComponent.sqlSync) { @@ -157,16 +79,16 @@ public class ConnectionRouter extends Connection implements Runnable { msg.addChild(oob); } - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } - public void sendJuickComment(JuickMessage jmsg) throws Exception { + public void sendJuickComment(JuickMessage jmsg) { List<String> jids; String replyQuote; - String replyTo; + String replyTo; synchronized (XMPPComponent.sqlSync) { jids = SubscriptionsQueries.getJIDSubscribedToComments(XMPPComponent.sql, jmsg.getMID(), jmsg.getUser().getUID()); @@ -188,8 +110,8 @@ public class ConnectionRouter extends Connection implements Runnable { msg.body = txt; msg.type = Message.Type.chat; msg.addChild(jmsg); - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } @@ -204,7 +126,7 @@ public class ConnectionRouter extends Connection implements Runnable { return quote; } - public void sendJuickRecommendation(JuickMessage recomm) throws Exception { + public void sendJuickRecommendation(JuickMessage recomm) { List<String> jids; JuickMessage jmsg; synchronized (XMPPComponent.sqlSync) { @@ -246,9 +168,60 @@ public class ConnectionRouter extends Connection implements Runnable { msg.addChild(oob); } - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } + + @Override + public boolean onIq(Iq iq) { + JID jid = iq.to; + if (!jid.Host.equals(componentName)) { + logger.info("STREAM ROUTER (IQ): " + iq.toString()); + XMPPComponent.sendOut(iq); + } + return false; + } + + @Override + public void onMessage(Message xmsg) { + logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } else { + XMPPComponent.sendOut(xmsg); + } + } + + @Override + public void onPresence(Presence presence) { + JID jid = presence.to; + if (!jid.Host.equals(componentName)) { + logger.info("STREAM ROUTER (PRESENCE): " + presence.toString()); + XMPPComponent.sendOut(presence); + } + } + + @Override + public void onStreamReady() { + logger.info("STREAM ROUTER (READY)"); + } + + @Override + public void onStreamFail(Exception ex) { + logger.log(Level.SEVERE, "STREAM ROUTER (FAIL)", ex); + } } diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/s2s/JuickBot.java index 0b9cceaf..25c75dfe 100644 --- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java +++ b/src/main/java/com/juick/xmpp/s2s/JuickBot.java @@ -198,10 +198,10 @@ public class JuickBot { } jmsg.setText(msg.body); m.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(m.toString()); + XMPPComponent.connRouter.router.send(m.toString()); m.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(m.toString()); + XMPPComponent.connRouter.router.send(m.toString()); String jid; boolean inroster = false; @@ -337,10 +337,10 @@ public class JuickBot { jmsg.setUser(user_from); jmsg.setText(body); msg.childs.add(jmsg); - XMPPComponent.connRouter.sendStanza(msg.toString()); + XMPPComponent.connRouter.router.send(msg.toString()); msg.to.Host = "ws.juick.com"; - XMPPComponent.connRouter.sendStanza(msg.toString()); + XMPPComponent.connRouter.router.send(msg.toString()); if (jid_to != null) { Message mm = new Message(); diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java index 2b293fd6..13df5fd0 100644 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java @@ -3,10 +3,16 @@ package com.juick.xmpp.s2s; import com.juick.xmpp.Stanza; import com.juick.xmpp.StanzaChild; import com.juick.xmpp.extensions.JuickMessage; +import org.xmlpull.v1.XmlPullParserException; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import java.io.IOException; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.security.UnrecoverableKeyException; +import java.security.cert.CertificateException; import java.sql.Driver; import java.sql.DriverManager; import java.sql.SQLException; @@ -100,11 +106,11 @@ public class XMPPComponent implements ServletContextListener { return null; } - public static void sendOut(Stanza s) throws Exception { + public static void sendOut(Stanza s) { sendOut(s.to.Host, s.toString()); } - public static void sendOut(String hostname, String xml) throws Exception { + public static void sendOut(String hostname, String xml) { boolean haveAnyConn = false; ConnectionOut connOut = null; @@ -146,8 +152,13 @@ public class XMPPComponent implements ServletContextListener { } if (!haveAnyConn) { - ConnectionOut connectionOut = new ConnectionOut(hostname); - XMPPComponent.executorService.submit(connectionOut); + ConnectionOut connectionOut = null; + try { + connectionOut = new ConnectionOut(hostname); + XMPPComponent.executorService.submit(connectionOut); + } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) { + LOGGER.log(Level.SEVERE, "s2s out error", e); + } } } @@ -170,9 +181,7 @@ public class XMPPComponent implements ServletContextListener { conf.getProperty("mysql_username", "") + "&password=" + conf.getProperty("mysql_password", "")); childParsers.put(JuickMessage.XMLNS, new JuickMessage()); - - connRouter = new ConnectionRouter(componentName); - executorService.submit(connRouter); + executorService.submit(() -> connRouter = new ConnectionRouter(componentName, conf.getProperty("xmpp_password"))); executorService.submit(new ConnectionListener()); executorService.submit(new CleaningUp()); } catch (Exception e) { @@ -201,7 +210,11 @@ public class XMPPComponent implements ServletContextListener { } } - XMPPComponent.connRouter.closeConnection(); + try { + XMPPComponent.connRouter.closeConnection(); + } catch (IOException e) { + LOGGER.log(Level.WARNING, "router warning", e); + } synchronized (XMPPComponent.sqlSync) { if (XMPPComponent.sql != null) { |