aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/com/juick/xmpp/s2s
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/juick/xmpp/s2s')
-rw-r--r--src/main/java/com/juick/xmpp/s2s/Connection.java70
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionIn.java67
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionListener.java31
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionOut.java186
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java150
-rw-r--r--src/main/java/com/juick/xmpp/s2s/JuickBot.java16
-rw-r--r--src/main/java/com/juick/xmpp/s2s/XMPPComponent.java14
7 files changed, 302 insertions, 232 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/Connection.java b/src/main/java/com/juick/xmpp/s2s/Connection.java
index 1a14b2cc..c3e983b5 100644
--- a/src/main/java/com/juick/xmpp/s2s/Connection.java
+++ b/src/main/java/com/juick/xmpp/s2s/Connection.java
@@ -2,14 +2,19 @@ package com.juick.xmpp.s2s;
import org.xmlpull.mxp1.MXParser;
import org.xmlpull.v1.XmlPullParser;
+import org.xmlpull.v1.XmlPullParserException;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.util.Date;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.*;
+import java.net.Socket;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.util.UUID;
import java.util.logging.Logger;
/**
@@ -25,12 +30,43 @@ public class Connection {
public long tsLocalData = 0;
public long bytesLocal = 0;
public long packetsLocal = 0;
- AsynchronousSocketChannel socket;
- final XmlPullParser parser = new MXParser();
+ Socket socket;
+ public static final String NS_DB = "jabber:server:dialback";
+ public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls";
+ public static final String NS_STREAM = "http://etherx.jabber.org/streams";
+ XmlPullParser parser = new MXParser();
OutputStreamWriter writer;
-
- public Connection() {
+ private boolean secured = false;
+ SSLContext sc;
+ private TrustManager[] trustAllCerts = new TrustManager[]{
+ new X509TrustManager() {
+ public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+
+ public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+ };
+
+
+ public Connection() throws Exception {
tsCreated = System.currentTimeMillis();
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ KeyStore ks = KeyStore.getInstance("JKS");
+ try (InputStream ksIs = new FileInputStream(XMPPComponent.keystore)) {
+ ks.load(ksIs, XMPPComponent.keystorePassword.toCharArray());
+ }
+
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
+ .getDefaultAlgorithm());
+ kmf.init(ks, XMPPComponent.keystorePassword.toCharArray());
+ sc = SSLContext.getInstance("TLSv1.2");
+
+ sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom());
+
}
public void logParser() {
@@ -91,4 +127,20 @@ public class Connection {
return hexkey.toString();
}
+
+ public boolean isSecured() {
+ return secured;
+ }
+
+ public void setSecured(boolean secured) {
+ this.secured = secured;
+ }
+
+ public void restartParser() throws XmlPullParserException, IOException {
+ parser = new MXParser();
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ parser.setInput(new InputStreamReader(socket.getInputStream()));
+ writer = new OutputStreamWriter(socket.getOutputStream());
+ streamID = UUID.randomUUID().toString();
+ }
}
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
index c215e375..950a2eaa 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
@@ -7,12 +7,13 @@ import com.juick.xmpp.Presence;
import com.juick.xmpp.utils.XmlUtils;
import org.xmlpull.v1.XmlPullParser;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSocket;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.Channels;
+import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -31,7 +32,7 @@ public class ConnectionIn extends Connection {
public long tsRemoteData = 0;
public long packetsRemote = 0;
- public ConnectionIn(AsynchronousSocketChannel socket) {
+ public ConnectionIn(Socket socket) throws Exception {
super();
this.socket = socket;
streamID = UUID.randomUUID().toString();
@@ -40,28 +41,21 @@ public class ConnectionIn extends Connection {
public void parseStream() {
LOGGER.info("STREAM FROM ? " + streamID + " START");
try {
- parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
- writer = new OutputStreamWriter(Channels.newOutputStream(socket));
+ parser.setInput(new InputStreamReader(socket.getInputStream()));
+ writer = new OutputStreamWriter(socket.getOutputStream());
parser.next(); // stream:stream
updateTsRemoteData();
- if (!parser.getName().equals("stream:stream")
- || !parser.getAttributeValue(null, "xmlns").equals("jabber:server")
- || !parser.getAttributeValue(null, "xmlns:stream").equals("http://etherx.jabber.org/streams")
- || !parser.getAttributeValue(null, "xmlns:db").equals("jabber:server:dialback")) {
+ if (!parser.getName().equals("stream")
+ || !parser.getAttributeValue(null, "stream").equals(NS_STREAM)
+ || !parser.getAttributeValue(null, "db").equals(NS_DB)) {
// || !parser.getAttributeValue(null, "version").equals("1.0")
// || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) {
throw new Exception("STREAM FROM ? " + streamID + " INVALID FIRST PACKET");
}
boolean xmppversionnew = parser.getAttributeValue(null, "version") != null;
- String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
- "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
- XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>";
- if (xmppversionnew) {
- openStream += "<stream:features></stream:features>";
- }
- sendStanza(openStream);
+ sendOpenStream(xmppversionnew);
while (parser.next() != XmlPullParser.END_DOCUMENT) {
updateTsRemoteData();
@@ -73,14 +67,14 @@ public class ConnectionIn extends Connection {
packetsRemote++;
String tag = parser.getName();
- if (tag.equals("db:result")) {
+ if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) {
String dfrom = parser.getAttributeValue(null, "from");
String to = parser.getAttributeValue(null, "to");
LOGGER.info("STREAM FROM " + dfrom + " TO " + to + " " + streamID + " ASKING FOR DIALBACK");
if (dfrom.endsWith(XMPPComponent.HOSTNAME) && (dfrom.equals(XMPPComponent.HOSTNAME) || dfrom.endsWith("." + XMPPComponent.HOSTNAME))) {
break;
}
- if (dfrom != null && to != null && to.equals(XMPPComponent.HOSTNAME)) {
+ if (to != null && to.equals(XMPPComponent.HOSTNAME)) {
String dbKey = XmlUtils.getTagText(parser);
updateTsRemoteData();
@@ -89,12 +83,12 @@ public class ConnectionIn extends Connection {
c.sendDialbackVerify(streamID, dbKey);
} else {
c = new ConnectionOut(dfrom, streamID, dbKey);
- c.parseStream();
+ XMPPComponent.executorService.submit(c);
}
} else {
throw new Exception("STREAM FROM " + dfrom + " " + streamID + " DIALBACK RESULT FAIL");
}
- } else if (tag.equals("db:verify")) {
+ } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) {
String vfrom = parser.getAttributeValue(null, "from");
String vto = parser.getAttributeValue(null, "to");
String vid = parser.getAttributeValue(null, "id");
@@ -112,6 +106,25 @@ public class ConnectionIn extends Connection {
sendStanza("<db:verify from='" + vto + "' to='" + vfrom + "' id='" + vid + "' type='invalid'/>");
LOGGER.warning("STREAM FROM " + vfrom + " " + streamID + " DIALBACK VERIFY INVALID");
}
+ } else if (!isSecured() && tag.equals("starttls")) {
+ LOGGER.info("STREAM " + streamID + " SECURING");
+ sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />");
+ try {
+ socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
+ socket.getPort(), true);
+ ((SSLSocket) socket).setUseClientMode(false);
+ ((SSLSocket) socket).startHandshake();
+ setSecured(true);
+ LOGGER.info("STREAM " + streamID + " SECURED");
+ restartParser();
+ } catch (SSLException sex) {
+ LOGGER.warning("STREAM " + streamID + " SSL ERROR");
+ sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
+ XMPPComponent.removeConnectionIn(this);
+ closeConnection();
+ }
+ } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) {
+ sendOpenStream(true);
} else if (tag.equals("presence") && checkFromTo(parser)) {
Presence p = Presence.parse(parser, null);
if (p != null && (p.type == null || !p.type.equals(Presence.Type.error))) {
@@ -156,6 +169,20 @@ public class ConnectionIn extends Connection {
tsRemoteData = System.currentTimeMillis();
}
+ void sendOpenStream(boolean xmppversionnew) throws IOException {
+ String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
+ "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
+ XMPPComponent.HOSTNAME + "' id='" + streamID + "' version='1.0'>";
+ if (xmppversionnew) {
+ openStream += "<stream:features>";
+ if (!isSecured()) {
+ openStream += "<starttls xmlns=\"" + NS_TLS + "\"><optional/></starttls>";
+ }
+ openStream += "</stream:features>";
+ }
+ sendStanza(openStream);
+ }
+
public void sendDialbackResult(String sfrom, String type) {
try {
sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>");
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
index 02a2be39..094fbd4f 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
@@ -1,11 +1,7 @@
package com.juick.xmpp.s2s;
-import java.net.InetSocketAddress;
-import java.nio.channels.AsynchronousServerSocketChannel;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.CompletionHandler;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.net.ServerSocket;
+import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -20,25 +16,18 @@ public class ConnectionListener implements Runnable {
@Override
public void run() {
try {
- final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open();
- listener.bind(new InetSocketAddress(5269));
+ final ServerSocket listener = new ServerSocket(5269);
logger.info("s2s listener ready");
- listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
- @Override
- public void completed(AsynchronousSocketChannel result, Object attachment) {
- listener.accept(XMPPComponent.executorService, this);
- ConnectionIn client = new ConnectionIn(result);
+ while (true) {
+ try {
+ Socket socket = listener.accept();
+ ConnectionIn client = new ConnectionIn(socket);
XMPPComponent.addConnectionIn(client);
client.parseStream();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "s2s error", e);
}
-
- @Override
- public void failed(Throwable exc, Object attachment) {
-
- }
- });
- Thread.currentThread().join();
- listener.close();
+ }
} catch (Exception e) {
logger.log(Level.SEVERE, "s2s listener exception", e);
}
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
index 59fdfb60..4ebeffb6 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
@@ -1,140 +1,158 @@
package com.juick.xmpp.s2s;
+import com.juick.xmpp.extensions.StreamFeatures;
import com.juick.xmpp.utils.XmlUtils;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler;
+import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLSocket;
+
/**
- *
* @author ugnich
*/
-public class ConnectionOut extends Connection {
+public class ConnectionOut extends Connection implements Runnable {
public boolean streamReady = false;
public String to;
String checkSID = null;
String dbKey = null;
- public ConnectionOut(String hostname) {
+ public ConnectionOut(String hostname) throws Exception {
super();
to = hostname;
}
- public ConnectionOut(String hostname, String checkSID, String dbKey) {
+ public ConnectionOut(String hostname, String checkSID, String dbKey) throws Exception {
super();
to = hostname;
this.checkSID = checkSID;
this.dbKey = dbKey;
}
- public void parseStream() {
- LOGGER.info("STREAM TO " + to + " START");
+ void sendOpenStream() throws IOException {
+ sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
+ "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
+ XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>");
+ }
+
+ void processDialback() throws Exception {
+ if (checkSID != null) {
+ sendDialbackVerify(checkSID, dbKey);
+ }
+ sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" +
+ generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>");
+ }
+ @Override
+ public void run() {
+ LOGGER.info("STREAM TO " + to + " START");
try {
HostnamePort addr = DNSQueries.getServerAddress(to);
- socket = AsynchronousSocketChannel.open();
- socket.connect(new InetSocketAddress(addr.hostname, addr.port), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() {
- @Override
- public void completed(Void result, AsynchronousSocketChannel attachment) {
- try {
- parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
+ socket = new Socket(InetAddress.getByName(addr.hostname), addr.port);
+ parser.setInput(new InputStreamReader(socket.getInputStream()));
- writer = new OutputStreamWriter(Channels.newOutputStream(socket));
+ writer = new OutputStreamWriter(socket.getOutputStream());
- sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
- "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
- XMPPComponent.HOSTNAME + "' to='" + to + "' version='1.0'>");
+ sendOpenStream();
- parser.next(); // stream:stream
- streamID = parser.getAttributeValue(null, "id");
- if (streamID == null || streamID.isEmpty()) {
- throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET");
- }
+ parser.next(); // stream:stream
+ streamID = parser.getAttributeValue(null, "id");
+ if (streamID == null || streamID.isEmpty()) {
+ throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET");
+ }
- LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN");
- XMPPComponent.addConnectionOut(ConnectionOut.this);
+ LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN");
+ XMPPComponent.addConnectionOut(ConnectionOut.this);
- if (checkSID != null) {
- sendDialbackVerify(checkSID, dbKey);
+ while (parser.next() != XmlPullParser.END_DOCUMENT) {
+ if (parser.getEventType() != XmlPullParser.START_TAG) {
+ continue;
+ }
+ logParser();
+
+ String tag = parser.getName();
+ if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) {
+ String type = parser.getAttributeValue(null, "type");
+ if (type != null && type.equals("valid")) {
+ streamReady = true;
+ LOGGER.info("STREAM TO " + to + " " + streamID + " READY");
+
+ String cache = XMPPComponent.getFromCache(to);
+ if (cache != null) {
+ LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE");
+ sendStanza(cache);
}
- sendStanza("<db:result from='" + XMPPComponent.HOSTNAME + "' to='" + to + "'>" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + "</db:result>");
-
- while (parser.next() != XmlPullParser.END_DOCUMENT) {
- if (parser.getEventType() != XmlPullParser.START_TAG) {
- continue;
- }
- logParser();
-
- String tag = parser.getName();
- if (tag.equals("db:result")) {
- String type = parser.getAttributeValue(null, "type");
- if (type != null && type.equals("valid")) {
- streamReady = true;
- LOGGER.info("STREAM TO " + to + " " + streamID + " READY");
-
- String cache = XMPPComponent.getFromCache(to);
- if (cache != null) {
- LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE");
- sendStanza(cache);
- }
-
- } else {
- LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL");
- }
- XmlUtils.skip(parser);
- } else if (tag.equals("db:verify")) {
- String from = parser.getAttributeValue(null, "from");
- String type = parser.getAttributeValue(null, "type");
- String sid = parser.getAttributeValue(null, "id");
- if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) {
- ConnectionIn c = XMPPComponent.getConnectionIn(sid);
- if (c != null) {
- c.sendDialbackResult(from, type);
- }
- }
- XmlUtils.skip(parser);
- } else {
- LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true));
- }
+ } else {
+ LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL");
+ }
+ XmlUtils.skip(parser);
+ } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) {
+ String from = parser.getAttributeValue(null, "from");
+ String type = parser.getAttributeValue(null, "type");
+ String sid = parser.getAttributeValue(null, "id");
+ if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) {
+ ConnectionIn c = XMPPComponent.getConnectionIn(sid);
+ if (c != null) {
+ c.sendDialbackResult(from, type);
}
-
- LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED");
- XMPPComponent.removeConnectionOut(ConnectionOut.this);
- closeConnection();
- } catch (EOFException eofex) {
- LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID));
- XMPPComponent.removeConnectionOut(ConnectionOut.this);
- closeConnection();
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "s2s out exception", e);
- XMPPComponent.removeConnectionOut(ConnectionOut.this);
+ }
+ XmlUtils.skip(parser);
+ } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) {
+ StreamFeatures features = StreamFeatures.parse(parser);
+ if (!isSecured() && features.STARTTLS >= 0) {
+ System.out.println("STREAM TO " + to + " " + streamID + " SECURING");
+ sendStanza("<starttls xmlns=\"" + NS_TLS + "\" />");
+ } else {
+ processDialback();
+ }
+ } else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) {
+ try {
+ socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
+ socket.getPort(), true);
+ ((SSLSocket) socket).startHandshake();
+ setSecured(true);
+ System.out.println("STREAM " + streamID + " SECURED");
+ restartParser();
+ sendOpenStream();
+ } catch (SSLException sex) {
+ System.err.println("STREAM " + streamID + " SSL ERROR");
+ sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
+ XMPPComponent.removeConnectionOut(this);
closeConnection();
}
+ } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) {
+ streamID = parser.getAttributeValue(null, "id");
+ } else {
+ LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true));
}
+ }
- @Override
- public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
- LOGGER.log(Level.WARNING, "s2s out failed", exc);
- XMPPComponent.removeConnectionOut(ConnectionOut.this);
- closeConnection();
- }
- });
+ LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED");
+ XMPPComponent.removeConnectionOut(ConnectionOut.this);
+ closeConnection();
+ } catch (EOFException eofex) {
+ LOGGER.info(String.format("STREAM %s %s CLOSED (dirty)", to, streamID));
+ XMPPComponent.removeConnectionOut(ConnectionOut.this);
+ closeConnection();
} catch (Exception e) {
- LOGGER.warning(e.toString());
- XMPPComponent.removeConnectionOut(this);
+ LOGGER.log(Level.SEVERE, "s2s out exception", e);
+ XMPPComponent.removeConnectionOut(ConnectionOut.this);
closeConnection();
}
}
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
index 4634ca99..48d2efd5 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
@@ -14,22 +14,18 @@ import org.xmlpull.v1.XmlPullParser;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.net.InetSocketAddress;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.Channels;
-import java.nio.channels.CompletionHandler;
+import java.net.Socket;
import java.util.List;
import java.util.logging.Level;
/**
- *
* @author ugnich
*/
public class ConnectionRouter extends Connection implements Runnable {
private String componentName;
- ConnectionRouter(String componentName) {
+ ConnectionRouter(String componentName) throws Exception {
this.componentName = componentName;
}
@@ -38,96 +34,80 @@ public class ConnectionRouter extends Connection implements Runnable {
LOGGER.info("STREAM ROUTER START");
try {
- socket = AsynchronousSocketChannel.open();
- socket.connect(new InetSocketAddress("localhost", 5347), socket, new CompletionHandler<Void, AsynchronousSocketChannel>() {
- @Override
- public void completed(Void result, AsynchronousSocketChannel client) {
- try {
- parser.setInput(new InputStreamReader(Channels.newInputStream(client)));
-
- parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
- writer = new OutputStreamWriter(Channels.newOutputStream(client));
-
- String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>";
- writer.write(msg);
- writer.flush();
-
- parser.next(); // stream:stream
- streamID = parser.getAttributeValue(null, "id");
- if (streamID == null || streamID.isEmpty()) {
- throw new Exception("FAIL ON FIRST PACKET");
- }
+ socket = new Socket("localhost", 5347);
+ parser.setInput(new InputStreamReader(socket.getInputStream()));
- msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>";
- writer.write(msg);
- writer.flush();
+ parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
+ writer = new OutputStreamWriter(socket.getOutputStream());
- parser.next();
- if (!parser.getName().equals("handshake")) {
- throw new Exception("NO HANDSHAKE");
- }
- XmlUtils.skip(parser);
- LOGGER.info("STREAM ROUTER OPEN");
+ String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>";
+ writer.write(msg);
+ writer.flush();
- while (parser.next() != XmlPullParser.END_DOCUMENT) {
- if (parser.getEventType() != XmlPullParser.START_TAG) {
- continue;
- }
+ parser.next(); // stream:stream
+ streamID = parser.getAttributeValue(null, "id");
+ if (streamID == null || streamID.isEmpty()) {
+ throw new Exception("FAIL ON FIRST PACKET");
+ }
- String tag = parser.getName();
- String to = parser.getAttributeValue(null, "to");
- if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) {
- JID jid = new JID(to);
- if (jid.Host != null) {
- if (jid.Host.equals(componentName)) {
- if (tag.equals("message")) {
- Message xmsg = Message.parse(parser, XMPPComponent.childParsers);
- LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString());
- JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
- if (jmsg != null) {
- if (jid.Username != null && jid.Username.equals("recomm")) {
- sendJuickRecommendation(jmsg);
- } else {
- if (jmsg.getRID() > 0) {
- sendJuickComment(jmsg);
- } else if (jmsg.getMID() > 0) {
- sendJuickMessage(jmsg);
- }
- }
- }
- }
- } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) {
- String xml = XmlUtils.parseToString(parser, true);
- LOGGER.info("STREAM ROUTER: " + xml);
+ msg = "<handshake>" + SHA1.encode(streamID + "secret") + "</handshake>";
+ writer.write(msg);
+ writer.flush();
+
+ parser.next();
+ if (!parser.getName().equals("handshake")) {
+ throw new Exception("NO HANDSHAKE");
+ }
+ XmlUtils.skip(parser);
+ LOGGER.info("STREAM ROUTER OPEN");
+
+ while (parser.next() != XmlPullParser.END_DOCUMENT) {
+ if (parser.getEventType() != XmlPullParser.START_TAG) {
+ continue;
+ }
+
+ String tag = parser.getName();
+ String to = parser.getAttributeValue(null, "to");
+ if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) {
+ JID jid = new JID(to);
+ if (jid.Host != null) {
+ if (jid.Host.equals(componentName)) {
+ if (tag.equals("message")) {
+ Message xmsg = Message.parse(parser, XMPPComponent.childParsers);
+ LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString());
+ JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ if (jid.Username != null && jid.Username.equals("recomm")) {
+ sendJuickRecommendation(jmsg);
} else {
- String xml = XmlUtils.parseToString(parser, true);
- LOGGER.info("STREAM ROUTER (OUT): " + xml);
- XMPPComponent.sendOut(jid.Host, xml);
+ if (jmsg.getRID() > 0) {
+ sendJuickComment(jmsg);
+ } else if (jmsg.getMID() > 0) {
+ sendJuickMessage(jmsg);
+ }
}
- } else {
- LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true));
}
- } else {
- LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true));
}
+ } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) {
+ String xml = XmlUtils.parseToString(parser, true);
+ LOGGER.info("STREAM ROUTER: " + xml);
+ } else {
+ String xml = XmlUtils.parseToString(parser, true);
+ LOGGER.info("STREAM ROUTER (OUT): " + xml);
+ XMPPComponent.sendOut(jid.Host, xml);
}
-
- LOGGER.warning("STREAM ROUTER FINISHED");
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "xmpp router exception", e);
+ } else {
+ LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true));
}
+ } else {
+ LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true));
}
+ }
- @Override
- public void failed(Throwable exc, AsynchronousSocketChannel attachment) {
- LOGGER.log(Level.WARNING, "s2s component failed to connect", exc);
- }
- });
- Thread.currentThread().join();
+ LOGGER.warning("STREAM ROUTER FINISHED");
} catch (Exception e) {
- LOGGER.log(Level.SEVERE, "NIO2 error", e);
+ LOGGER.log(Level.SEVERE, "xmpp router exception", e);
}
-
}
@Override
@@ -142,7 +122,7 @@ public class ConnectionRouter extends Connection implements Runnable {
}
}
- public void sendJuickMessage(JuickMessage jmsg) {
+ public void sendJuickMessage(JuickMessage jmsg) throws Exception {
List<String> jids;
synchronized (XMPPComponent.sqlSync) {
@@ -183,7 +163,7 @@ public class ConnectionRouter extends Connection implements Runnable {
}
}
- public void sendJuickComment(JuickMessage jmsg) {
+ public void sendJuickComment(JuickMessage jmsg) throws Exception {
List<String> jids;
String replyQuote;
@@ -231,7 +211,7 @@ public class ConnectionRouter extends Connection implements Runnable {
return quote;
}
- public void sendJuickRecommendation(JuickMessage recomm) {
+ public void sendJuickRecommendation(JuickMessage recomm) throws Exception {
List<String> jids;
JuickMessage jmsg;
synchronized (XMPPComponent.sqlSync) {
diff --git a/src/main/java/com/juick/xmpp/s2s/JuickBot.java b/src/main/java/com/juick/xmpp/s2s/JuickBot.java
index 182de10d..0b9cceaf 100644
--- a/src/main/java/com/juick/xmpp/s2s/JuickBot.java
+++ b/src/main/java/com/juick/xmpp/s2s/JuickBot.java
@@ -55,7 +55,7 @@ public class JuickBot {
+ "\n"
+ "Read more: http://juick.com/help/";
- public static boolean incomingPresence(Presence p) {
+ public static boolean incomingPresence(Presence p) throws Exception {
final String username = p.to.Username.toLowerCase();
final boolean toJuick = username.equals("juick");
@@ -137,7 +137,7 @@ public class JuickBot {
return false;
}
- public static boolean incomingMessage(Message msg) {
+ public static boolean incomingMessage(Message msg) throws Exception {
if (msg.body == null || msg.body.isEmpty()) {
return true;
}
@@ -237,7 +237,7 @@ public class JuickBot {
}
private static Pattern regexPM = Pattern.compile("^\\@(\\S+)\\s+([\\s\\S]+)$");
- public static boolean incomingMessageJuick(User user_from, Message msg) {
+ public static boolean incomingMessageJuick(User user_from, Message msg) throws Exception {
String command = msg.body.trim();
int commandlen = command.length();
@@ -274,7 +274,7 @@ public class JuickBot {
return false;
}
- private static void commandPing(Message m) {
+ private static void commandPing(Message m) throws Exception {
Presence p = new Presence(JuickJID, m.from);
p.priority = 10;
XMPPComponent.sendOut(p);
@@ -284,19 +284,19 @@ public class JuickBot {
XMPPComponent.sendOut(reply);
}
- private static void commandHelp(Message m) {
+ private static void commandHelp(Message m) throws Exception {
Message reply = new Message(JuickJID, m.from, Message.Type.chat);
reply.body = HELPTEXT;
XMPPComponent.sendOut(reply);
}
- private static void commandLogin(Message m, User user_from) {
+ private static void commandLogin(Message m, User user_from) throws Exception {
Message reply = new Message(JuickJID, m.from, Message.Type.chat);
reply.body = "http://juick.com/login?" + UserQueries.getHashByUID(XMPPComponent.sql, user_from.getUID());
XMPPComponent.sendOut(reply);
}
- private static void commandPM(Message m, User user_from, String user_to, String body) {
+ private static void commandPM(Message m, User user_from, String user_to, String body) throws Exception {
int ret = 0;
int uid_to = 0;
@@ -368,7 +368,7 @@ public class JuickBot {
XMPPComponent.sendOut(reply);
}
- private static void commandBLShow(Message m, User user_from) {
+ private static void commandBLShow(Message m, User user_from) throws Exception {
List<User> blusers;
List<String> bltags;
diff --git a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java
index 50d2c1e4..4b523ab0 100644
--- a/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java
+++ b/src/main/java/com/juick/xmpp/s2s/XMPPComponent.java
@@ -24,10 +24,12 @@ public class XMPPComponent implements ServletContextListener {
private static final Logger LOGGER = Logger.getLogger(XMPPComponent.class.getName());
- public static final ExecutorService executorService = Executors.newWorkStealingPool();
+ public static final ExecutorService executorService = Executors.newCachedThreadPool();
public static String HOSTNAME = null;
public static String STATSFILE = null;
+ public static String keystore;
+ public static String keystorePassword;
public static ConnectionRouter connRouter;
static final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>());
static final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>());
@@ -97,11 +99,11 @@ public class XMPPComponent implements ServletContextListener {
return null;
}
- public static void sendOut(Stanza s) {
+ public static void sendOut(Stanza s) throws Exception {
sendOut(s.to.Host, s.toString());
}
- public static void sendOut(String hostname, String xml) {
+ public static void sendOut(String hostname, String xml) throws Exception {
boolean haveAnyConn = false;
ConnectionOut connOut = null;
@@ -144,7 +146,7 @@ public class XMPPComponent implements ServletContextListener {
if (!haveAnyConn) {
ConnectionOut connectionOut = new ConnectionOut(hostname);
- connectionOut.parseStream();
+ XMPPComponent.executorService.submit(connectionOut);
}
}
@@ -159,6 +161,8 @@ public class XMPPComponent implements ServletContextListener {
HOSTNAME = conf.getProperty("hostname");
String componentName = conf.getProperty("componentname");
STATSFILE = conf.getProperty("statsfile");
+ keystore = conf.getProperty("keystore");
+ keystorePassword = conf.getProperty("keystore_password");
Class.forName("com.mysql.jdbc.Driver");
sql = DriverManager.getConnection("jdbc:mysql://localhost/juick?autoReconnect=true&user=" +
@@ -170,7 +174,7 @@ public class XMPPComponent implements ServletContextListener {
executorService.submit(connRouter);
executorService.submit(new ConnectionListener());
executorService.submit(new CleaningUp());
- } catch (IOException | ClassNotFoundException | SQLException e) {
+ } catch (Exception e) {
LOGGER.log(Level.SEVERE, "XMPPComponent error", e);
}
});