aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2017-10-09 15:27:48 +0300
committerGravatar Vitaly Takmazov2017-10-10 09:37:31 +0300
commit448fc7e84732422011186a9a4633c345e9c6208e (patch)
tree7ec23a9f29939fecf6456956ead6cebd1338871d /juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
parent443daa747298de315840cdd6ee6992519707e61a (diff)
xmpp:ConnectionOut -> StreamServerDialback
Diffstat (limited to 'juick-xmpp/src/main/java/com/juick/components/XMPPServer.java')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/XMPPServer.java224
1 files changed, 181 insertions, 43 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
index 1df7d575..cb2bb681 100644
--- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
@@ -18,46 +18,53 @@
package com.juick.components;
import com.juick.components.s2s.*;
+import com.juick.xmpp.extensions.JuickMessage;
+import com.juick.xmpp.extensions.StreamError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xmlpull.v1.XmlPullParserException;
+import rocks.xmpp.addr.Jid;
import rocks.xmpp.core.stanza.model.Stanza;
import rocks.xmpp.util.XmppUtils;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import javax.net.ssl.*;
import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
-import java.io.IOException;
-import java.io.StringWriter;
+import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.security.KeyManagementException;
+import java.security.KeyStore;
import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Iterator;
+import java.security.SecureRandom;
+import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import static com.juick.components.s2s.Connection.NS_TLS;
+
/**
* @author ugnich
*/
@Component
-public class XMPPServer implements AutoCloseable {
+public class XMPPServer implements ConnectionListener, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class);
@Inject
public ExecutorService service;
@Value("${hostname}")
- public String HOSTNAME;
+ private Jid jid;
@Value("${s2s_port:5269}")
private int s2sPort;
@Value("${keystore}")
@@ -70,10 +77,24 @@ public class XMPPServer implements AutoCloseable {
public String[] bannedHosts;
private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>();
- private final List<ConnectionOut> outConnections = new CopyOnWriteArrayList<>();
+ private final Map<StreamServerDialback, Socket> outConnections = new ConcurrentHashMap<>();
private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>();
private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>();
+ SSLContext sc;
+ private TrustManager[] trustAllCerts = new TrustManager[]{
+ new X509TrustManager() {
+ public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+
+ public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+ };
+
private ServerSocket listener;
@@ -81,9 +102,18 @@ public class XMPPServer implements AutoCloseable {
private BasicXmppSession session;
@PostConstruct
- public void init() {
-
- logger.info("component initialized");
+ public void init() throws KeyStoreException {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ try (InputStream ksIs = new FileInputStream(keystore)) {
+ ks.load(ksIs, keystorePassword.toCharArray());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
+ .getDefaultAlgorithm());
+ kmf.init(ks, keystorePassword.toCharArray());
+ sc = SSLContext.getInstance("TLSv1.2");
+ sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom());
+ } catch (Exception e) {
+ logger.warn("tls unavailable");
+ }
service.submit(() -> {
try {
listener = new ServerSocket(s2sPort);
@@ -97,7 +127,7 @@ public class XMPPServer implements AutoCloseable {
}
} catch (SocketException e) {
// shutdown
- } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) {
+ } catch (IOException | XmlPullParserException e) {
logger.warn("xmpp exception", e);
}
});
@@ -108,9 +138,9 @@ public class XMPPServer implements AutoCloseable {
if (!listener.isClosed()) {
listener.close();
}
- outConnections.forEach(c -> {
- c.closeConnection();
- outConnections.remove(c);
+ outConnections.forEach((c, s) -> {
+ c.logoff();
+ outConnections.remove(s);
});
inConnections.forEach(c -> {
c.closeConnection();
@@ -124,32 +154,34 @@ public class XMPPServer implements AutoCloseable {
}
public void addConnectionIn(ConnectionIn c) {
+ c.setListener(this);
inConnections.add(c);
}
- public void addConnectionOut(ConnectionOut c) {
- outConnections.add(c);
+ public void addConnectionOut(Socket socket, StreamServerDialback c) {
+ c.setListener(this);
+ outConnections.put(c, socket);
}
public void removeConnectionIn(ConnectionIn c) {
inConnections.remove(c);
}
- public void removeConnectionOut(ConnectionOut c) {
+ public void removeConnectionOut(StreamServerDialback c) {
outConnections.remove(c);
}
- public String getFromCache(String hostname) {
+ public String getFromCache(Jid to) {
final String[] cache = new String[1];
- outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> {
+ outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> {
cache[0] = c.xml;
outCache.remove(c);
});
return cache[0];
}
- public Optional<ConnectionOut> getConnectionOut(String hostname, boolean needReady) {
- return outConnections.stream().filter(c -> c.to != null &&
+ public Optional<StreamServerDialback> getConnectionOut(Jid hostname, boolean needReady) {
+ return outConnections.keySet().stream().filter(c -> c.to != null &&
c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst();
}
@@ -167,17 +199,17 @@ public class XMPPServer implements AutoCloseable {
xmppStreamWriter.close();
String xml = stanzaWriter.toString();
logger.info("s2s (out): {}", xml);
- sendOut(s.getTo().getDomain(), xml);
+ sendOut(Jid.of(s.getTo().getDomain()), xml);
} catch (XMLStreamException | JAXBException e1) {
logger.info("jaxb exception", e1);
}
}
- public void sendOut(String hostname, String xml) {
+ public void sendOut(Jid hostname, String xml) {
boolean haveAnyConn = false;
- ConnectionOut connOut = null;
- for (ConnectionOut c : outConnections) {
+ StreamServerDialback connOut = null;
+ for (StreamServerDialback c : outConnections.keySet()) {
if (c.to != null && c.to.equals(hostname)) {
if (c.streamReady) {
connOut = c;
@@ -189,7 +221,7 @@ public class XMPPServer implements AutoCloseable {
}
}
if (connOut != null) {
- connOut.sendStanza(xml);
+ connOut.send(xml);
return;
}
@@ -197,7 +229,7 @@ public class XMPPServer implements AutoCloseable {
for (CacheEntry c : outCache) {
if (c.hostname != null && c.hostname.equals(hostname)) {
c.xml += xml;
- c.tsUpdated = System.currentTimeMillis();
+ c.updated = Instant.now();
haveCache = true;
break;
}
@@ -207,24 +239,31 @@ public class XMPPServer implements AutoCloseable {
}
if (!haveAnyConn) {
+ createDialbackConnection(hostname.toEscapedString(), null, null);
+ }
+ }
+
+ void createDialbackConnection(String to, String checkSID, String dbKey) {
+ service.submit(() -> {
try {
- ConnectionOut connectionOut = new ConnectionOut(this, hostname);
- addConnectionOut(connectionOut);
- service.submit(connectionOut);
- } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) {
+ Socket socket = new Socket();
+ socket.connect(DNSQueries.getServerAddress(to));
+ StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey);
+ addConnectionOut(socket, streamServerDialback);
+ streamServerDialback.addChildParser(new JuickMessage());
+ streamServerDialback.connect();
+ } catch (Exception e) {
logger.error("s2s out error", e);
}
- }
+ });
}
- public void startDialback(String from, String streamId, String dbKey) throws Exception {
- Optional<ConnectionOut> c = getConnectionOut(from, false);
+ public void startDialback(Jid from, String streamId, String dbKey) throws Exception {
+ Optional<StreamServerDialback> c = getConnectionOut(from, false);
if (c.isPresent()) {
c.get().sendDialbackVerify(streamId, dbKey);
} else {
- ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey);
- addConnectionOut(newConnection);
- service.submit(newConnection);
+ createDialbackConnection(from.toEscapedString(), streamId, dbKey);
}
}
@@ -232,8 +271,9 @@ public class XMPPServer implements AutoCloseable {
stanzaListeners.add(listener);
}
- public void onStanzaReceived(Stanza xmlValue) {
- stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue));
+ public void onStanzaReceived(String xmlValue) {
+ Stanza stanza = parse(xmlValue);
+ stanzaListeners.forEach(l -> l.stanzaReceived(stanza));
}
public BasicXmppSession getSession() {
@@ -244,7 +284,105 @@ public class XMPPServer implements AutoCloseable {
return inConnections;
}
- public List<ConnectionOut> getOutConnections() {
+ public Map<StreamServerDialback, Socket> getOutConnections() {
return outConnections;
}
+
+ @Override
+ public void starttls(ConnectionIn connection) {
+ logger.info("stream {} securing", connection.streamID);
+ connection.sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />");
+ try {
+ connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(),
+ connection.getSocket().getPort(), true));
+ ((SSLSocket) connection.getSocket()).setUseClientMode(false);
+ ((SSLSocket) connection.getSocket()).startHandshake();
+ connection.setSecured(true);
+ logger.info("stream {} secured", connection.streamID);
+ connection.restartParser();
+ } catch (XmlPullParserException | IOException sex) {
+ logger.warn("stream {} ssl error {}", connection.streamID, sex);
+ connection.sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
+ removeConnectionIn(connection);
+ connection.closeConnection();
+ }
+ }
+
+ @Override
+ public void proceed(StreamServerDialback connection) {
+ try {
+ Socket socket = outConnections.get(connection);
+ socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
+ socket.getPort(), true);
+ ((SSLSocket) socket).startHandshake();
+ connection.setSecured(true);
+ logger.info("stream {} secured", connection.getStreamID());
+ connection.setInputStream(socket.getInputStream());
+ connection.setOutputStream(socket.getOutputStream());
+ connection.restartStream();
+ connection.sendOpenStream();
+ } catch (XmlPullParserException | IOException sex) {
+ logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex);
+ connection.send("<failed xmlns\"" + NS_TLS + "\" />");
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+ }
+
+ @Override
+ public void verify(StreamServerDialback connection, String from, String type, String sid) {
+ if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) {
+ getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type));
+ }
+ }
+
+ @Override
+ public void dialbackError(StreamServerDialback connection, StreamError error) {
+ logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition());
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void finished(StreamServerDialback connection, boolean dirty) {
+ logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty);
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void exception(StreamServerDialback connection, Exception ex) {
+ logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex);
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void ready(StreamServerDialback connection) {
+ logger.info("stream to {} {} ready", connection.to, connection.getStreamID());
+ String cache = getFromCache(connection.to);
+ if (cache != null) {
+ logger.info("stream to {} {} sending cache", connection.to, connection.getStreamID());
+ connection.send(cache);
+ }
+ }
+
+ @Override
+ public boolean securing(StreamServerDialback connection) {
+ return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString());
+ }
+
+ protected Stanza parse(String xml) {
+ try {
+ Unmarshaller unmarshaller = session.createUnmarshaller();
+ return (Stanza)unmarshaller.unmarshal(new StringReader(xml));
+ } catch (JAXBException e) {
+ logger.error("JAXB exception", e);
+ }
+ return null;
+ }
+
+ public Jid getJid() {
+ return jid;
+ }
}