From c20245e7606a7785563fcac97ccee0a2d59ab581 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 30 Jan 2018 12:43:23 +0300 Subject: xmpp: minimize out connections --- .../main/java/com/juick/components/XMPPServer.java | 56 +++---- .../components/controllers/helpers/XMPPStatus.java | 8 +- .../juick/components/s2s/ConnectionListener.java | 14 +- .../com/juick/components/s2s/ConnectionOut.java | 167 +++++++++++++++++++++ .../juick/components/s2s/StreamServerDialback.java | 167 --------------------- 5 files changed, 208 insertions(+), 204 deletions(-) create mode 100644 juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java delete mode 100644 juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java (limited to 'juick-xmpp/src/main/java/com/juick/components') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index de9476a8..4b3470a7 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -46,6 +46,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -76,7 +77,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public String[] bannedHosts; private final List inConnections = new CopyOnWriteArrayList<>(); - private final Map outConnections = new ConcurrentHashMap<>(); + private final Map> outConnections = new ConcurrentHashMap<>(); private final List outCache = new CopyOnWriteArrayList<>(); private final List stanzaListeners = new CopyOnWriteArrayList<>(); @@ -159,7 +160,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.add(c); } - public void addConnectionOut(Socket socket, StreamServerDialback c) { + public void addConnectionOut(ConnectionOut c, Optional socket) { c.setListener(this); outConnections.put(c, socket); } @@ -168,7 +169,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.remove(c); } - public void removeConnectionOut(StreamServerDialback c) { + public void removeConnectionOut(ConnectionOut c) { outConnections.remove(c); } @@ -181,7 +182,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return cache[0]; } - public Optional getConnectionOut(Jid hostname, boolean needReady) { + public Optional getConnectionOut(Jid hostname, boolean needReady) { return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } @@ -209,8 +210,8 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; - StreamServerDialback connOut = null; - for (StreamServerDialback c : outConnections.keySet()) { + ConnectionOut connOut = null; + for (ConnectionOut c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; @@ -240,33 +241,36 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } if (!haveAnyConn) { - createDialbackConnection(hostname.toEscapedString(), null, null); + try { + createDialbackConnection(hostname.toEscapedString(), null, null); + } catch (Exception e) { + logger.warn("dialback error", e); + } } } - void createDialbackConnection(String to, String checkSID, String dbKey) { + void createDialbackConnection(String to, String checkSID, String dbKey) throws Exception { + ConnectionOut connectionOut = new ConnectionOut(getJid(), Jid.of(to), null, null, checkSID, dbKey); + addConnectionOut(connectionOut, Optional.empty()); service.submit(() -> { try { Socket socket = new Socket(); socket.connect(DNSQueries.getServerAddress(to)); - StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); - addConnectionOut(socket, streamServerDialback); - streamServerDialback.addChildParser(new JuickMessage()); - streamServerDialback.connect(); - } catch (UnknownHostException | ConnectException e) { + addConnectionOut(connectionOut, Optional.of(socket)); + connectionOut.addChildParser(new JuickMessage()); + connectionOut.connect(); + } catch (IOException e) { userService.getActiveJIDs().stream().filter(j -> Jid.of(j).getDomain().equals(to)) .forEach(j -> { userService.setActiveStatusForJID(j, UserService.ActiveStatus.Inactive); logger.info("{} is inactive now", j); }); - } catch (Exception e) { - logger.error("s2s out error", e); } }); } public void startDialback(Jid from, String streamId, String dbKey) throws Exception { - Optional c = getConnectionOut(from, false); + Optional c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { @@ -291,7 +295,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return inConnections; } - public Map getOutConnections() { + public Map> getOutConnections() { return outConnections; } @@ -316,9 +320,9 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void proceed(StreamServerDialback connection) { + public void proceed(ConnectionOut connection) { try { - Socket socket = outConnections.get(connection); + Socket socket = outConnections.get(connection).get(); socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true); ((SSLSocket) socket).startHandshake(); @@ -328,7 +332,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { connection.setOutputStream(socket.getOutputStream()); connection.restartStream(); connection.sendOpenStream(); - } catch (XmlPullParserException | IOException sex) { + } catch (NoSuchElementException | XmlPullParserException | IOException sex) { logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); connection.send(""); removeConnectionOut(connection); @@ -337,35 +341,35 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void verify(StreamServerDialback connection, String from, String type, String sid) { + public void verify(ConnectionOut connection, String from, String type, String sid) { if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); } } @Override - public void dialbackError(StreamServerDialback connection, StreamError error) { + public void dialbackError(ConnectionOut connection, StreamError error) { logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); removeConnectionOut(connection); connection.logoff(); } @Override - public void finished(StreamServerDialback connection, boolean dirty) { + public void finished(ConnectionOut connection, boolean dirty) { logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); removeConnectionOut(connection); connection.logoff(); } @Override - public void exception(StreamServerDialback connection, Exception ex) { + public void exception(ConnectionOut connection, Exception ex) { logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); removeConnectionOut(connection); connection.logoff(); } @Override - public void ready(StreamServerDialback connection) { + public void ready(ConnectionOut connection) { logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); String cache = getFromCache(connection.to); if (cache != null) { @@ -375,7 +379,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public boolean securing(StreamServerDialback connection) { + public boolean securing(ConnectionOut connection) { return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); } diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java index 9b5bb0d5..c5a7f6e3 100644 --- a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java +++ b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java @@ -18,7 +18,7 @@ package com.juick.components.controllers.helpers; import com.juick.components.s2s.ConnectionIn; -import com.juick.components.s2s.StreamServerDialback; +import com.juick.components.s2s.ConnectionOut; import java.util.List; import java.util.Set; @@ -28,7 +28,7 @@ import java.util.Set; */ public class XMPPStatus { private List inbound; - private Set outbound; + private Set outbound; public List getInbound() { return inbound; @@ -38,11 +38,11 @@ public class XMPPStatus { this.inbound = inbound; } - public Set getOutbound() { + public Set getOutbound() { return outbound; } - public void setOutbound(Set outbound) { + public void setOutbound(Set outbound) { this.outbound = outbound; } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java index ef5a948b..3b191974 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java @@ -4,11 +4,11 @@ import com.juick.xmpp.extensions.StreamError; public interface ConnectionListener { void starttls(ConnectionIn connection); - void proceed(StreamServerDialback connection); - void verify(StreamServerDialback connection, String from, String type, String sid); - void dialbackError(StreamServerDialback connection, StreamError error); - void finished(StreamServerDialback connection, boolean dirty); - void exception(StreamServerDialback connection, Exception ex); - void ready(StreamServerDialback connection); - boolean securing(StreamServerDialback connection); + void proceed(ConnectionOut connection); + void verify(ConnectionOut connection, String from, String type, String sid); + void dialbackError(ConnectionOut connection, StreamError error); + void finished(ConnectionOut connection, boolean dirty); + void exception(ConnectionOut connection, Exception ex); + void ready(ConnectionOut connection); + boolean securing(ConnectionOut connection); } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java new file mode 100644 index 00000000..9578a831 --- /dev/null +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java @@ -0,0 +1,167 @@ +/* + * Copyright (C) 2008-2017, Juick + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.juick.components.s2s; + +import com.juick.components.s2s.util.DialbackUtils; +import com.juick.xmpp.Stream; +import com.juick.xmpp.extensions.StreamError; +import com.juick.xmpp.extensions.StreamFeatures; +import com.juick.xmpp.utils.XmlUtils; +import org.apache.commons.text.RandomStringGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xmlpull.v1.XmlPullParser; +import rocks.xmpp.addr.Jid; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.SocketException; +import java.util.UUID; + +/** + * @author ugnich + */ +public class ConnectionOut extends Stream { + protected static final Logger logger = LoggerFactory.getLogger(ConnectionOut.class); + public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; + public static final String NS_DB = "jabber:server:dialback"; + private boolean secured = false; + + public boolean streamReady = false; + String checkSID = null; + String dbKey = null; + private String streamID; + ConnectionListener listener; + RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build(); + + public ConnectionOut(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception { + super(from, to, is, os); + this.to = to; + this.checkSID = checkSID; + this.dbKey = dbKey; + if (dbKey == null) { + this.dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, from, streamID); + } + streamID = UUID.randomUUID().toString(); + } + + public void sendOpenStream() throws IOException { + send(""); + } + + void processDialback() throws Exception { + if (checkSID != null) { + sendDialbackVerify(checkSID, dbKey); + } + send("" + + dbKey + ""); + } + + @Override + public void handshake() { + try { + restartStream(); + + sendOpenStream(); + + parser.next(); // stream:stream + streamID = parser.getAttributeValue(null, "id"); + if (streamID == null || streamID.isEmpty()) { + throw new Exception("stream to " + to + " invalid first packet"); + } + + logger.info("stream to {} {} open", to, streamID); + boolean xmppversionnew = parser.getAttributeValue(null, "version") != null; + if (!xmppversionnew) { + processDialback(); + } + + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + + String tag = parser.getName(); + if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { + String type = parser.getAttributeValue(null, "type"); + if (type != null && type.equals("valid")) { + streamReady = true; + listener.ready(this); + } else { + logger.info("stream to {} {} dialback fail", to, streamID); + } + XmlUtils.skip(parser); + } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { + String from = parser.getAttributeValue(null, "from"); + String type = parser.getAttributeValue(null, "type"); + String sid = parser.getAttributeValue(null, "id"); + listener.verify(this, from, type, sid); + XmlUtils.skip(parser); + } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) { + StreamFeatures features = StreamFeatures.parse(parser); + if (listener != null && !secured && features.STARTTLS >= 0 + && listener.securing(this)) { + logger.info("stream to {} {} securing", to.toEscapedString(), streamID); + send(""); + } else { + processDialback(); + } + } else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) { + listener.proceed(this); + } else if (secured && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { + streamID = parser.getAttributeValue(null, "id"); + } else if (tag.equals("error")) { + StreamError streamError = StreamError.parse(parser); + listener.dialbackError(this, streamError); + } else { + String unhandledStanza = XmlUtils.parseToString(parser, true); + logger.warn("Unhandled stanza from {} {} : {}", to, streamID, unhandledStanza); + } + } + listener.finished(this, false); + } catch (EOFException | SocketException eofex) { + listener.finished(this, true); + } catch (Exception e) { + listener.exception(this, e); + } + } + + public void sendDialbackVerify(String sid, String key) { + send("" + + key + ""); + } + public void setListener(ConnectionListener listener) { + this.listener = listener; + } + + public String getStreamID() { + return streamID; + } + + public boolean isSecured() { + return secured; + } + + public void setSecured(boolean secured) { + this.secured = secured; + } +} diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java b/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java deleted file mode 100644 index 48c4d72d..00000000 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Copyright (C) 2008-2017, Juick - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package com.juick.components.s2s; - -import com.juick.components.s2s.util.DialbackUtils; -import com.juick.xmpp.Stream; -import com.juick.xmpp.extensions.StreamError; -import com.juick.xmpp.extensions.StreamFeatures; -import com.juick.xmpp.utils.XmlUtils; -import org.apache.commons.text.RandomStringGenerator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.xmlpull.v1.XmlPullParser; -import rocks.xmpp.addr.Jid; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.SocketException; -import java.util.UUID; - -/** - * @author ugnich - */ -public class StreamServerDialback extends Stream { - protected static final Logger logger = LoggerFactory.getLogger(StreamServerDialback.class); - public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; - public static final String NS_DB = "jabber:server:dialback"; - private boolean secured = false; - - public boolean streamReady = false; - String checkSID = null; - String dbKey = null; - private String streamID; - ConnectionListener listener; - RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build(); - - public StreamServerDialback(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception { - super(from, to, is, os); - this.to = to; - this.checkSID = checkSID; - this.dbKey = dbKey; - if (dbKey == null) { - this.dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, from, streamID); - } - streamID = UUID.randomUUID().toString(); - } - - public void sendOpenStream() throws IOException { - send(""); - } - - void processDialback() throws Exception { - if (checkSID != null) { - sendDialbackVerify(checkSID, dbKey); - } - send("" + - dbKey + ""); - } - - @Override - public void handshake() { - try { - restartStream(); - - sendOpenStream(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("stream to " + to + " invalid first packet"); - } - - logger.info("stream to {} {} open", to, streamID); - boolean xmppversionnew = parser.getAttributeValue(null, "version") != null; - if (!xmppversionnew) { - processDialback(); - } - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { - String type = parser.getAttributeValue(null, "type"); - if (type != null && type.equals("valid")) { - streamReady = true; - listener.ready(this); - } else { - logger.info("stream to {} {} dialback fail", to, streamID); - } - XmlUtils.skip(parser); - } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { - String from = parser.getAttributeValue(null, "from"); - String type = parser.getAttributeValue(null, "type"); - String sid = parser.getAttributeValue(null, "id"); - listener.verify(this, from, type, sid); - XmlUtils.skip(parser); - } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) { - StreamFeatures features = StreamFeatures.parse(parser); - if (listener != null && !secured && features.STARTTLS >= 0 - && listener.securing(this)) { - logger.info("stream to {} {} securing", to.toEscapedString(), streamID); - send(""); - } else { - processDialback(); - } - } else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) { - listener.proceed(this); - } else if (secured && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { - streamID = parser.getAttributeValue(null, "id"); - } else if (tag.equals("error")) { - StreamError streamError = StreamError.parse(parser); - listener.dialbackError(this, streamError); - } else { - String unhandledStanza = XmlUtils.parseToString(parser, true); - logger.warn("Unhandled stanza from {} {} : {}", to, streamID, unhandledStanza); - } - } - listener.finished(this, false); - } catch (EOFException | SocketException eofex) { - listener.finished(this, true); - } catch (Exception e) { - listener.exception(this, e); - } - } - - public void sendDialbackVerify(String sid, String key) { - send("" + - key + ""); - } - public void setListener(ConnectionListener listener) { - this.listener = listener; - } - - public String getStreamID() { - return streamID; - } - - public boolean isSecured() { - return secured; - } - - public void setSecured(boolean secured) { - this.secured = secured; - } -} -- cgit v1.2.3