From 474323453f1b881273f618555845e07ace830bd2 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Sat, 30 Jan 2016 16:13:30 +0300 Subject: s2s: NIO.2 sockets --- src/main/java/com/juick/xmpp/s2s/Connection.java | 13 ++++--- src/main/java/com/juick/xmpp/s2s/ConnectionIn.java | 15 ++++---- .../com/juick/xmpp/s2s/ConnectionListener.java | 45 ++++++++++++++-------- .../java/com/juick/xmpp/s2s/ConnectionOut.java | 11 ++++-- .../java/com/juick/xmpp/s2s/ConnectionRouter.java | 16 ++++---- 5 files changed, 62 insertions(+), 38 deletions(-) (limited to 'src/main/java') diff --git a/src/main/java/com/juick/xmpp/s2s/Connection.java b/src/main/java/com/juick/xmpp/s2s/Connection.java index 6d32abbe..1a14b2cc 100644 --- a/src/main/java/com/juick/xmpp/s2s/Connection.java +++ b/src/main/java/com/juick/xmpp/s2s/Connection.java @@ -1,15 +1,16 @@ package com.juick.xmpp.s2s; +import org.xmlpull.mxp1.MXParser; +import org.xmlpull.v1.XmlPullParser; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; import java.io.FileWriter; import java.io.IOException; import java.io.OutputStreamWriter; -import java.net.Socket; +import java.nio.channels.AsynchronousSocketChannel; import java.util.Date; import java.util.logging.Logger; -import javax.crypto.Mac; -import javax.crypto.spec.SecretKeySpec; -import org.xmlpull.mxp1.MXParser; -import org.xmlpull.v1.XmlPullParser; /** * @@ -24,7 +25,7 @@ public class Connection { public long tsLocalData = 0; public long bytesLocal = 0; public long packetsLocal = 0; - Socket socket; + AsynchronousSocketChannel socket; final XmlPullParser parser = new MXParser(); OutputStreamWriter writer; diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java index fad97c16..7b9483f7 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionIn.java @@ -5,18 +5,19 @@ import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.Presence; import com.juick.xmpp.utils.XmlUtils; +import org.xmlpull.v1.XmlPullParser; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.net.Socket; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.Channels; import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.logging.Logger; -import org.xmlpull.v1.XmlPullParser; - /** * * @author ugnich @@ -25,11 +26,11 @@ public class ConnectionIn extends Connection implements Runnable { private static final Logger LOGGER = Logger.getLogger(ConnectionIn.class.getName()); - final public ArrayList from = new ArrayList(); + final public List from = new ArrayList<>(); public long tsRemoteData = 0; public long packetsRemote = 0; - public ConnectionIn(Socket socket) { + public ConnectionIn(AsynchronousSocketChannel socket) { super(); this.socket = socket; streamID = UUID.randomUUID().toString(); @@ -39,8 +40,8 @@ public class ConnectionIn extends Connection implements Runnable { public void run() { LOGGER.info("STREAM FROM ? " + streamID + " START"); try { - parser.setInput(new InputStreamReader(socket.getInputStream())); - writer = new OutputStreamWriter(socket.getOutputStream()); + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); parser.next(); // stream:stream updateTsRemoteData(); diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java index f9b788c4..982f2efc 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionListener.java @@ -1,34 +1,49 @@ package com.juick.xmpp.s2s; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.logging.Level; import java.util.logging.Logger; /** * - * @author ugnich + * @author vt */ public class ConnectionListener implements Runnable { - private static final Logger LOGGER = Logger.getLogger(ConnectionListener.class.getName()); + private static final Logger logger = Logger.getLogger(ConnectionListener.class.getName()); - ExecutorService executorService = Executors.newCachedThreadPool(); + ExecutorService connectionPool; @Override public void run() { try { - ServerSocket listener = new ServerSocket(5269); - while (true) { - Socket sock = listener.accept(); - ConnectionIn conn = new ConnectionIn(sock); - XMPPComponent.addConnectionIn(conn); - executorService.submit(conn); - } - } catch (IOException e) { - LOGGER.info("IOException on socket listen: " + e.toString()); + connectionPool = Executors.newCachedThreadPool(); + final AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open(); + listener.bind(new InetSocketAddress(5269)); + logger.info("s2s listener ready"); + listener.accept(null, new CompletionHandler() { + @Override + public void completed(AsynchronousSocketChannel result, Object attachment) { + listener.accept(connectionPool, this); + ConnectionIn client = new ConnectionIn(result); + XMPPComponent.addConnectionIn(client); + connectionPool.submit(client); + } + + @Override + public void failed(Throwable exc, Object attachment) { + + } + }); + Thread.currentThread().join(); + listener.close(); + } catch (Exception e) { + logger.log(Level.SEVERE, "s2s listener exception", e); } } } diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 0205349a..8e543843 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -4,7 +4,11 @@ import com.juick.xmpp.utils.XmlUtils; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.net.InetSocketAddress; import java.net.Socket; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.Channels; + import org.xmlpull.v1.XmlPullParser; /** @@ -36,10 +40,11 @@ public class ConnectionOut extends Connection implements Runnable { try { HostnamePort addr = DNSQueries.getServerAddress(to); - socket = new Socket(addr.hostname, addr.port); + socket = AsynchronousSocketChannel.open(); + socket.connect(new InetSocketAddress(addr.hostname, addr.port)); - parser.setInput(new InputStreamReader(socket.getInputStream())); - writer = new OutputStreamWriter(socket.getOutputStream()); + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); sendStanza(""; writer.write(msg); -- cgit v1.2.3