aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp/src/main/java/com/juick/components
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2017-10-09 15:27:48 +0300
committerGravatar Vitaly Takmazov2017-10-10 09:37:31 +0300
commit448fc7e84732422011186a9a4633c345e9c6208e (patch)
tree7ec23a9f29939fecf6456956ead6cebd1338871d /juick-xmpp/src/main/java/com/juick/components
parent443daa747298de315840cdd6ee6992519707e61a (diff)
xmpp:ConnectionOut -> StreamServerDialback
Diffstat (limited to 'juick-xmpp/src/main/java/com/juick/components')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/CleaningUp.java4
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/JuickBot.java3
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/XMPPServer.java224
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java9
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java2
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java9
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java14
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java51
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java89
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java14
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java (renamed from juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java)130
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java5
12 files changed, 322 insertions, 232 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java b/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java
index 16747ea4..a96b7c35 100644
--- a/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java
+++ b/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java
@@ -43,10 +43,10 @@ public class CleaningUp {
@Scheduled(fixedDelay = 10000)
public void cleanUp() {
Instant now = Instant.now();
- xmpp.getOutConnections().stream().filter(c -> Duration.between(now, c.updated).toMinutes() > TIMEOUT_MINUTES)
+ xmpp.getOutConnections().keySet().stream().filter(c -> Duration.between(now, c.getUpdated()).toMinutes() > TIMEOUT_MINUTES)
.forEach(c -> {
logger.info("closing idle outgoing connection to {}", c.to);
- c.closeConnection();
+ c.logoff();
xmpp.getOutConnections().remove(c);
});
diff --git a/juick-xmpp/src/main/java/com/juick/components/JuickBot.java b/juick-xmpp/src/main/java/com/juick/components/JuickBot.java
index 4f838344..cefacb56 100644
--- a/juick-xmpp/src/main/java/com/juick/components/JuickBot.java
+++ b/juick-xmpp/src/main/java/com/juick/components/JuickBot.java
@@ -56,8 +56,6 @@ public class JuickBot implements StanzaListener, AutoCloseable {
@Inject
private XMPPConnection router;
@Value("${xmppbot_jid}")
- private String xmppbotJidStr;
-
private Jid jid;
private PrettyTime pt;
@@ -76,7 +74,6 @@ public class JuickBot implements StanzaListener, AutoCloseable {
@PostConstruct
public void init() {
xmpp.addStanzaListener(this);
- jid = Jid.of(xmppbotJidStr);
broadcastPresence(null);
pt = new PrettyTime(new Locale("ru"));
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
index 1df7d575..cb2bb681 100644
--- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
@@ -18,46 +18,53 @@
package com.juick.components;
import com.juick.components.s2s.*;
+import com.juick.xmpp.extensions.JuickMessage;
+import com.juick.xmpp.extensions.StreamError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xmlpull.v1.XmlPullParserException;
+import rocks.xmpp.addr.Jid;
import rocks.xmpp.core.stanza.model.Stanza;
import rocks.xmpp.util.XmppUtils;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import javax.net.ssl.*;
import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
-import java.io.IOException;
-import java.io.StringWriter;
+import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
-import java.security.KeyManagementException;
+import java.security.KeyStore;
import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Iterator;
+import java.security.SecureRandom;
+import java.time.Instant;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
+import static com.juick.components.s2s.Connection.NS_TLS;
+
/**
* @author ugnich
*/
@Component
-public class XMPPServer implements AutoCloseable {
+public class XMPPServer implements ConnectionListener, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class);
@Inject
public ExecutorService service;
@Value("${hostname}")
- public String HOSTNAME;
+ private Jid jid;
@Value("${s2s_port:5269}")
private int s2sPort;
@Value("${keystore}")
@@ -70,10 +77,24 @@ public class XMPPServer implements AutoCloseable {
public String[] bannedHosts;
private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>();
- private final List<ConnectionOut> outConnections = new CopyOnWriteArrayList<>();
+ private final Map<StreamServerDialback, Socket> outConnections = new ConcurrentHashMap<>();
private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>();
private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>();
+ SSLContext sc;
+ private TrustManager[] trustAllCerts = new TrustManager[]{
+ new X509TrustManager() {
+ public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+
+ public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
+ }
+ public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+ };
+
private ServerSocket listener;
@@ -81,9 +102,18 @@ public class XMPPServer implements AutoCloseable {
private BasicXmppSession session;
@PostConstruct
- public void init() {
-
- logger.info("component initialized");
+ public void init() throws KeyStoreException {
+ KeyStore ks = KeyStore.getInstance("JKS");
+ try (InputStream ksIs = new FileInputStream(keystore)) {
+ ks.load(ksIs, keystorePassword.toCharArray());
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
+ .getDefaultAlgorithm());
+ kmf.init(ks, keystorePassword.toCharArray());
+ sc = SSLContext.getInstance("TLSv1.2");
+ sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom());
+ } catch (Exception e) {
+ logger.warn("tls unavailable");
+ }
service.submit(() -> {
try {
listener = new ServerSocket(s2sPort);
@@ -97,7 +127,7 @@ public class XMPPServer implements AutoCloseable {
}
} catch (SocketException e) {
// shutdown
- } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) {
+ } catch (IOException | XmlPullParserException e) {
logger.warn("xmpp exception", e);
}
});
@@ -108,9 +138,9 @@ public class XMPPServer implements AutoCloseable {
if (!listener.isClosed()) {
listener.close();
}
- outConnections.forEach(c -> {
- c.closeConnection();
- outConnections.remove(c);
+ outConnections.forEach((c, s) -> {
+ c.logoff();
+ outConnections.remove(s);
});
inConnections.forEach(c -> {
c.closeConnection();
@@ -124,32 +154,34 @@ public class XMPPServer implements AutoCloseable {
}
public void addConnectionIn(ConnectionIn c) {
+ c.setListener(this);
inConnections.add(c);
}
- public void addConnectionOut(ConnectionOut c) {
- outConnections.add(c);
+ public void addConnectionOut(Socket socket, StreamServerDialback c) {
+ c.setListener(this);
+ outConnections.put(c, socket);
}
public void removeConnectionIn(ConnectionIn c) {
inConnections.remove(c);
}
- public void removeConnectionOut(ConnectionOut c) {
+ public void removeConnectionOut(StreamServerDialback c) {
outConnections.remove(c);
}
- public String getFromCache(String hostname) {
+ public String getFromCache(Jid to) {
final String[] cache = new String[1];
- outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> {
+ outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> {
cache[0] = c.xml;
outCache.remove(c);
});
return cache[0];
}
- public Optional<ConnectionOut> getConnectionOut(String hostname, boolean needReady) {
- return outConnections.stream().filter(c -> c.to != null &&
+ public Optional<StreamServerDialback> getConnectionOut(Jid hostname, boolean needReady) {
+ return outConnections.keySet().stream().filter(c -> c.to != null &&
c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst();
}
@@ -167,17 +199,17 @@ public class XMPPServer implements AutoCloseable {
xmppStreamWriter.close();
String xml = stanzaWriter.toString();
logger.info("s2s (out): {}", xml);
- sendOut(s.getTo().getDomain(), xml);
+ sendOut(Jid.of(s.getTo().getDomain()), xml);
} catch (XMLStreamException | JAXBException e1) {
logger.info("jaxb exception", e1);
}
}
- public void sendOut(String hostname, String xml) {
+ public void sendOut(Jid hostname, String xml) {
boolean haveAnyConn = false;
- ConnectionOut connOut = null;
- for (ConnectionOut c : outConnections) {
+ StreamServerDialback connOut = null;
+ for (StreamServerDialback c : outConnections.keySet()) {
if (c.to != null && c.to.equals(hostname)) {
if (c.streamReady) {
connOut = c;
@@ -189,7 +221,7 @@ public class XMPPServer implements AutoCloseable {
}
}
if (connOut != null) {
- connOut.sendStanza(xml);
+ connOut.send(xml);
return;
}
@@ -197,7 +229,7 @@ public class XMPPServer implements AutoCloseable {
for (CacheEntry c : outCache) {
if (c.hostname != null && c.hostname.equals(hostname)) {
c.xml += xml;
- c.tsUpdated = System.currentTimeMillis();
+ c.updated = Instant.now();
haveCache = true;
break;
}
@@ -207,24 +239,31 @@ public class XMPPServer implements AutoCloseable {
}
if (!haveAnyConn) {
+ createDialbackConnection(hostname.toEscapedString(), null, null);
+ }
+ }
+
+ void createDialbackConnection(String to, String checkSID, String dbKey) {
+ service.submit(() -> {
try {
- ConnectionOut connectionOut = new ConnectionOut(this, hostname);
- addConnectionOut(connectionOut);
- service.submit(connectionOut);
- } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) {
+ Socket socket = new Socket();
+ socket.connect(DNSQueries.getServerAddress(to));
+ StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey);
+ addConnectionOut(socket, streamServerDialback);
+ streamServerDialback.addChildParser(new JuickMessage());
+ streamServerDialback.connect();
+ } catch (Exception e) {
logger.error("s2s out error", e);
}
- }
+ });
}
- public void startDialback(String from, String streamId, String dbKey) throws Exception {
- Optional<ConnectionOut> c = getConnectionOut(from, false);
+ public void startDialback(Jid from, String streamId, String dbKey) throws Exception {
+ Optional<StreamServerDialback> c = getConnectionOut(from, false);
if (c.isPresent()) {
c.get().sendDialbackVerify(streamId, dbKey);
} else {
- ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey);
- addConnectionOut(newConnection);
- service.submit(newConnection);
+ createDialbackConnection(from.toEscapedString(), streamId, dbKey);
}
}
@@ -232,8 +271,9 @@ public class XMPPServer implements AutoCloseable {
stanzaListeners.add(listener);
}
- public void onStanzaReceived(Stanza xmlValue) {
- stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue));
+ public void onStanzaReceived(String xmlValue) {
+ Stanza stanza = parse(xmlValue);
+ stanzaListeners.forEach(l -> l.stanzaReceived(stanza));
}
public BasicXmppSession getSession() {
@@ -244,7 +284,105 @@ public class XMPPServer implements AutoCloseable {
return inConnections;
}
- public List<ConnectionOut> getOutConnections() {
+ public Map<StreamServerDialback, Socket> getOutConnections() {
return outConnections;
}
+
+ @Override
+ public void starttls(ConnectionIn connection) {
+ logger.info("stream {} securing", connection.streamID);
+ connection.sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />");
+ try {
+ connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(),
+ connection.getSocket().getPort(), true));
+ ((SSLSocket) connection.getSocket()).setUseClientMode(false);
+ ((SSLSocket) connection.getSocket()).startHandshake();
+ connection.setSecured(true);
+ logger.info("stream {} secured", connection.streamID);
+ connection.restartParser();
+ } catch (XmlPullParserException | IOException sex) {
+ logger.warn("stream {} ssl error {}", connection.streamID, sex);
+ connection.sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
+ removeConnectionIn(connection);
+ connection.closeConnection();
+ }
+ }
+
+ @Override
+ public void proceed(StreamServerDialback connection) {
+ try {
+ Socket socket = outConnections.get(connection);
+ socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
+ socket.getPort(), true);
+ ((SSLSocket) socket).startHandshake();
+ connection.setSecured(true);
+ logger.info("stream {} secured", connection.getStreamID());
+ connection.setInputStream(socket.getInputStream());
+ connection.setOutputStream(socket.getOutputStream());
+ connection.restartStream();
+ connection.sendOpenStream();
+ } catch (XmlPullParserException | IOException sex) {
+ logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex);
+ connection.send("<failed xmlns\"" + NS_TLS + "\" />");
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+ }
+
+ @Override
+ public void verify(StreamServerDialback connection, String from, String type, String sid) {
+ if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) {
+ getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type));
+ }
+ }
+
+ @Override
+ public void dialbackError(StreamServerDialback connection, StreamError error) {
+ logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition());
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void finished(StreamServerDialback connection, boolean dirty) {
+ logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty);
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void exception(StreamServerDialback connection, Exception ex) {
+ logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex);
+ removeConnectionOut(connection);
+ connection.logoff();
+ }
+
+ @Override
+ public void ready(StreamServerDialback connection) {
+ logger.info("stream to {} {} ready", connection.to, connection.getStreamID());
+ String cache = getFromCache(connection.to);
+ if (cache != null) {
+ logger.info("stream to {} {} sending cache", connection.to, connection.getStreamID());
+ connection.send(cache);
+ }
+ }
+
+ @Override
+ public boolean securing(StreamServerDialback connection) {
+ return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString());
+ }
+
+ protected Stanza parse(String xml) {
+ try {
+ Unmarshaller unmarshaller = session.createUnmarshaller();
+ return (Stanza)unmarshaller.unmarshal(new StringReader(xml));
+ } catch (JAXBException e) {
+ logger.error("JAXB exception", e);
+ }
+ return null;
+ }
+
+ public Jid getJid() {
+ return jid;
+ }
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java
index 02b1556d..f14b2b23 100644
--- a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java
+++ b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java
@@ -23,11 +23,14 @@ package com.juick.components.configuration;
import com.juick.components.s2s.BasicXmppSession;
import com.juick.server.configuration.BaseWebConfiguration;
+import com.juick.xmpp.helpers.JidConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
+import org.springframework.core.convert.ConversionService;
+import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import rocks.xmpp.core.session.Extension;
@@ -58,4 +61,10 @@ public class XmppAppConfiguration extends BaseWebConfiguration {
.build();
return BasicXmppSession.create(hostname, configuration);
}
+ @Bean
+ public static ConversionService conversionService() {
+ DefaultFormattingConversionService cs = new DefaultFormattingConversionService();
+ cs.addConverter(new JidConverter());
+ return cs;
+ }
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java b/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java
index 697ad04a..f20fc76a 100644
--- a/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java
+++ b/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java
@@ -39,7 +39,7 @@ public class StatusController {
XMPPStatus status = new XMPPStatus();
if (xmpp != null) {
status.setInbound(xmpp.getInConnections());
- status.setOutbound(xmpp.getOutConnections());
+ status.setOutbound(xmpp.getOutConnections().keySet());
}
return status;
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
index d3459049..9b5bb0d5 100644
--- a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
+++ b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java
@@ -18,16 +18,17 @@
package com.juick.components.controllers.helpers;
import com.juick.components.s2s.ConnectionIn;
-import com.juick.components.s2s.ConnectionOut;
+import com.juick.components.s2s.StreamServerDialback;
import java.util.List;
+import java.util.Set;
/**
* Created by vitalyster on 16.02.2017.
*/
public class XMPPStatus {
private List<ConnectionIn> inbound;
- private List<ConnectionOut> outbound;
+ private Set<StreamServerDialback> outbound;
public List<ConnectionIn> getInbound() {
return inbound;
@@ -37,11 +38,11 @@ public class XMPPStatus {
this.inbound = inbound;
}
- public List<ConnectionOut> getOutbound() {
+ public Set<StreamServerDialback> getOutbound() {
return outbound;
}
- public void setOutbound(List<ConnectionOut> outbound) {
+ public void setOutbound(Set<StreamServerDialback> outbound) {
this.outbound = outbound;
}
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java b/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java
index 645b24f1..c8eeab53 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java
@@ -17,20 +17,24 @@
package com.juick.components.s2s;
+import rocks.xmpp.addr.Jid;
+
+import java.time.Instant;
+
/**
*
* @author ugnich
*/
public class CacheEntry {
- public String hostname;
- public long tsCreated;
- public long tsUpdated;
+ public Jid hostname;
+ public Instant created;
+ public Instant updated;
public String xml;
- public CacheEntry(String hostname, String xml) {
+ public CacheEntry(Jid hostname, String xml) {
this.hostname = hostname;
- this.tsCreated = this.tsUpdated = System.currentTimeMillis();
+ this.created = this.updated =Instant.now();
this.xml = xml;
}
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java
index 7d3767a2..6a796307 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java
@@ -24,15 +24,11 @@ import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import org.xmlpull.v1.XmlPullParserFactory;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-import java.io.*;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
-import java.security.*;
-import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.UUID;
@@ -50,7 +46,7 @@ public class Connection {
public long bytesLocal = 0;
public long packetsLocal = 0;
XMPPServer xmpp;
- Socket socket;
+ private Socket socket;
public static final String NS_DB = "jabber:server:dialback";
public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls";
public static final String NS_STREAM = "http://etherx.jabber.org/streams";
@@ -58,35 +54,12 @@ public class Connection {
XmlPullParser parser = factory.newPullParser();
OutputStreamWriter writer;
private boolean secured = false;
- SSLContext sc;
- private TrustManager[] trustAllCerts = new TrustManager[]{
- new X509TrustManager() {
- public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
- }
-
- public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
- }
- public java.security.cert.X509Certificate[] getAcceptedIssuers() {
- return null;
- }
- }
- };
-
-
- public Connection(XMPPServer xmpp) throws XmlPullParserException, KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException, KeyManagementException {
+
+
+
+ public Connection(XMPPServer xmpp) throws XmlPullParserException {
this.xmpp = xmpp;
created = updated = Instant.now();
- KeyStore ks = KeyStore.getInstance("JKS");
- try (InputStream ksIs = new FileInputStream(xmpp.keystore)) {
- ks.load(ksIs, xmpp.keystorePassword.toCharArray());
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory
- .getDefaultAlgorithm());
- kmf.init(ks, xmpp.keystorePassword.toCharArray());
- sc = SSLContext.getInstance("TLSv1.2");
- sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom());
- } catch (Exception e) {
- logger.warn("tls unavailable");
- }
}
public void logParser() {
@@ -153,4 +126,12 @@ public class Connection {
parser.setInput(new InputStreamReader(socket.getInputStream()));
writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8);
}
+
+ public Socket getSocket() {
+ return socket;
+ }
+
+ public void setSocket(Socket socket) {
+ this.socket = socket;
+ }
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java
index e6f404ef..16f207a7 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java
@@ -24,52 +24,33 @@ import org.apache.commons.lang3.StringUtils;
import org.xmlpull.v1.XmlPullParser;
import org.xmlpull.v1.XmlPullParserException;
import rocks.xmpp.addr.Jid;
-import rocks.xmpp.core.stanza.model.Stanza;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLSocket;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
import java.io.EOFException;
import java.io.IOException;
-import java.io.StringReader;
import java.net.Socket;
import java.net.SocketException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.ArrayList;
+import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author ugnich
*/
public class ConnectionIn extends Connection implements Runnable {
- final public List<String> from = new ArrayList<>();
- public long tsRemoteData = 0;
+ final public List<Jid> from = new CopyOnWriteArrayList<>();
+ public Instant received;
public long packetsRemote = 0;
+ ConnectionListener listener;
- public ConnectionIn(XMPPServer xmpp, Socket socket) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException {
+ public ConnectionIn(XMPPServer xmpp, Socket socket) throws XmlPullParserException, IOException {
super(xmpp);
- this.socket = socket;
+ this.setSocket(socket);
restartParser();
}
- protected Stanza parse(String xml) {
- try {
- Unmarshaller unmarshaller = xmpp.getSession().createUnmarshaller();
- return (Stanza)unmarshaller.unmarshal(new StringReader(xml));
- } catch (JAXBException e) {
- logger.error("JAXB exception", e);
- }
- return null;
- }
-
@Override
public void run() {
try {
@@ -79,7 +60,7 @@ public class ConnectionIn extends Connection implements Runnable {
|| !parser.getNamespace("stream").equals(NS_STREAM)) {
// || !parser.getAttributeValue(null, "version").equals("1.0")
// || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) {
- throw new Exception(String.format("stream from %s invalid", socket.getRemoteSocketAddress()));
+ throw new Exception(String.format("stream from %s invalid", getSocket().getRemoteSocketAddress()));
}
streamID = parser.getAttributeValue(null, "id");
if (streamID == null) {
@@ -108,14 +89,15 @@ public class ConnectionIn extends Connection implements Runnable {
String dfrom = parser.getAttributeValue(null, "from");
String to = parser.getAttributeValue(null, "to");
logger.info("stream from {} to {} {} asking for dialback", dfrom, to, streamID);
- if (dfrom.endsWith(xmpp.HOSTNAME) && (dfrom.equals(xmpp.HOSTNAME) || dfrom.endsWith("." + xmpp.HOSTNAME))) {
+ if (dfrom.endsWith(xmpp.getJid().toEscapedString()) && (dfrom.equals(xmpp.getJid().toEscapedString())
+ || dfrom.endsWith("." + xmpp.getJid()))) {
logger.warn("stream from {} is invalid", dfrom);
break;
}
- if (to != null && to.equals(xmpp.HOSTNAME)) {
+ if (to != null && to.equals(xmpp.getJid().toEscapedString())) {
String dbKey = XmlUtils.getTagText(parser);
updateTsRemoteData();
- xmpp.startDialback(dfrom, streamID, dbKey);
+ xmpp.startDialback(Jid.of(dfrom), streamID, dbKey);
} else {
logger.warn("stream from " + dfrom + " " + streamID + " invalid to " + to);
break;
@@ -128,7 +110,7 @@ public class ConnectionIn extends Connection implements Runnable {
updateTsRemoteData();
final boolean[] valid = {false};
if (vfrom != null && vto != null && vid != null && vkey != null) {
- xmpp.getConnectionOut(vfrom, false).ifPresent(c -> {
+ xmpp.getConnectionOut(Jid.of(vfrom), false).ifPresent(c -> {
String dialbackKey = c.dbKey;
valid[0] = vkey.equals(dialbackKey);
});
@@ -143,12 +125,12 @@ public class ConnectionIn extends Connection implements Runnable {
} else if (tag.equals("presence") && checkFromTo(parser)) {
String xml = XmlUtils.parseToString(parser, false);
logger.info("stream {} presence: {}", streamID, xml);
- xmpp.onStanzaReceived(parse(xml));
+ xmpp.onStanzaReceived(xml);
} else if (tag.equals("message") && checkFromTo(parser)) {
updateTsRemoteData();
String xml = XmlUtils.parseToString(parser, false);
logger.info("stream {} message: {}", streamID, xml);
- xmpp.onStanzaReceived(parse(xml));
+ xmpp.onStanzaReceived(xml);
} else if (tag.equals("iq") && checkFromTo(parser)) {
updateTsRemoteData();
@@ -156,25 +138,10 @@ public class ConnectionIn extends Connection implements Runnable {
String xml = XmlUtils.parseToString(parser, false);
if (type == null || !type.equals("error")) {
logger.info("stream {} iq: {}", streamID, xml);
- xmpp.onStanzaReceived(parse(xml));
- }
- } else if (sc != null && !isSecured() && tag.equals("starttls")) {
- logger.info("stream {} securing", streamID);
- sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />");
- try {
- socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
- socket.getPort(), true);
- ((SSLSocket) socket).setUseClientMode(false);
- ((SSLSocket) socket).startHandshake();
- setSecured(true);
- logger.info("stream {} secured", streamID);
- restartParser();
- } catch (SSLException sex) {
- logger.warn("stream {} ssl error {}", streamID, sex);
- sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
- xmpp.removeConnectionIn(this);
- closeConnection();
+ xmpp.onStanzaReceived(xml);
}
+ } else if (!isSecured() && tag.equals("starttls")) {
+ listener.starttls(this);
} else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) {
sendOpenStream(null, true);
} else if (tag.equals("error")) {
@@ -202,16 +169,16 @@ public class ConnectionIn extends Connection implements Runnable {
}
void updateTsRemoteData() {
- tsRemoteData = System.currentTimeMillis();
+ received = Instant.now();
}
void sendOpenStream(String from, boolean xmppversionnew) throws IOException {
String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " +
"xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
- xmpp.HOSTNAME + "' id='" + streamID + "' version='1.0'>";
+ xmpp.getJid().toEscapedString() + "' id='" + streamID + "' version='1.0'>";
if (xmppversionnew) {
openStream += "<stream:features>";
- if (sc != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) {
+ if (listener != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) {
openStream += "<starttls xmlns=\"" + NS_TLS + "\"><optional/></starttls>";
}
openStream += "</stream:features>";
@@ -219,8 +186,8 @@ public class ConnectionIn extends Connection implements Runnable {
sendStanza(openStream);
}
- public void sendDialbackResult(String sfrom, String type) {
- sendStanza("<db:result from='" + xmpp.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>");
+ public void sendDialbackResult(Jid sfrom, String type) {
+ sendStanza("<db:result from='" + xmpp.getJid().toEscapedString() + "' to='" + sfrom + "' type='" + type + "'/>");
if (type.equals("valid")) {
from.add(sfrom);
logger.info("stream from {} {} ready", sfrom, streamID);
@@ -232,11 +199,10 @@ public class ConnectionIn extends Connection implements Runnable {
String cto = parser.getAttributeValue(null, "to");
if (StringUtils.isNotEmpty(cfrom) && StringUtils.isNotEmpty(cto)) {
Jid jidto = Jid.of(cto);
- if (jidto.getDomain().equals(xmpp.HOSTNAME)) {
+ if (jidto.getDomain().equals(xmpp.getJid().toEscapedString())) {
Jid jidfrom = Jid.of(cfrom);
- int size = from.size();
- for (int i = 0; i < size; i++) {
- if (from.get(i).equals(jidfrom.getDomain())) {
+ for (Jid aFrom : from) {
+ if (aFrom.equals(Jid.of(jidfrom.getDomain()))) {
return true;
}
}
@@ -244,4 +210,7 @@ public class ConnectionIn extends Connection implements Runnable {
}
return false;
}
+ public void setListener(ConnectionListener listener) {
+ this.listener = listener;
+ }
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java
new file mode 100644
index 00000000..ef5a948b
--- /dev/null
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java
@@ -0,0 +1,14 @@
+package com.juick.components.s2s;
+
+import com.juick.xmpp.extensions.StreamError;
+
+public interface ConnectionListener {
+ void starttls(ConnectionIn connection);
+ void proceed(StreamServerDialback connection);
+ void verify(StreamServerDialback connection, String from, String type, String sid);
+ void dialbackError(StreamServerDialback connection, StreamError error);
+ void finished(StreamServerDialback connection, boolean dirty);
+ void exception(StreamServerDialback connection, Exception ex);
+ void ready(StreamServerDialback connection);
+ boolean securing(StreamServerDialback connection);
+}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java b/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java
index 589ed18a..48c4d72d 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java
@@ -17,77 +17,69 @@
package com.juick.components.s2s;
-import com.juick.components.XMPPServer;
import com.juick.components.s2s.util.DialbackUtils;
+import com.juick.xmpp.Stream;
import com.juick.xmpp.extensions.StreamError;
import com.juick.xmpp.extensions.StreamFeatures;
import com.juick.xmpp.utils.XmlUtils;
import org.apache.commons.text.RandomStringGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.xmlpull.v1.XmlPullParser;
-import org.xmlpull.v1.XmlPullParserException;
+import rocks.xmpp.addr.Jid;
-import javax.net.ssl.SSLException;
-import javax.net.ssl.SSLSocket;
import java.io.EOFException;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.net.SocketException;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.UnrecoverableKeyException;
-import java.security.cert.CertificateException;
-import java.util.Arrays;
import java.util.UUID;
/**
* @author ugnich
*/
-public class ConnectionOut extends Connection implements Runnable {
+public class StreamServerDialback extends Stream {
+ protected static final Logger logger = LoggerFactory.getLogger(StreamServerDialback.class);
+ public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls";
+ public static final String NS_DB = "jabber:server:dialback";
+ private boolean secured = false;
public boolean streamReady = false;
- public String to;
String checkSID = null;
String dbKey = null;
+ private String streamID;
+ ConnectionListener listener;
RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build();
- public ConnectionOut(XMPPServer xmpp, String hostname) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException {
- super(xmpp);
- to = hostname;
- dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, xmpp.HOSTNAME, streamID);
- }
-
- public ConnectionOut(XMPPServer xmpp, String hostname, String checkSID, String dbKey) throws Exception {
- super(xmpp);
- to = hostname;
+ public StreamServerDialback(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception {
+ super(from, to, is, os);
+ this.to = to;
this.checkSID = checkSID;
this.dbKey = dbKey;
+ if (dbKey == null) {
+ this.dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, from, streamID);
+ }
streamID = UUID.randomUUID().toString();
}
- void sendOpenStream() throws IOException {
- sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' id='" + streamID +
+ public void sendOpenStream() throws IOException {
+ send("<?xml version='1.0'?><stream:stream xmlns='jabber:server' id='" + streamID +
"' xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" +
- xmpp.HOSTNAME + "' to='" + to + "' version='1.0'>");
+ from.toEscapedString() + "' to='" + to.toEscapedString() + "' version='1.0'>");
}
void processDialback() throws Exception {
if (checkSID != null) {
sendDialbackVerify(checkSID, dbKey);
}
- sendStanza("<db:result from='" + xmpp.HOSTNAME + "' to='" + to + "'>" +
+ send("<db:result from='" + from.toEscapedString() + "' to='" + to.toEscapedString() + "'>" +
dbKey + "</db:result>");
}
@Override
- public void run() {
- logger.info("stream to {} start", to);
+ public void handshake() {
try {
- socket = new Socket();
- InetSocketAddress address = DNSQueries.getServerAddress(to);
- socket.connect(address);
- restartParser();
+ restartStream();
sendOpenStream();
@@ -107,21 +99,13 @@ public class ConnectionOut extends Connection implements Runnable {
if (parser.getEventType() != XmlPullParser.START_TAG) {
continue;
}
- logParser();
String tag = parser.getName();
if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) {
String type = parser.getAttributeValue(null, "type");
if (type != null && type.equals("valid")) {
streamReady = true;
- logger.info("stream to {} {} ready", to, streamID);
-
- String cache = xmpp.getFromCache(to);
- if (cache != null) {
- logger.info("stream to {} {} sending cache", to, streamID);
- sendStanza(cache);
- }
-
+ listener.ready(this);
} else {
logger.info("stream to {} {} dialback fail", to, streamID);
}
@@ -130,62 +114,54 @@ public class ConnectionOut extends Connection implements Runnable {
String from = parser.getAttributeValue(null, "from");
String type = parser.getAttributeValue(null, "type");
String sid = parser.getAttributeValue(null, "id");
- if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) {
- xmpp.getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(from, type));
- }
+ listener.verify(this, from, type, sid);
XmlUtils.skip(parser);
} else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) {
StreamFeatures features = StreamFeatures.parse(parser);
- if (sc != null && !isSecured() && features.STARTTLS >= 0 && !Arrays.asList(xmpp.brokenSSLhosts).contains(to)) {
- logger.info("stream to {} {} securing", to, streamID);
- sendStanza("<starttls xmlns=\"" + NS_TLS + "\" />");
+ if (listener != null && !secured && features.STARTTLS >= 0
+ && listener.securing(this)) {
+ logger.info("stream to {} {} securing", to.toEscapedString(), streamID);
+ send("<starttls xmlns=\"" + NS_TLS + "\" />");
} else {
processDialback();
}
} else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) {
- try {
- socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
- socket.getPort(), true);
- ((SSLSocket) socket).startHandshake();
- setSecured(true);
- logger.info("stream {} secured", streamID);
- restartParser();
- sendOpenStream();
- } catch (SSLException sex) {
- logger.error("s2s ssl error: {} {}, error {}", to, streamID, sex);
- sendStanza("<failed xmlns\"" + NS_TLS + "\" />");
- xmpp.removeConnectionOut(this);
- closeConnection();
- }
- } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) {
+ listener.proceed(this);
+ } else if (secured && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) {
streamID = parser.getAttributeValue(null, "id");
} else if (tag.equals("error")) {
StreamError streamError = StreamError.parse(parser);
- logger.warn("Stream error from {}: {}", streamID, streamError.getCondition());
- xmpp.removeConnectionOut(this);
- closeConnection();
+ listener.dialbackError(this, streamError);
} else {
String unhandledStanza = XmlUtils.parseToString(parser, true);
logger.warn("Unhandled stanza from {} {} : {}", to, streamID, unhandledStanza);
}
}
-
- logger.warn("stream to {} {} finished", to, streamID);
- xmpp.removeConnectionOut(ConnectionOut.this);
- closeConnection();
+ listener.finished(this, false);
} catch (EOFException | SocketException eofex) {
- logger.info("stream {} {} closed (dirty)", to, streamID);
- xmpp.removeConnectionOut(ConnectionOut.this);
- closeConnection();
+ listener.finished(this, true);
} catch (Exception e) {
- logger.error("s2s out exception: {} {}, exception {}", to, streamID, e);
- xmpp.removeConnectionOut(ConnectionOut.this);
- closeConnection();
+ listener.exception(this, e);
}
}
public void sendDialbackVerify(String sid, String key) {
- sendStanza("<db:verify from='" + xmpp.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" +
+ send("<db:verify from='" + from.toEscapedString() + "' to='" + to + "' id='" + sid + "'>" +
key + "</db:verify>");
}
+ public void setListener(ConnectionListener listener) {
+ this.listener = listener;
+ }
+
+ public String getStreamID() {
+ return streamID;
+ }
+
+ public boolean isSecured() {
+ return secured;
+ }
+
+ public void setSecured(boolean secured) {
+ this.secured = secured;
+ }
}
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java b/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java
index 88626606..fc08c5d6 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java
@@ -19,6 +19,7 @@ package com.juick.components.s2s.util;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.codec.digest.HmacUtils;
+import rocks.xmpp.addr.Jid;
/**
* Created by vitalyster on 05.12.2016.
@@ -28,8 +29,8 @@ public class DialbackUtils {
throw new IllegalStateException();
}
- public static String generateDialbackKey(String secret, String to, String from, String id) {
+ public static String generateDialbackKey(String secret, Jid to, Jid from, String id) {
return HmacUtils.hmacSha256Hex(DigestUtils.sha256(secret),
- (to + " " + from + " " + id).getBytes());
+ (to.toEscapedString() + " " + from.toEscapedString() + " " + id).getBytes());
}
}