From c907c80a1bfe86a3c0b40b48d65a244e8b987368 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Mon, 1 Feb 2016 19:09:43 +0300 Subject: fix connectionout --- .../java/com/juick/xmpp/s2s/ConnectionOut.java | 135 ++++++++++++--------- 1 file changed, 78 insertions(+), 57 deletions(-) (limited to 'src/main') diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java index 8e543843..0101e340 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionOut.java @@ -8,8 +8,10 @@ import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.Channels; +import java.nio.channels.CompletionHandler; import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; /** * @@ -41,72 +43,91 @@ public class ConnectionOut extends Connection implements Runnable { try { HostnamePort addr = DNSQueries.getServerAddress(to); socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress(addr.hostname, addr.port)); - - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + socket.connect(new InetSocketAddress(addr.hostname, addr.port), socket, new CompletionHandler() { + @Override + public void completed(Void result, AsynchronousSocketChannel attachment) { + try { + parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); + + writer = new OutputStreamWriter(Channels.newOutputStream(socket)); + + sendStanza(""); + + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET"); + } - sendStanza(""); + LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); + XMPPComponent.addConnectionOut(ConnectionOut.this); - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET"); - } + if (checkSID != null) { + sendDialbackVerify(checkSID, dbKey); + } - LOGGER.info("STREAM TO " + to + " " + streamID + " OPEN"); - XMPPComponent.addConnectionOut(this); + sendStanza("" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + ""); + + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + logParser(); + + String tag = parser.getName(); + if (tag.equals("db:result")) { + String type = parser.getAttributeValue(null, "type"); + if (type != null && type.equals("valid")) { + streamReady = true; + LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); + + String cache = XMPPComponent.getFromCache(to); + if (cache != null) { + LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); + sendStanza(cache); + } + + } else { + LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL"); + } + XmlUtils.skip(parser); + } else if (tag.equals("db:verify")) { + String from = parser.getAttributeValue(null, "from"); + String type = parser.getAttributeValue(null, "type"); + String sid = parser.getAttributeValue(null, "id"); + if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { + ConnectionIn c = XMPPComponent.getConnectionIn(sid); + if (c != null) { + c.sendDialbackResult(from, type); + } + } + XmlUtils.skip(parser); + } else { + LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true)); + } + } - if (checkSID != null) { - sendDialbackVerify(checkSID, dbKey); - } + LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); + XMPPComponent.removeConnectionOut(this); + closeConnection(); + } catch (XmlPullParserException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } - sendStanza("" + generateDialbackKey(to, XMPPComponent.HOSTNAME, streamID) + ""); + @Override + public void failed(Throwable exc, AsynchronousSocketChannel attachment) { - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; } - logParser(); - - String tag = parser.getName(); - if (tag.equals("db:result")) { - String type = parser.getAttributeValue(null, "type"); - if (type != null && type.equals("valid")) { - streamReady = true; - LOGGER.info("STREAM TO " + to + " " + streamID + " READY"); - - String cache = XMPPComponent.getFromCache(to); - if (cache != null) { - LOGGER.info("STREAM TO " + to + " " + streamID + " SENDING CACHE"); - sendStanza(cache); - } + }); - } else { - LOGGER.info("STREAM TO " + to + " " + streamID + " DIALBACK FAIL"); - } - XmlUtils.skip(parser); - } else if (tag.equals("db:verify")) { - String from = parser.getAttributeValue(null, "from"); - String type = parser.getAttributeValue(null, "type"); - String sid = parser.getAttributeValue(null, "id"); - if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - ConnectionIn c = XMPPComponent.getConnectionIn(sid); - if (c != null) { - c.sendDialbackResult(from, type); - } - } - XmlUtils.skip(parser); - } else { - LOGGER.info("STREAM TO " + to + " " + streamID + ": " + XmlUtils.parseToString(parser, true)); - } - } - LOGGER.warning("STREAM TO " + to + " " + streamID + " FINISHED"); - XMPPComponent.removeConnectionOut(this); - closeConnection(); } catch (Exception e) { LOGGER.warning(e.toString()); XMPPComponent.removeConnectionOut(this); -- cgit v1.2.3