aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2016-01-30 16:13:30 +0300
committerGravatar Vitaly Takmazov2016-01-30 16:13:30 +0300
commit474323453f1b881273f618555845e07ace830bd2 (patch)
tree11673a80973b3fb493912edc366ade720ecf40c4
parent58fb4a6394cb19ee04e65d0e921cdf0ae6fdb6e3 (diff)
s2s: NIO.2 sockets
-rw-r--r--src/main/java/com/juick/xmpp/s2s/Connection.java13
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionIn.java15
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionListener.java45
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionOut.java11
-rw-r--r--src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java16
5 files changed, 62 insertions, 38 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/Connection.java b/src/main/java/com/juick/xmpp/s2s/Connection.java
index 6d32abbe..1a14b2cc 100644
--- a/src/main/java/com/juick/xmpp/s2s/Connection.java
+++ b/src/main/java/com/juick/xmpp/s2s/Connection.java
@@ -1,15 +1,16 @@
package com.juick.xmpp.s2s;
+import org.xmlpull.mxp1.MXParser;
+import org.xmlpull.v1.XmlPullParser;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
-import java.net.Socket;
+import java.nio.channels.AsynchronousSocketChannel;
import java.util.Date;
import java.util.logging.Logger;
-import javax.crypto.Mac;
-import javax.crypto.spec.SecretKeySpec;
-import org.xmlpull.mxp1.MXParser;
-import org.xmlpull.v1.XmlPullParser;
/**
*
@@ -24,7 +25,7 @@ public class Connection {
public long tsLocalData = 0;
public long bytesLocal = 0;
public long packetsLocal = 0;
- Socket socket;
+ AsynchronousSocketChannel socket;
final XmlPullParser parser = new MXParser();
OutputStreamWriter writer;
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
index fad97c16..7b9483f7 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java
@@ -5,18 +5,19 @@ import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
import com.juick.xmpp.Presence;
import com.juick.xmpp.utils.XmlUtils;
+import org.xmlpull.v1.XmlPullParser;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.net.Socket;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.Channels;
import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import java.util.logging.Logger;
-import org.xmlpull.v1.XmlPullParser;
-
/**
*
* @author ugnich
@@ -25,11 +26,11 @@ public class ConnectionIn extends Connection implements Runnable {
private static final Logger LOGGER = Logger.getLogger(ConnectionIn.class.getName());
- final public ArrayList<String> from = new ArrayList<String>();
+ final public List<String> from = new ArrayList<>();
public long tsRemoteData = 0;
public long packetsRemote = 0;
- public ConnectionIn(Socket socket) {
+ public ConnectionIn(AsynchronousSocketChannel socket) {
super();
this.socket = socket;
streamID = UUID.randomUUID().toString();
@@ -39,8 +40,8 @@ public class ConnectionIn extends Connection implements Runnable {
public void run() {
LOGGER.info("STREAM FROM ? " + streamID + " START");
try {
- parser.setInput(new InputStreamReader(socket.getInputStream()));
- writer = new OutputStreamWriter(socket.getOutputStream());
+ parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
+ writer = new OutputStreamWriter(Channels.newOutputStream(socket));
parser.next(); // stream:stream
updateTsRemoteData();
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
index f9b788c4..982f2efc 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java
@@ -1,34 +1,49 @@
package com.juick.xmpp.s2s;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousServerSocketChannel;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
- * @author ugnich
+ * @author vt
*/
public class ConnectionListener implements Runnable {
- private static final Logger LOGGER = Logger.getLogger(ConnectionListener.class.getName());
+ private static final Logger logger = Logger.getLogger(ConnectionListener.class.getName());
- ExecutorService executorService = Executors.newCachedThreadPool();
+ ExecutorService connectionPool;
@Override
public void run() {
try {
- ServerSocket listener = new ServerSocket(5269);
- while (true) {
- Socket sock = listener.accept();
- ConnectionIn conn = new ConnectionIn(sock);
- XMPPComponent.addConnectionIn(conn);
- executorService.submit(conn);
- }
- } catch (IOException e) {
- LOGGER.info("IOException on socket listen: " + e.toString());
+ connectionPool = Executors.newCachedThreadPool();
+ final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open();
+ listener.bind(new InetSocketAddress(5269));
+ logger.info("s2s listener ready");
+ listener.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
+ @Override
+ public void completed(AsynchronousSocketChannel result, Object attachment) {
+ listener.accept(connectionPool, this);
+ ConnectionIn client = new ConnectionIn(result);
+ XMPPComponent.addConnectionIn(client);
+ connectionPool.submit(client);
+ }
+
+ @Override
+ public void failed(Throwable exc, Object attachment) {
+
+ }
+ });
+ Thread.currentThread().join();
+ listener.close();
+ } catch (Exception e) {
+ logger.log(Level.SEVERE, "s2s listener exception", e);
}
}
}
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
index 0205349a..8e543843 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java
@@ -4,7 +4,11 @@ import com.juick.xmpp.utils.XmlUtils;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
+import java.net.InetSocketAddress;
import java.net.Socket;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.Channels;
+
import org.xmlpull.v1.XmlPullParser;
/**
@@ -36,10 +40,11 @@ public class ConnectionOut extends Connection implements Runnable {
try {
HostnamePort addr = DNSQueries.getServerAddress(to);
- socket = new Socket(addr.hostname, addr.port);
+ socket = AsynchronousSocketChannel.open();
+ socket.connect(new InetSocketAddress(addr.hostname, addr.port));
- parser.setInput(new InputStreamReader(socket.getInputStream()));
- writer = new OutputStreamWriter(socket.getOutputStream());
+ parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
+ writer = new OutputStreamWriter(Channels.newOutputStream(socket));
sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
"xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
index 4a1f6b16..ccd6a335 100644
--- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
+++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java
@@ -9,15 +9,16 @@ import com.juick.xmpp.extensions.Nickname;
import com.juick.xmpp.extensions.XOOB;
import com.juick.xmpp.utils.SHA1;
import com.juick.xmpp.utils.XmlUtils;
+import org.xmlpull.v1.XmlPullParser;
+
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
-import java.net.Socket;
-import java.util.ArrayList;
+import java.net.InetSocketAddress;
+import java.nio.channels.AsynchronousSocketChannel;
+import java.nio.channels.Channels;
import java.util.List;
-import org.xmlpull.v1.XmlPullParser;
-
/**
*
* @author ugnich
@@ -29,10 +30,11 @@ public class ConnectionRouter extends Connection implements Runnable {
LOGGER.info("STREAM ROUTER START");
try {
- socket = new Socket("localhost", 5347);
- parser.setInput(new InputStreamReader(socket.getInputStream()));
+ socket = AsynchronousSocketChannel.open();
+ socket.connect(new InetSocketAddress(5347));
+ parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
- writer = new OutputStreamWriter(socket.getOutputStream());
+ writer = new OutputStreamWriter(Channels.newOutputStream(socket));
String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>";
writer.write(msg);