aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2018-01-30 12:43:23 +0300
committerGravatar Vitaly Takmazov2018-01-30 12:43:23 +0300
commitc20245e7606a7785563fcac97ccee0a2d59ab581 (patch)
treeef699ffb8c2be7d42b3a0353a195e8169af7b9d4 /juick-xmpp
parent5f5cfa9ee18d5d590404096ae9c55086e4b08304 (diff)
xmpp: minimize out connections
Diffstat (limited to 'juick-xmpp')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/XMPPServer.java56
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java8
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java14
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java (renamed from juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java)6
4 files changed, 44 insertions, 40 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
index de9476a8..4b3470a7 100644
--- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
@@ -46,6 +46,7 @@ import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -76,7 +77,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
public String[] bannedHosts;
private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>();
- private final Map<StreamServerDialback, Socket> outConnections = new ConcurrentHashMap<>();
+ private final Map<ConnectionOut, Optional<Socket>> outConnections = new ConcurrentHashMap<>();
private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>();
private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>();
@@ -159,7 +160,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
inConnections.add(c);
}
- public void addConnectionOut(Socket socket, StreamServerDialback c) {
+ public void addConnectionOut(ConnectionOut c, Optional<Socket> socket) {
c.setListener(this);
outConnections.put(c, socket);
}
@@ -168,7 +169,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
inConnections.remove(c);
}
- public void removeConnectionOut(StreamServerDialback c) {
+ public void removeConnectionOut(ConnectionOut c) {
outConnections.remove(c);
}
@@ -181,7 +182,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
return cache[0];
}
- public Optional<StreamServerDialback> getConnectionOut(Jid hostname, boolean needReady) {
+ public Optional<ConnectionOut> getConnectionOut(Jid hostname, boolean needReady) {
return outConnections.keySet().stream().filter(c -> c.to != null &&
c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst();
}
@@ -209,8 +210,8 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
public void sendOut(Jid hostname, String xml) {
boolean haveAnyConn = false;
- StreamServerDialback connOut = null;
- for (StreamServerDialback c : outConnections.keySet()) {
+ ConnectionOut connOut = null;
+ for (ConnectionOut c : outConnections.keySet()) {
if (c.to != null && c.to.equals(hostname)) {
if (c.streamReady) {
connOut = c;
@@ -240,33 +241,36 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
}
if (!haveAnyConn) {
- createDialbackConnection(hostname.toEscapedString(), null, null);
+ try {
+ createDialbackConnection(hostname.toEscapedString(), null, null);
+ } catch (Exception e) {
+ logger.warn("dialback error", e);
+ }
}
}
- void createDialbackConnection(String to, String checkSID, String dbKey) {
+ void createDialbackConnection(String to, String checkSID, String dbKey) throws Exception {
+ ConnectionOut connectionOut = new ConnectionOut(getJid(), Jid.of(to), null, null, checkSID, dbKey);
+ addConnectionOut(connectionOut, Optional.empty());
service.submit(() -> {
try {
Socket socket = new Socket();
socket.connect(DNSQueries.getServerAddress(to));
- StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey);
- addConnectionOut(socket, streamServerDialback);
- streamServerDialback.addChildParser(new JuickMessage());
- streamServerDialback.connect();
- } catch (UnknownHostException | ConnectException e) {
+ addConnectionOut(connectionOut, Optional.of(socket));
+ connectionOut.addChildParser(new JuickMessage());
+ connectionOut.connect();
+ } catch (IOException e) {
userService.getActiveJIDs().stream().filter(j -> Jid.of(j).getDomain().equals(to))
.forEach(j -> {
userService.setActiveStatusForJID(j, UserService.ActiveStatus.Inactive);
logger.info("{} is inactive now", j);
});
- } catch (Exception e) {
- logger.error("s2s out error", e);
}
});
}
public void startDialback(Jid from, String streamId, String dbKey) throws Exception {
- Optional<StreamServerDialback> c = getConnectionOut(from, false);
+ Optional<ConnectionOut> c = getConnectionOut(from, false);
if (c.isPresent()) {
c.get().sendDialbackVerify(streamId, dbKey);
} else {
@@ -291,7 +295,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
return inConnections;
}
- public Map<StreamServerDialback, Socket> getOutConnections() {
+ public Map<ConnectionOut, Optional<Socket>> getOutConnections() {
return outConnections;
}
@@ -316,9 +320,9 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
}
@Override
- public void proceed(StreamServerDialback connection) {
+ public void proceed(ConnectionOut connection) {
try {
- Socket socket = outConnections.get(connection);
+ Socket socket = outConnections.get(connection).get();
socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
socket.getPort(), true);
((SSLSocket) socket).startHandshake();
@@ -328,7 +332,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
connection.setOutputStream(socket.getOutputStream());
connection.restartStream();
connection.sendOpenStream();
- } catch (XmlPullParserException | IOException sex) {
+ } catch (NoSuchElementException | XmlPullParserException | IOException sex) {
logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex);
connection.send("<failed xmlns\"" + NS_TLS + "\" />");
removeConnectionOut(connection);
@@ -337,35 +341,35 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
}
@Override
- public void verify(StreamServerDialback connection, String from, String type, String sid) {
+ public void verify(ConnectionOut connection, String from, String type, String sid) {
if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) {
getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type));
}
}
@Override
- public void dialbackError(StreamServerDialback connection, StreamError error) {
+ public void dialbackError(ConnectionOut connection, StreamError error) {
logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition());
removeConnectionOut(connection);
connection.logoff();
}
@Override
- public void finished(StreamServerDialback connection, boolean dirty) {
+ public void finished(ConnectionOut connection, boolean dirty) {
logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty);
removeConnectionOut(connection);
connection.logoff();
}
@Override
- public void exception(StreamServerDialback connection, Exception ex) {
+ public void exception(ConnectionOut connection, Exception ex) {
logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex);
removeConnectionOut(connection);
connection.logoff();
}
@Override
- public void ready(StreamServerDialback connection) {
+ public void ready(ConnectionOut connection) {
logger.info("stream to {} {} ready", connection.to, connection.getStreamID());
String cache = getFromCache(connection.to);
if (cache != null) {
@@ -375,7 +379,7 @@ public class XMPPServer implements ConnectionListener, AutoCloseable {
}
@Override
- public boolean securing(StreamServerDialback connection) {
+ public boolean securing(ConnectionOut connection) {
return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString());
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
index 9b5bb0d5..c5a7f6e3 100644
--- a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
+++ b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
@@ -18,7 +18,7 @@
package com.juick.components.controllers.helpers;
import com.juick.components.s2s.ConnectionIn;
-import com.juick.components.s2s.StreamServerDialback;
+import com.juick.components.s2s.ConnectionOut;
import java.util.List;
import java.util.Set;
@@ -28,7 +28,7 @@ import java.util.Set;
*/
public class XMPPStatus {
private List<ConnectionIn> inbound;
- private Set<StreamServerDialback> outbound;
+ private Set<ConnectionOut> outbound;
public List<ConnectionIn> getInbound() {
return inbound;
@@ -38,11 +38,11 @@ public class XMPPStatus {
this.inbound = inbound;
}
- public Set<StreamServerDialback> getOutbound() {
+ public Set<ConnectionOut> getOutbound() {
return outbound;
}
- public void setOutbound(Set<StreamServerDialback> outbound) {
+ public void setOutbound(Set<ConnectionOut> outbound) {
this.outbound = outbound;
}
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java
index ef5a948b..3b191974 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java
@@ -4,11 +4,11 @@ import com.juick.xmpp.extensions.StreamError;
public interface ConnectionListener {
void starttls(ConnectionIn connection);
- void proceed(StreamServerDialback connection);
- void verify(StreamServerDialback connection, String from, String type, String sid);
- void dialbackError(StreamServerDialback connection, StreamError error);
- void finished(StreamServerDialback connection, boolean dirty);
- void exception(StreamServerDialback connection, Exception ex);
- void ready(StreamServerDialback connection);
- boolean securing(StreamServerDialback connection);
+ void proceed(ConnectionOut connection);
+ void verify(ConnectionOut connection, String from, String type, String sid);
+ void dialbackError(ConnectionOut connection, StreamError error);
+ void finished(ConnectionOut connection, boolean dirty);
+ void exception(ConnectionOut connection, Exception ex);
+ void ready(ConnectionOut connection);
+ boolean securing(ConnectionOut connection);
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java
index 48c4d72d..9578a831 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java
@@ -38,8 +38,8 @@ import java.util.UUID;
/**
* @author ugnich
*/
-public class StreamServerDialback extends Stream {
- protected static final Logger logger = LoggerFactory.getLogger(StreamServerDialback.class);
+public class ConnectionOut extends Stream {
+ protected static final Logger logger = LoggerFactory.getLogger(ConnectionOut.class);
public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls";
public static final String NS_DB = "jabber:server:dialback";
private boolean secured = false;
@@ -51,7 +51,7 @@ public class StreamServerDialback extends Stream {
ConnectionListener listener;
RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build();
- public StreamServerDialback(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception {
+ public ConnectionOut(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception {
super(from, to, is, os);
this.to = to;
this.checkSID = checkSID;