diff options
author | Vitaly Takmazov | 2018-01-30 12:43:23 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2018-01-30 12:43:23 +0300 |
commit | c20245e7606a7785563fcac97ccee0a2d59ab581 (patch) | |
tree | ef699ffb8c2be7d42b3a0353a195e8169af7b9d4 /juick-xmpp/src | |
parent | 5f5cfa9ee18d5d590404096ae9c55086e4b08304 (diff) |
xmpp: minimize out connections
Diffstat (limited to 'juick-xmpp/src')
-rw-r--r-- | juick-xmpp/src/main/java/com/juick/components/XMPPServer.java | 56 | ||||
-rw-r--r-- | juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java | 8 | ||||
-rw-r--r-- | juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java | 14 | ||||
-rw-r--r-- | juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java (renamed from juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java) | 6 |
4 files changed, 44 insertions, 40 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index de9476a8..4b3470a7 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -46,6 +46,7 @@ import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -76,7 +77,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public String[] bannedHosts; private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>(); - private final Map<StreamServerDialback, Socket> outConnections = new ConcurrentHashMap<>(); + private final Map<ConnectionOut, Optional<Socket>> outConnections = new ConcurrentHashMap<>(); private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>(); private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>(); @@ -159,7 +160,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.add(c); } - public void addConnectionOut(Socket socket, StreamServerDialback c) { + public void addConnectionOut(ConnectionOut c, Optional<Socket> socket) { c.setListener(this); outConnections.put(c, socket); } @@ -168,7 +169,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { inConnections.remove(c); } - public void removeConnectionOut(StreamServerDialback c) { + public void removeConnectionOut(ConnectionOut c) { outConnections.remove(c); } @@ -181,7 +182,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return cache[0]; } - public Optional<StreamServerDialback> getConnectionOut(Jid hostname, boolean needReady) { + public Optional<ConnectionOut> getConnectionOut(Jid hostname, boolean needReady) { return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } @@ -209,8 +210,8 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; - StreamServerDialback connOut = null; - for (StreamServerDialback c : outConnections.keySet()) { + ConnectionOut connOut = null; + for (ConnectionOut c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; @@ -240,33 +241,36 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } if (!haveAnyConn) { - createDialbackConnection(hostname.toEscapedString(), null, null); + try { + createDialbackConnection(hostname.toEscapedString(), null, null); + } catch (Exception e) { + logger.warn("dialback error", e); + } } } - void createDialbackConnection(String to, String checkSID, String dbKey) { + void createDialbackConnection(String to, String checkSID, String dbKey) throws Exception { + ConnectionOut connectionOut = new ConnectionOut(getJid(), Jid.of(to), null, null, checkSID, dbKey); + addConnectionOut(connectionOut, Optional.empty()); service.submit(() -> { try { Socket socket = new Socket(); socket.connect(DNSQueries.getServerAddress(to)); - StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); - addConnectionOut(socket, streamServerDialback); - streamServerDialback.addChildParser(new JuickMessage()); - streamServerDialback.connect(); - } catch (UnknownHostException | ConnectException e) { + addConnectionOut(connectionOut, Optional.of(socket)); + connectionOut.addChildParser(new JuickMessage()); + connectionOut.connect(); + } catch (IOException e) { userService.getActiveJIDs().stream().filter(j -> Jid.of(j).getDomain().equals(to)) .forEach(j -> { userService.setActiveStatusForJID(j, UserService.ActiveStatus.Inactive); logger.info("{} is inactive now", j); }); - } catch (Exception e) { - logger.error("s2s out error", e); } }); } public void startDialback(Jid from, String streamId, String dbKey) throws Exception { - Optional<StreamServerDialback> c = getConnectionOut(from, false); + Optional<ConnectionOut> c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { @@ -291,7 +295,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { return inConnections; } - public Map<StreamServerDialback, Socket> getOutConnections() { + public Map<ConnectionOut, Optional<Socket>> getOutConnections() { return outConnections; } @@ -316,9 +320,9 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void proceed(StreamServerDialback connection) { + public void proceed(ConnectionOut connection) { try { - Socket socket = outConnections.get(connection); + Socket socket = outConnections.get(connection).get(); socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true); ((SSLSocket) socket).startHandshake(); @@ -328,7 +332,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { connection.setOutputStream(socket.getOutputStream()); connection.restartStream(); connection.sendOpenStream(); - } catch (XmlPullParserException | IOException sex) { + } catch (NoSuchElementException | XmlPullParserException | IOException sex) { logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); connection.send("<failed xmlns\"" + NS_TLS + "\" />"); removeConnectionOut(connection); @@ -337,35 +341,35 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public void verify(StreamServerDialback connection, String from, String type, String sid) { + public void verify(ConnectionOut connection, String from, String type, String sid) { if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); } } @Override - public void dialbackError(StreamServerDialback connection, StreamError error) { + public void dialbackError(ConnectionOut connection, StreamError error) { logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); removeConnectionOut(connection); connection.logoff(); } @Override - public void finished(StreamServerDialback connection, boolean dirty) { + public void finished(ConnectionOut connection, boolean dirty) { logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); removeConnectionOut(connection); connection.logoff(); } @Override - public void exception(StreamServerDialback connection, Exception ex) { + public void exception(ConnectionOut connection, Exception ex) { logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); removeConnectionOut(connection); connection.logoff(); } @Override - public void ready(StreamServerDialback connection) { + public void ready(ConnectionOut connection) { logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); String cache = getFromCache(connection.to); if (cache != null) { @@ -375,7 +379,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable { } @Override - public boolean securing(StreamServerDialback connection) { + public boolean securing(ConnectionOut connection) { return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); } diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java index 9b5bb0d5..c5a7f6e3 100644 --- a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java +++ b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java @@ -18,7 +18,7 @@ package com.juick.components.controllers.helpers; import com.juick.components.s2s.ConnectionIn; -import com.juick.components.s2s.StreamServerDialback; +import com.juick.components.s2s.ConnectionOut; import java.util.List; import java.util.Set; @@ -28,7 +28,7 @@ import java.util.Set; */ public class XMPPStatus { private List<ConnectionIn> inbound; - private Set<StreamServerDialback> outbound; + private Set<ConnectionOut> outbound; public List<ConnectionIn> getInbound() { return inbound; @@ -38,11 +38,11 @@ public class XMPPStatus { this.inbound = inbound; } - public Set<StreamServerDialback> getOutbound() { + public Set<ConnectionOut> getOutbound() { return outbound; } - public void setOutbound(Set<StreamServerDialback> outbound) { + public void setOutbound(Set<ConnectionOut> outbound) { this.outbound = outbound; } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java index ef5a948b..3b191974 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java @@ -4,11 +4,11 @@ import com.juick.xmpp.extensions.StreamError; public interface ConnectionListener { void starttls(ConnectionIn connection); - void proceed(StreamServerDialback connection); - void verify(StreamServerDialback connection, String from, String type, String sid); - void dialbackError(StreamServerDialback connection, StreamError error); - void finished(StreamServerDialback connection, boolean dirty); - void exception(StreamServerDialback connection, Exception ex); - void ready(StreamServerDialback connection); - boolean securing(StreamServerDialback connection); + void proceed(ConnectionOut connection); + void verify(ConnectionOut connection, String from, String type, String sid); + void dialbackError(ConnectionOut connection, StreamError error); + void finished(ConnectionOut connection, boolean dirty); + void exception(ConnectionOut connection, Exception ex); + void ready(ConnectionOut connection); + boolean securing(ConnectionOut connection); } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java index 48c4d72d..9578a831 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java @@ -38,8 +38,8 @@ import java.util.UUID; /** * @author ugnich */ -public class StreamServerDialback extends Stream { - protected static final Logger logger = LoggerFactory.getLogger(StreamServerDialback.class); +public class ConnectionOut extends Stream { + protected static final Logger logger = LoggerFactory.getLogger(ConnectionOut.class); public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; public static final String NS_DB = "jabber:server:dialback"; private boolean secured = false; @@ -51,7 +51,7 @@ public class StreamServerDialback extends Stream { ConnectionListener listener; RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build(); - public StreamServerDialback(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception { + public ConnectionOut(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception { super(from, to, is, os); this.to = to; this.checkSID = checkSID; |