From c20245e7606a7785563fcac97ccee0a2d59ab581 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 30 Jan 2018 12:43:23 +0300 Subject: xmpp: minimize out connections --- .../main/java/com/juick/components/XMPPServer.java | 56 ++++++++++++---------- 1 file changed, 30 insertions(+), 26 deletions(-) (limited to 'juick-xmpp/src/main/java/com/juick/components/XMPPServer.java') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index de9476a8..4b3470a7 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -46,6 +46,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -76,7 +77,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public String[] bannedHosts; private final List inConnections = new CopyOnWriteArrayList<>(); - private final Map outConnections = new ConcurrentHashMap<>(); + private final Map> outConnections = new ConcurrentHashMap<>(); private final List outCache = new CopyOnWriteArrayList<>(); private final List stanzaListeners = new CopyOnWriteArrayList<>(); @@ -159,7 +160,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.add(c); } - public void addConnectionOut(Socket socket, StreamServerDialback c) { + public void addConnectionOut(ConnectionOut c, Optional socket) { c.setListener(this); outConnections.put(c, socket); } @@ -168,7 +169,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.remove(c); } - public void removeConnectionOut(StreamServerDialback c) { + public void removeConnectionOut(ConnectionOut c) { outConnections.remove(c); } @@ -181,7 +182,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return cache[0]; } - public Optional getConnectionOut(Jid hostname, boolean needReady) { + public Optional getConnectionOut(Jid hostname, boolean needReady) { return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } @@ -209,8 +210,8 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; - StreamServerDialback connOut = null; - for (StreamServerDialback c : outConnections.keySet()) { + ConnectionOut connOut = null; + for (ConnectionOut c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; @@ -240,33 +241,36 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } if (!haveAnyConn) { - createDialbackConnection(hostname.toEscapedString(), null, null); + try { + createDialbackConnection(hostname.toEscapedString(), null, null); + } catch (Exception e) { + logger.warn("dialback error", e); + } } } - void createDialbackConnection(String to, String checkSID, String dbKey) { + void createDialbackConnection(String to, String checkSID, String dbKey) throws Exception { + ConnectionOut connectionOut = new ConnectionOut(getJid(), Jid.of(to), null, null, checkSID, dbKey); + addConnectionOut(connectionOut, Optional.empty()); service.submit(() -> { try { Socket socket = new Socket(); socket.connect(DNSQueries.getServerAddress(to)); - StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); - addConnectionOut(socket, streamServerDialback); - streamServerDialback.addChildParser(new JuickMessage()); - streamServerDialback.connect(); - } catch (UnknownHostException | ConnectException e) { + addConnectionOut(connectionOut, Optional.of(socket)); + connectionOut.addChildParser(new JuickMessage()); + connectionOut.connect(); + } catch (IOException e) { userService.getActiveJIDs().stream().filter(j -> Jid.of(j).getDomain().equals(to)) .forEach(j -> { userService.setActiveStatusForJID(j, UserService.ActiveStatus.Inactive); logger.info("{} is inactive now", j); }); - } catch (Exception e) { - logger.error("s2s out error", e); } }); } public void startDialback(Jid from, String streamId, String dbKey) throws Exception { - Optional c = getConnectionOut(from, false); + Optional c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { @@ -291,7 +295,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return inConnections; } - public Map getOutConnections() { + public Map> getOutConnections() { return outConnections; } @@ -316,9 +320,9 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void proceed(StreamServerDialback connection) { + public void proceed(ConnectionOut connection) { try { - Socket socket = outConnections.get(connection); + Socket socket = outConnections.get(connection).get(); socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true); ((SSLSocket) socket).startHandshake(); @@ -328,7 +332,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { connection.setOutputStream(socket.getOutputStream()); connection.restartStream(); connection.sendOpenStream(); - } catch (XmlPullParserException | IOException sex) { + } catch (NoSuchElementException | XmlPullParserException | IOException sex) { logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); connection.send(""); removeConnectionOut(connection); @@ -337,35 +341,35 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void verify(StreamServerDialback connection, String from, String type, String sid) { + public void verify(ConnectionOut connection, String from, String type, String sid) { if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); } } @Override - public void dialbackError(StreamServerDialback connection, StreamError error) { + public void dialbackError(ConnectionOut connection, StreamError error) { logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); removeConnectionOut(connection); connection.logoff(); } @Override - public void finished(StreamServerDialback connection, boolean dirty) { + public void finished(ConnectionOut connection, boolean dirty) { logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); removeConnectionOut(connection); connection.logoff(); } @Override - public void exception(StreamServerDialback connection, Exception ex) { + public void exception(ConnectionOut connection, Exception ex) { logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); removeConnectionOut(connection); connection.logoff(); } @Override - public void ready(StreamServerDialback connection) { + public void ready(ConnectionOut connection) { logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); String cache = getFromCache(connection.to); if (cache != null) { @@ -375,7 +379,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public boolean securing(StreamServerDialback connection) { + public boolean securing(ConnectionOut connection) { return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); } -- cgit v1.2.3