diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/Connection.java | 70 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionIn.java | 67 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionListener.java | 31 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionOut.java | 186 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java | 150 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/JuickBot.java | 16 | ||||
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/XMPPComponent.java | 14 |
7 files changed, 302 insertions, 232 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/Connection.java b/src/main/java/com/juick/xmpp/s2s/Connection.java index 1a14b2cc8..c3e983b5c 100644 --- a/src/main/java/com/juick/xmpp/s2s/Connection.java +++ b/src/main/java/com/juick/xmpp/s2s/Connection.java @@ -2,14 +2,19 @@ package com.juick.xmpp.s2s; import org.xmlpull.mxp1.MXParser; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; -import java.io.FileWriter; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.nio.channels.AsynchronousSocketChannel; -import java.util.Date; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; +import java.io.*; +import java.net.Socket; +import java.security.KeyStore; +import java.security.SecureRandom; +import java.util.UUID; import java.util.logging.Logger; /** @@ -25,12 +30,43 @@ public class Connection { public long tsLocalData = 0; public long bytesLocal = 0; public long packetsLocal = 0; - AsynchronousSocketChannel socket; - final XmlPullParser parser = new MXParser(); + Socket socket; + public static final String NS_DB = "jabber:server:dialback"; + public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; + public static final String NS_STREAM = "http://etherx.jabber.org/streams"; + XmlPullParser parser = new MXParser(); OutputStreamWriter writer; - - public Connection() { + private boolean secured = false; + SSLContext sc; + private TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + } + }; + + + public Connection() throws Exception { tsCreated = System.currentTimeMillis(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + KeyStore ks = KeyStore.getInstance("JKS"); + try (InputStream ksIs = new FileInputStream(XMPPComponent.keystore)) { + ks.load(ksIs, XMPPComponent.keystorePassword.toCharArray()); + } + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); + kmf.init(ks, XMPPComponent.keystorePassword.toCharArray()); + sc = SSLContext.getInstance("TLSv1.2"); + + sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom()); + } public void logParser() { @@ -91,4 +127,20 @@ public class Connection { return hexkey.toString(); } + + public boolean isSecured() { + return secured; + } + + public void setSecured(boolean secured) { + this.secured = secured; + } + + public void restartParser() throws XmlPullParserException, IOException { + parser = new MXParser(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + parser.setInput(new InputStreamReader(socket.getInputStream())); + writer = new OutputStreamWriter(socket.getOutputStream()); + streamID = UUID.randomUUID().toString(); + } } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index c215e375d..950a2eaac 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -7,12 +7,13 @@ import com.juick.xmpp.Presence; import com.juick.xmpp.utils.XmlUtils; import org.xmlpull.v1.XmlPullParser; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSocket; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; +import java.net.Socket; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -31,7 +32,7 @@ public class ConnectionIn extends Connection { public long tsRemoteData = 0; public long packetsRemote = 0; - public ConnectionIn(AsynchronousSocketChannel socket) { + public ConnectionIn(Socket socket) throws Exception { super(); this.socket = socket; streamID = UUID.randomUUID().toString(); @@ -40,28 +41,21 @@ public class ConnectionIn extends Connection { public void parseStream() { LOGGER.info("STREAM FROM ? " + streamID + " START"); try { - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + parser.setInput(new InputStreamReader(socket.getInputStream())); + writer = new OutputStreamWriter(socket.getOutputStream()); parser.next(); // stream:stream updateTsRemoteData(); - if (!parser.getName().equals("stream:stream") - || !parser.getAttributeValue(null, "xmlns").equals("jabber:server") - || !parser.getAttributeValue(null, "xmlns:stream").equals("http://etherx.jabber.org/streams") - || !parser.getAttributeValue(null, "xmlns:db").equals("jabber:server:dialback")) { + if (!parser.getName().equals("stream") + || !parser.getAttributeValue(null, "stream").equals(NS_STREAM) + || !parser.getAttributeValue(null, "db").equals(NS_DB)) { // || !parser.getAttributeValue(null, "version").equals("1.0") // || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) { throw new Exception("STREAM FROM ? " + streamID + " INVALID FIRST PACKET"); } boolean xmppversionnew = parser.getAttributeValue(null, "version") != null; - String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + - "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; - if (xmppversionnew) { - openStream += "<stream:features></stream:features>"; - } - sendStanza(openStream); + sendOpenStream(xmppversionnew); while (parser.next() != XmlPullParser.END_DOCUMENT) { updateTsRemoteData(); @@ -73,14 +67,14 @@ public class ConnectionIn extends Connection { packetsRemote++; String tag = parser.getName(); - if (tag.equals("db:result")) { + if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); LOGGER.info("STREAM FROM " + dfrom + " TO " + to + " " + streamID + " ASKING FOR DIALBACK"); if (dfrom.endsWith(XMPPComponent.HOSTNAME) && (dfrom.equals(XMPPComponent.HOSTNAME) || dfrom.endsWith("." + XMPPComponent.HOSTNAME))) { break; } - if (dfrom != null && to != null && to.equals(XMPPComponent.HOSTNAME)) { + if (to != null && to.equals(XMPPComponent.HOSTNAME)) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); @@ -89,12 +83,12 @@ public class ConnectionIn extends Connection { c.sendDialbackVerify(streamID, dbKey); } else { c = new ConnectionOut(dfrom, streamID, dbKey); - c.parseStream(); + XMPPComponent.executorService.submit(c); } } else { throw new Exception("STREAM FROM " + dfrom + " " + streamID + " DIALBACK RESULT FAIL"); } - } else if (tag.equals("db:verify")) { + } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { String vfrom = parser.getAttributeValue(null, "from"); String vto = parser.getAttributeValue(null, "to"); String vid = parser.getAttributeValue(null, "id"); @@ -112,6 +106,25 @@ public class ConnectionIn extends Connection { sendStanza("<db:verify from='" + vto + "' to='" + vfrom + "' id='" + vid + "' type='invalid'/>"); LOGGER.warning("STREAM FROM " + vfrom + " " + streamID + " DIALBACK VERIFY INVALID"); } + } else if (!isSecured() && tag.equals("starttls")) { + LOGGER.info("STREAM " + streamID + " SECURING"); + sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />"); + try { + socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), + socket.getPort(), true); + ((SSLSocket) socket).setUseClientMode(false); + ((SSLSocket) socket).startHandshake(); + setSecured(true); + LOGGER.info("STREAM " + streamID + " SECURED"); + restartParser(); + } catch (SSLException sex) { + LOGGER.warning("STREAM " + streamID + " SSL ERROR"); + sendStanza("<failed xmlns\"" + NS_TLS + "\" />"); + XMPPComponent.removeConnectionIn(this); + closeConnection(); + } + } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { + sendOpenStream(true); } else if (tag.equals("presence") && checkFromTo(parser)) { Presence p = Presence.parse(parser, null); if (p != null && (p.type == null || !p.type.equals(Presence.Type.error))) { @@ -156,6 +169,20 @@ public class ConnectionIn extends Connection { tsRemoteData = System.currentTimeMillis(); } + void sendOpenStream(boolean xmppversionnew) throws IOException { + String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + + XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; + if (xmppversionnew) { + openStream += "<stream:features>"; + if (!isSecured()) { + openStream += "<starttls xmlns=\"" + NS_TLS + "\"><optional/></starttls>"; + } + openStream += "</stream:features>"; + } + sendStanza(openStream); + } + public void sendDialbackResult(String sfrom, String type) { try { sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java index 02a2be39d..094fbd4f9 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java @@ -1,11 +1,7 @@ package com.juick.xmpp.s2s; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousServerSocketChannel; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.CompletionHandler; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.net.ServerSocket; +import java.net.Socket; import java.util.logging.Level; import java.util.logging.Logger; @@ -20,25 +16,18 @@ public class ConnectionListener implements Runnable { @Override public void run() { try { - final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(); - listener.bind(new InetSocketAddress(5269)); + final ServerSocket listener = new ServerSocket(5269); logger.info("s2s listener ready"); - listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { - @Override - public void completed(AsynchronousSocketChannel result, Object attachment) { - listener.accept(XMPPComponent.executorService, this); - ConnectionIn client = new ConnectionIn(result); + while (true) { + try { + Socket socket = listener.accept(); + ConnectionIn client = new ConnectionIn(socket); XMPPComponent.addConnectionIn(client); client.parseStream(); + } catch (Exception e) { + logger.log(Level.SEVERE, "s2s error", e); } - - @Override - public void failed(Throwable exc, Object attachment) { - - } - }); - Thread.currentThread().join(); - listener.close(); + } } catch (Exception e) { logger.log(Level.SEVERE, "s2s listener exception", e); } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 59fdfb606..4ebeffb61 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -1,140 +1,158 @@ package com.juick.xmpp.s2s; +import com.juick.xmpp.extensions.StreamFeatures; import com.juick.xmpp.utils.XmlUtils; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; +import java.util.concurrent.ExecutionException; import java.util.logging.Level; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; +import javax.net.ssl.SSLException; +import javax.net.ssl.SSLSocket; + /** - * * @author ugnich */ -public class ConnectionOut extends Connection { +public class ConnectionOut extends Connection implements Runnable { public boolean streamReady = false; public String to; String checkSID = null; String dbKey = null; - public ConnectionOut(String hostname) { + public ConnectionOut(String hostname) throws Exception { super(); to = hostname; } - public ConnectionOut(String hostname, String checkSID, String dbKey) { + public ConnectionOut(String hostname, String checkSID, String dbKey) throws Exception { super(); to = hostname; this.checkSID = checkSID; this.dbKey = dbKey; } - public void parseStream() { - LOGGER.info("STREAM TO " + to + " START"); + void sendOpenStream() throws IOException { + sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + + XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); + } + + void processDialback() throws Exception { + if (checkSID != null) { + sendDialbackVerify(checkSID, dbKey); + } + sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" + + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>"); + } + @Override + public void run() { + LOGGER.info("STREAM TO " + to + " START"); try { HostnamePort addr = DNSQueries.getServerAddress(to); - socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress(addr.hostname, addr.port), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { - @Override - public void completed(Void result, AsynchronousSocketChannel attachment) { - try { - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + socket = new Socket(InetAddress.getByName(addr.hostname), addr.port); + parser.setInput(new InputStreamReader(socket.getInputStream())); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + writer = new OutputStreamWriter(socket.getOutputStream()); - sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + - "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>"); + sendOpenStream(); - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET"); - } + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET"); + } - LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); - XMPPComponent.addConnectionOut(ConnectionOut.this); + LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); + XMPPComponent.addConnectionOut(ConnectionOut.this); - if (checkSID != null) { - sendDialbackVerify(checkSID, dbKey); + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + logParser(); + + String tag = parser.getName(); + if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { + String type = parser.getAttributeValue(null, "type"); + if (type != null && type.equals("valid")) { + streamReady = true; + LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); + + String cache = XMPPComponent.getFromCache(to); + if (cache != null) { + LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); + sendStanza(cache); } - sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - logParser(); - - String tag = parser.getName(); - if (tag.equals("db:result")) { - String type = parser.getAttributeValue(null, "type"); - if (type != null && type.equals("valid")) { - streamReady = true; - LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); - - String cache = XMPPComponent.getFromCache(to); - if (cache != null) { - LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); - sendStanza(cache); - } - - } else { - LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL"); - } - XmlUtils.skip(parser); - } else if (tag.equals("db:verify")) { - String from = parser.getAttributeValue(null, "from"); - String type = parser.getAttributeValue(null, "type"); - String sid = parser.getAttributeValue(null, "id"); - if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - ConnectionIn c = XMPPComponent.getConnectionIn(sid); - if (c != null) { - c.sendDialbackResult(from, type); - } - } - XmlUtils.skip(parser); - } else { - LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true)); - } + } else { + LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL"); + } + XmlUtils.skip(parser); + } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { + String from = parser.getAttributeValue(null, "from"); + String type = parser.getAttributeValue(null, "type"); + String sid = parser.getAttributeValue(null, "id"); + if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { + ConnectionIn c = XMPPComponent.getConnectionIn(sid); + if (c != null) { + c.sendDialbackResult(from, type); } - - LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); - XMPPComponent.removeConnectionOut(ConnectionOut.this); - closeConnection(); - } catch (EOFException eofex) { - LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID)); - XMPPComponent.removeConnectionOut(ConnectionOut.this); - closeConnection(); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "s2s out exception", e); - XMPPComponent.removeConnectionOut(ConnectionOut.this); + } + XmlUtils.skip(parser); + } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) { + StreamFeatures features = StreamFeatures.parse(parser); + if (!isSecured() && features.STARTTLS >= 0) { + System.out.println("STREAM TO " + to + " " + streamID + " SECURING"); + sendStanza("<starttls xmlns=\"" + NS_TLS + "\" />"); + } else { + processDialback(); + } + } else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) { + try { + socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), + socket.getPort(), true); + ((SSLSocket) socket).startHandshake(); + setSecured(true); + System.out.println("STREAM " + streamID + " SECURED"); + restartParser(); + sendOpenStream(); + } catch (SSLException sex) { + System.err.println("STREAM " + streamID + " SSL ERROR"); + sendStanza("<failed xmlns\"" + NS_TLS + "\" />"); + XMPPComponent.removeConnectionOut(this); closeConnection(); } + } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { + streamID = parser.getAttributeValue(null, "id"); + } else { + LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true)); } + } - @Override - public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - LOGGER.log(Level.WARNING, "s2s out failed", exc); - XMPPComponent.removeConnectionOut(ConnectionOut.this); - closeConnection(); - } - }); + LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); + XMPPComponent.removeConnectionOut(ConnectionOut.this); + closeConnection(); + } catch (EOFException eofex) { + LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID)); + XMPPComponent.removeConnectionOut(ConnectionOut.this); + closeConnection(); } catch (Exception e) { - LOGGER.warning(e.toString()); - XMPPComponent.removeConnectionOut(this); + LOGGER.log(Level.SEVERE, "s2s out exception", e); + XMPPComponent.removeConnectionOut(ConnectionOut.this); closeConnection(); } } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java index 4634ca99b..48d2efd53 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java @@ -14,22 +14,18 @@ import org.xmlpull.v1.XmlPullParser; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; -import java.nio.channels.CompletionHandler; +import java.net.Socket; import java.util.List; import java.util.logging.Level; /** - * * @author ugnich */ public class ConnectionRouter extends Connection implements Runnable { private String componentName; - ConnectionRouter(String componentName) { + ConnectionRouter(String componentName) throws Exception { this.componentName = componentName; } @@ -38,96 +34,80 @@ public class ConnectionRouter extends Connection implements Runnable { LOGGER.info("STREAM ROUTER START"); try { - socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() { - @Override - public void completed(Void result, AsynchronousSocketChannel client) { - try { - parser.setInput(new InputStreamReader(Channels.newInputStream(client))); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(client)); - - String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } + socket = new Socket("localhost", 5347); + parser.setInput(new InputStreamReader(socket.getInputStream())); - msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>"; - writer.write(msg); - writer.flush(); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(socket.getOutputStream()); - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); + String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; + writer.write(msg); + writer.flush(); - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("FAIL ON FIRST PACKET"); + } - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); + msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>"; + writer.write(msg); + writer.flush(); + + parser.next(); + if (!parser.getName().equals("handshake")) { + throw new Exception("NO HANDSHAKE"); + } + XmlUtils.skip(parser); + LOGGER.info("STREAM ROUTER OPEN"); + + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + String to = parser.getAttributeValue(null, "to"); + if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { + JID jid = new JID(to); + if (jid.Host != null) { + if (jid.Host.equals(componentName)) { + if (tag.equals("message")) { + Message xmsg = Message.parse(parser, XMPPComponent.childParsers); + LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } + } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { + String xml = XmlUtils.parseToString(parser, true); + LOGGER.info("STREAM ROUTER: " + xml); + } else { + String xml = XmlUtils.parseToString(parser, true); + LOGGER.info("STREAM ROUTER (OUT): " + xml); + XMPPComponent.sendOut(jid.Host, xml); } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); + } else { + LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); } + } else { + LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); } + } - @Override - public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - LOGGER.log(Level.WARNING, "s2s component failed to connect", exc); - } - }); - Thread.currentThread().join(); + LOGGER.warning("STREAM ROUTER FINISHED"); } catch (Exception e) { - LOGGER.log(Level.SEVERE, "NIO2 error", e); + LOGGER.log(Level.SEVERE, "xmpp router exception", e); } - } @Override @@ -142,7 +122,7 @@ public class ConnectionRouter extends Connection implements Runnable { } } - public void sendJuickMessage(JuickMessage jmsg) { + public void sendJuickMessage(JuickMessage jmsg) throws Exception { List<String> jids; synchronized (XMPPComponent.sqlSync) { @@ -183,7 +163,7 @@ public class ConnectionRouter extends Connection implements Runnable { } } - public void sendJuickComment(JuickMessage jmsg) { + public void sendJuickComment(JuickMessage jmsg) throws Exception { List<String> jids; String replyQuote; @@ -231,7 +211,7 @@ public class ConnectionRouter extends Connection implements Runnable { return quote; } - public void sendJuickRecommendation(JuickMessage recomm) { + public void sendJuickRecommendation(JuickMessage recomm) throws Exception { List<String> jids; JuickMessage jmsg; synchronized (XMPPComponent.sqlSync) { diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/s2s/JuickBot.java index 182de10d0..0b9cceafe 100644 --- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java +++ b/src/main/java/com/juick/xmpp/s2s/JuickBot.java @@ -55,7 +55,7 @@ public class JuickBot { + "\n" + "Read more: http://juick.com/help/"; - public static boolean incomingPresence(Presence p) { + public static boolean incomingPresence(Presence p) throws Exception { final String username = p.to.Username.toLowerCase(); final boolean toJuick = username.equals("juick"); @@ -137,7 +137,7 @@ public class JuickBot { return false; } - public static boolean incomingMessage(Message msg) { + public static boolean incomingMessage(Message msg) throws Exception { if (msg.body == null || msg.body.isEmpty()) { return true; } @@ -237,7 +237,7 @@ public class JuickBot { } private static Pattern regexPM = Pattern.compile("^\\@(\\S+)\\s+([\\s\\S]+)$"); - public static boolean incomingMessageJuick(User user_from, Message msg) { + public static boolean incomingMessageJuick(User user_from, Message msg) throws Exception { String command = msg.body.trim(); int commandlen = command.length(); @@ -274,7 +274,7 @@ public class JuickBot { return false; } - private static void commandPing(Message m) { + private static void commandPing(Message m) throws Exception { Presence p = new Presence(JuickJID, m.from); p.priority = 10; XMPPComponent.sendOut(p); @@ -284,19 +284,19 @@ public class JuickBot { XMPPComponent.sendOut(reply); } - private static void commandHelp(Message m) { + private static void commandHelp(Message m) throws Exception { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = HELPTEXT; XMPPComponent.sendOut(reply); } - private static void commandLogin(Message m, User user_from) { + private static void commandLogin(Message m, User user_from) throws Exception { Message reply = new Message(JuickJID, m.from, Message.Type.chat); reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(XMPPComponent.sql, user_from.getUID()); XMPPComponent.sendOut(reply); } - private static void commandPM(Message m, User user_from, String user_to, String body) { + private static void commandPM(Message m, User user_from, String user_to, String body) throws Exception { int ret = 0; int uid_to = 0; @@ -368,7 +368,7 @@ public class JuickBot { XMPPComponent.sendOut(reply); } - private static void commandBLShow(Message m, User user_from) { + private static void commandBLShow(Message m, User user_from) throws Exception { List<User> blusers; List<String> bltags; diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java index 50d2c1e4f..4b523ab01 100644 --- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java +++ b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java @@ -24,10 +24,12 @@ public class XMPPComponent implements ServletContextListener { private static final Logger LOGGER = Logger.getLogger(XMPPComponent.class.getName()); - public static final ExecutorService executorService = Executors.newWorkStealingPool(); + public static final ExecutorService executorService = Executors.newCachedThreadPool(); public static String HOSTNAME = null; public static String STATSFILE = null; + public static String keystore; + public static String keystorePassword; public static ConnectionRouter connRouter; static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>()); static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>()); @@ -97,11 +99,11 @@ public class XMPPComponent implements ServletContextListener { return null; } - public static void sendOut(Stanza s) { + public static void sendOut(Stanza s) throws Exception { sendOut(s.to.Host, s.toString()); } - public static void sendOut(String hostname, String xml) { + public static void sendOut(String hostname, String xml) throws Exception { boolean haveAnyConn = false; ConnectionOut connOut = null; @@ -144,7 +146,7 @@ public class XMPPComponent implements ServletContextListener { if (!haveAnyConn) { ConnectionOut connectionOut = new ConnectionOut(hostname); - connectionOut.parseStream(); + XMPPComponent.executorService.submit(connectionOut); } } @@ -159,6 +161,8 @@ public class XMPPComponent implements ServletContextListener { HOSTNAME = conf.getProperty("hostname"); String componentName = conf.getProperty("componentname"); STATSFILE = conf.getProperty("statsfile"); + keystore = conf.getProperty("keystore"); + keystorePassword = conf.getProperty("keystore_password"); Class.forName("com.mysql.jdbc.Driver"); sql = DriverManager.getConnection("jdbc:mysql://localhost/juick?autoReconnect=true&user=" + @@ -170,7 +174,7 @@ public class XMPPComponent implements ServletContextListener { executorService.submit(connRouter); executorService.submit(new ConnectionListener()); executorService.submit(new CleaningUp()); - } catch (IOException | ClassNotFoundException | SQLException e) { + } catch (Exception e) { LOGGER.log(Level.SEVERE, "XMPPComponent error", e); } }); |