diff options
author | Vitaly Takmazov | 2017-10-09 15:27:48 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2017-10-10 09:37:31 +0300 |
commit | 448fc7e84732422011186a9a4633c345e9c6208e (patch) | |
tree | 7ec23a9f29939fecf6456956ead6cebd1338871d /juick-xmpp/src/main/java | |
parent | 443daa747298de315840cdd6ee6992519707e61a (diff) |
xmpp:ConnectionOut -> StreamServerDialback
Diffstat (limited to 'juick-xmpp/src/main/java')
13 files changed, 335 insertions, 232 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java b/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java index 16747ea4..a96b7c35 100644 --- a/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java +++ b/juick-xmpp/src/main/java/com/juick/components/CleaningUp.java @@ -43,10 +43,10 @@ public class CleaningUp { @Scheduled(fixedDelay = 10000) public void cleanUp() { Instant now = Instant.now(); - xmpp.getOutConnections().stream().filter(c -> Duration.between(now, c.updated).toMinutes() > TIMEOUT_MINUTES) + xmpp.getOutConnections().keySet().stream().filter(c -> Duration.between(now, c.getUpdated()).toMinutes() > TIMEOUT_MINUTES) .forEach(c -> { logger.info("closing idle outgoing connection to {}", c.to); - c.closeConnection(); + c.logoff(); xmpp.getOutConnections().remove(c); }); diff --git a/juick-xmpp/src/main/java/com/juick/components/JuickBot.java b/juick-xmpp/src/main/java/com/juick/components/JuickBot.java index 4f838344..cefacb56 100644 --- a/juick-xmpp/src/main/java/com/juick/components/JuickBot.java +++ b/juick-xmpp/src/main/java/com/juick/components/JuickBot.java @@ -56,8 +56,6 @@ public class JuickBot implements StanzaListener, AutoCloseable { @Inject private XMPPConnection router; @Value("${xmppbot_jid}") - private String xmppbotJidStr; - private Jid jid; private PrettyTime pt; @@ -76,7 +74,6 @@ public class JuickBot implements StanzaListener, AutoCloseable { @PostConstruct public void init() { xmpp.addStanzaListener(this); - jid = Jid.of(xmppbotJidStr); broadcastPresence(null); pt = new PrettyTime(new Locale("ru")); } diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index 1df7d575..cb2bb681 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -18,46 +18,53 @@ package com.juick.components; import com.juick.components.s2s.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.StreamError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.inject.Inject; +import javax.net.ssl.*; import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; -import java.io.IOException; -import java.io.StringWriter; +import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.security.KeyManagementException; +import java.security.KeyStore; import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Iterator; +import java.security.SecureRandom; +import java.time.Instant; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import static com.juick.components.s2s.Connection.NS_TLS; + /** * @author ugnich */ @Component -public class XMPPServer implements AutoCloseable { +public class XMPPServer implements ConnectionListener, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); @Inject public ExecutorService service; @Value("${hostname}") - public String HOSTNAME; + private Jid jid; @Value("${s2s_port:5269}") private int s2sPort; @Value("${keystore}") @@ -70,10 +77,24 @@ public class XMPPServer implements AutoCloseable { public String[] bannedHosts; private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>(); - private final List<ConnectionOut> outConnections = new CopyOnWriteArrayList<>(); + private final Map<StreamServerDialback, Socket> outConnections = new ConcurrentHashMap<>(); private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>(); private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>(); + SSLContext sc; + private TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + } + }; + private ServerSocket listener; @@ -81,9 +102,18 @@ public class XMPPServer implements AutoCloseable { private BasicXmppSession session; @PostConstruct - public void init() { - - logger.info("component initialized"); + public void init() throws KeyStoreException { + KeyStore ks = KeyStore.getInstance("JKS"); + try (InputStream ksIs = new FileInputStream(keystore)) { + ks.load(ksIs, keystorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); + kmf.init(ks, keystorePassword.toCharArray()); + sc = SSLContext.getInstance("TLSv1.2"); + sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom()); + } catch (Exception e) { + logger.warn("tls unavailable"); + } service.submit(() -> { try { listener = new ServerSocket(s2sPort); @@ -97,7 +127,7 @@ public class XMPPServer implements AutoCloseable { } } catch (SocketException e) { // shutdown - } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) { + } catch (IOException | XmlPullParserException e) { logger.warn("xmpp exception", e); } }); @@ -108,9 +138,9 @@ public class XMPPServer implements AutoCloseable { if (!listener.isClosed()) { listener.close(); } - outConnections.forEach(c -> { - c.closeConnection(); - outConnections.remove(c); + outConnections.forEach((c, s) -> { + c.logoff(); + outConnections.remove(s); }); inConnections.forEach(c -> { c.closeConnection(); @@ -124,32 +154,34 @@ public class XMPPServer implements AutoCloseable { } public void addConnectionIn(ConnectionIn c) { + c.setListener(this); inConnections.add(c); } - public void addConnectionOut(ConnectionOut c) { - outConnections.add(c); + public void addConnectionOut(Socket socket, StreamServerDialback c) { + c.setListener(this); + outConnections.put(c, socket); } public void removeConnectionIn(ConnectionIn c) { inConnections.remove(c); } - public void removeConnectionOut(ConnectionOut c) { + public void removeConnectionOut(StreamServerDialback c) { outConnections.remove(c); } - public String getFromCache(String hostname) { + public String getFromCache(Jid to) { final String[] cache = new String[1]; - outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> { + outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { cache[0] = c.xml; outCache.remove(c); }); return cache[0]; } - public Optional<ConnectionOut> getConnectionOut(String hostname, boolean needReady) { - return outConnections.stream().filter(c -> c.to != null && + public Optional<StreamServerDialback> getConnectionOut(Jid hostname, boolean needReady) { + return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } @@ -167,17 +199,17 @@ public class XMPPServer implements AutoCloseable { xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("s2s (out): {}", xml); - sendOut(s.getTo().getDomain(), xml); + sendOut(Jid.of(s.getTo().getDomain()), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } - public void sendOut(String hostname, String xml) { + public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; - ConnectionOut connOut = null; - for (ConnectionOut c : outConnections) { + StreamServerDialback connOut = null; + for (StreamServerDialback c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; @@ -189,7 +221,7 @@ public class XMPPServer implements AutoCloseable { } } if (connOut != null) { - connOut.sendStanza(xml); + connOut.send(xml); return; } @@ -197,7 +229,7 @@ public class XMPPServer implements AutoCloseable { for (CacheEntry c : outCache) { if (c.hostname != null && c.hostname.equals(hostname)) { c.xml += xml; - c.tsUpdated = System.currentTimeMillis(); + c.updated = Instant.now(); haveCache = true; break; } @@ -207,24 +239,31 @@ public class XMPPServer implements AutoCloseable { } if (!haveAnyConn) { + createDialbackConnection(hostname.toEscapedString(), null, null); + } + } + + void createDialbackConnection(String to, String checkSID, String dbKey) { + service.submit(() -> { try { - ConnectionOut connectionOut = new ConnectionOut(this, hostname); - addConnectionOut(connectionOut); - service.submit(connectionOut); - } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) { + Socket socket = new Socket(); + socket.connect(DNSQueries.getServerAddress(to)); + StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); + addConnectionOut(socket, streamServerDialback); + streamServerDialback.addChildParser(new JuickMessage()); + streamServerDialback.connect(); + } catch (Exception e) { logger.error("s2s out error", e); } - } + }); } - public void startDialback(String from, String streamId, String dbKey) throws Exception { - Optional<ConnectionOut> c = getConnectionOut(from, false); + public void startDialback(Jid from, String streamId, String dbKey) throws Exception { + Optional<StreamServerDialback> c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { - ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey); - addConnectionOut(newConnection); - service.submit(newConnection); + createDialbackConnection(from.toEscapedString(), streamId, dbKey); } } @@ -232,8 +271,9 @@ public class XMPPServer implements AutoCloseable { stanzaListeners.add(listener); } - public void onStanzaReceived(Stanza xmlValue) { - stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue)); + public void onStanzaReceived(String xmlValue) { + Stanza stanza = parse(xmlValue); + stanzaListeners.forEach(l -> l.stanzaReceived(stanza)); } public BasicXmppSession getSession() { @@ -244,7 +284,105 @@ public class XMPPServer implements AutoCloseable { return inConnections; } - public List<ConnectionOut> getOutConnections() { + public Map<StreamServerDialback, Socket> getOutConnections() { return outConnections; } + + @Override + public void starttls(ConnectionIn connection) { + logger.info("stream {} securing", connection.streamID); + connection.sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />"); + try { + connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(), + connection.getSocket().getPort(), true)); + ((SSLSocket) connection.getSocket()).setUseClientMode(false); + ((SSLSocket) connection.getSocket()).startHandshake(); + connection.setSecured(true); + logger.info("stream {} secured", connection.streamID); + connection.restartParser(); + } catch (XmlPullParserException | IOException sex) { + logger.warn("stream {} ssl error {}", connection.streamID, sex); + connection.sendStanza("<failed xmlns\"" + NS_TLS + "\" />"); + removeConnectionIn(connection); + connection.closeConnection(); + } + } + + @Override + public void proceed(StreamServerDialback connection) { + try { + Socket socket = outConnections.get(connection); + socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), + socket.getPort(), true); + ((SSLSocket) socket).startHandshake(); + connection.setSecured(true); + logger.info("stream {} secured", connection.getStreamID()); + connection.setInputStream(socket.getInputStream()); + connection.setOutputStream(socket.getOutputStream()); + connection.restartStream(); + connection.sendOpenStream(); + } catch (XmlPullParserException | IOException sex) { + logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); + connection.send("<failed xmlns\"" + NS_TLS + "\" />"); + removeConnectionOut(connection); + connection.logoff(); + } + } + + @Override + public void verify(StreamServerDialback connection, String from, String type, String sid) { + if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { + getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); + } + } + + @Override + public void dialbackError(StreamServerDialback connection, StreamError error) { + logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void finished(StreamServerDialback connection, boolean dirty) { + logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void exception(StreamServerDialback connection, Exception ex) { + logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void ready(StreamServerDialback connection) { + logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); + String cache = getFromCache(connection.to); + if (cache != null) { + logger.info("stream to {} {} sending cache", connection.to, connection.getStreamID()); + connection.send(cache); + } + } + + @Override + public boolean securing(StreamServerDialback connection) { + return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); + } + + protected Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + + public Jid getJid() { + return jid; + } } diff --git a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java index 02b1556d..f14b2b23 100644 --- a/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java +++ b/juick-xmpp/src/main/java/com/juick/components/configuration/XmppAppConfiguration.java @@ -23,11 +23,14 @@ package com.juick.components.configuration; import com.juick.components.s2s.BasicXmppSession; import com.juick.server.configuration.BaseWebConfiguration; +import com.juick.xmpp.helpers.JidConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; +import org.springframework.core.convert.ConversionService; +import org.springframework.format.support.DefaultFormattingConversionService; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import rocks.xmpp.core.session.Extension; @@ -58,4 +61,10 @@ public class XmppAppConfiguration extends BaseWebConfiguration { .build(); return BasicXmppSession.create(hostname, configuration); } + @Bean + public static ConversionService conversionService() { + DefaultFormattingConversionService cs = new DefaultFormattingConversionService(); + cs.addConverter(new JidConverter()); + return cs; + } } diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java b/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java index 697ad04a..f20fc76a 100644 --- a/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java +++ b/juick-xmpp/src/main/java/com/juick/components/controllers/StatusController.java @@ -39,7 +39,7 @@ public class StatusController { XMPPStatus status = new XMPPStatus(); if (xmpp != null) { status.setInbound(xmpp.getInConnections()); - status.setOutbound(xmpp.getOutConnections()); + status.setOutbound(xmpp.getOutConnections().keySet()); } return status; } diff --git a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java index d3459049..9b5bb0d5 100644 --- a/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java +++ b/juick-xmpp/src/main/java/com/juick/components/controllers/helpers/XMPPStatus.java @@ -18,16 +18,17 @@ package com.juick.components.controllers.helpers; import com.juick.components.s2s.ConnectionIn; -import com.juick.components.s2s.ConnectionOut; +import com.juick.components.s2s.StreamServerDialback; import java.util.List; +import java.util.Set; /** * Created by vitalyster on 16.02.2017. */ public class XMPPStatus { private List<ConnectionIn> inbound; - private List<ConnectionOut> outbound; + private Set<StreamServerDialback> outbound; public List<ConnectionIn> getInbound() { return inbound; @@ -37,11 +38,11 @@ public class XMPPStatus { this.inbound = inbound; } - public List<ConnectionOut> getOutbound() { + public Set<StreamServerDialback> getOutbound() { return outbound; } - public void setOutbound(List<ConnectionOut> outbound) { + public void setOutbound(Set<StreamServerDialback> outbound) { this.outbound = outbound; } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java b/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java index 645b24f1..c8eeab53 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/CacheEntry.java @@ -17,20 +17,24 @@ package com.juick.components.s2s; +import rocks.xmpp.addr.Jid; + +import java.time.Instant; + /** * * @author ugnich */ public class CacheEntry { - public String hostname; - public long tsCreated; - public long tsUpdated; + public Jid hostname; + public Instant created; + public Instant updated; public String xml; - public CacheEntry(String hostname, String xml) { + public CacheEntry(Jid hostname, String xml) { this.hostname = hostname; - this.tsCreated = this.tsUpdated = System.currentTimeMillis(); + this.created = this.updated =Instant.now(); this.xml = xml; } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java index 7d3767a2..6a796307 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java @@ -24,15 +24,11 @@ import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import org.xmlpull.v1.XmlPullParserFactory; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManager; -import javax.net.ssl.X509TrustManager; -import java.io.*; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.net.Socket; import java.nio.charset.StandardCharsets; -import java.security.*; -import java.security.cert.CertificateException; import java.time.Instant; import java.util.UUID; @@ -50,7 +46,7 @@ public class Connection { public long bytesLocal = 0; public long packetsLocal = 0; XMPPServer xmpp; - Socket socket; + private Socket socket; public static final String NS_DB = "jabber:server:dialback"; public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; public static final String NS_STREAM = "http://etherx.jabber.org/streams"; @@ -58,35 +54,12 @@ public class Connection { XmlPullParser parser = factory.newPullParser(); OutputStreamWriter writer; private boolean secured = false; - SSLContext sc; - private TrustManager[] trustAllCerts = new TrustManager[]{ - new X509TrustManager() { - public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { - } - - public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { - } - public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; - } - } - }; - - - public Connection(XMPPServer xmpp) throws XmlPullParserException, KeyStoreException, CertificateException, NoSuchAlgorithmException, IOException, UnrecoverableKeyException, KeyManagementException { + + + + public Connection(XMPPServer xmpp) throws XmlPullParserException { this.xmpp = xmpp; created = updated = Instant.now(); - KeyStore ks = KeyStore.getInstance("JKS"); - try (InputStream ksIs = new FileInputStream(xmpp.keystore)) { - ks.load(ksIs, xmpp.keystorePassword.toCharArray()); - KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory - .getDefaultAlgorithm()); - kmf.init(ks, xmpp.keystorePassword.toCharArray()); - sc = SSLContext.getInstance("TLSv1.2"); - sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom()); - } catch (Exception e) { - logger.warn("tls unavailable"); - } } public void logParser() { @@ -153,4 +126,12 @@ public class Connection { parser.setInput(new InputStreamReader(socket.getInputStream())); writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8); } + + public Socket getSocket() { + return socket; + } + + public void setSocket(Socket socket) { + this.socket = socket; + } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java index e6f404ef..16f207a7 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java @@ -24,52 +24,33 @@ import org.apache.commons.lang3.StringUtils; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; -import rocks.xmpp.core.stanza.model.Stanza; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLSocket; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; import java.io.EOFException; import java.io.IOException; -import java.io.StringReader; import java.net.Socket; import java.net.SocketException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; /** * @author ugnich */ public class ConnectionIn extends Connection implements Runnable { - final public List<String> from = new ArrayList<>(); - public long tsRemoteData = 0; + final public List<Jid> from = new CopyOnWriteArrayList<>(); + public Instant received; public long packetsRemote = 0; + ConnectionListener listener; - public ConnectionIn(XMPPServer xmpp, Socket socket) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException { + public ConnectionIn(XMPPServer xmpp, Socket socket) throws XmlPullParserException, IOException { super(xmpp); - this.socket = socket; + this.setSocket(socket); restartParser(); } - protected Stanza parse(String xml) { - try { - Unmarshaller unmarshaller = xmpp.getSession().createUnmarshaller(); - return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); - } catch (JAXBException e) { - logger.error("JAXB exception", e); - } - return null; - } - @Override public void run() { try { @@ -79,7 +60,7 @@ public class ConnectionIn extends Connection implements Runnable { || !parser.getNamespace("stream").equals(NS_STREAM)) { // || !parser.getAttributeValue(null, "version").equals("1.0") // || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) { - throw new Exception(String.format("stream from %s invalid", socket.getRemoteSocketAddress())); + throw new Exception(String.format("stream from %s invalid", getSocket().getRemoteSocketAddress())); } streamID = parser.getAttributeValue(null, "id"); if (streamID == null) { @@ -108,14 +89,15 @@ public class ConnectionIn extends Connection implements Runnable { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); logger.info("stream from {} to {} {} asking for dialback", dfrom, to, streamID); - if (dfrom.endsWith(xmpp.HOSTNAME) && (dfrom.equals(xmpp.HOSTNAME) || dfrom.endsWith("." + xmpp.HOSTNAME))) { + if (dfrom.endsWith(xmpp.getJid().toEscapedString()) && (dfrom.equals(xmpp.getJid().toEscapedString()) + || dfrom.endsWith("." + xmpp.getJid()))) { logger.warn("stream from {} is invalid", dfrom); break; } - if (to != null && to.equals(xmpp.HOSTNAME)) { + if (to != null && to.equals(xmpp.getJid().toEscapedString())) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - xmpp.startDialback(dfrom, streamID, dbKey); + xmpp.startDialback(Jid.of(dfrom), streamID, dbKey); } else { logger.warn("stream from " + dfrom + " " + streamID + " invalid to " + to); break; @@ -128,7 +110,7 @@ public class ConnectionIn extends Connection implements Runnable { updateTsRemoteData(); final boolean[] valid = {false}; if (vfrom != null && vto != null && vid != null && vkey != null) { - xmpp.getConnectionOut(vfrom, false).ifPresent(c -> { + xmpp.getConnectionOut(Jid.of(vfrom), false).ifPresent(c -> { String dialbackKey = c.dbKey; valid[0] = vkey.equals(dialbackKey); }); @@ -143,12 +125,12 @@ public class ConnectionIn extends Connection implements Runnable { } else if (tag.equals("presence") && checkFromTo(parser)) { String xml = XmlUtils.parseToString(parser, false); logger.info("stream {} presence: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); + xmpp.onStanzaReceived(xml); } else if (tag.equals("message") && checkFromTo(parser)) { updateTsRemoteData(); String xml = XmlUtils.parseToString(parser, false); logger.info("stream {} message: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); + xmpp.onStanzaReceived(xml); } else if (tag.equals("iq") && checkFromTo(parser)) { updateTsRemoteData(); @@ -156,25 +138,10 @@ public class ConnectionIn extends Connection implements Runnable { String xml = XmlUtils.parseToString(parser, false); if (type == null || !type.equals("error")) { logger.info("stream {} iq: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); - } - } else if (sc != null && !isSecured() && tag.equals("starttls")) { - logger.info("stream {} securing", streamID); - sendStanza("<proceed xmlns=\"" + NS_TLS + "\" />"); - try { - socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), - socket.getPort(), true); - ((SSLSocket) socket).setUseClientMode(false); - ((SSLSocket) socket).startHandshake(); - setSecured(true); - logger.info("stream {} secured", streamID); - restartParser(); - } catch (SSLException sex) { - logger.warn("stream {} ssl error {}", streamID, sex); - sendStanza("<failed xmlns\"" + NS_TLS + "\" />"); - xmpp.removeConnectionIn(this); - closeConnection(); + xmpp.onStanzaReceived(xml); } + } else if (!isSecured() && tag.equals("starttls")) { + listener.starttls(this); } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { sendOpenStream(null, true); } else if (tag.equals("error")) { @@ -202,16 +169,16 @@ public class ConnectionIn extends Connection implements Runnable { } void updateTsRemoteData() { - tsRemoteData = System.currentTimeMillis(); + received = Instant.now(); } void sendOpenStream(String from, boolean xmppversionnew) throws IOException { String openStream = "<?xml version='1.0'?><stream:stream xmlns='jabber:server' " + "xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - xmpp.HOSTNAME + "' id='" + streamID + "' version='1.0'>"; + xmpp.getJid().toEscapedString() + "' id='" + streamID + "' version='1.0'>"; if (xmppversionnew) { openStream += "<stream:features>"; - if (sc != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) { + if (listener != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) { openStream += "<starttls xmlns=\"" + NS_TLS + "\"><optional/></starttls>"; } openStream += "</stream:features>"; @@ -219,8 +186,8 @@ public class ConnectionIn extends Connection implements Runnable { sendStanza(openStream); } - public void sendDialbackResult(String sfrom, String type) { - sendStanza("<db:result from='" + xmpp.HOSTNAME + "' to='" + sfrom + "' type='" + type + "'/>"); + public void sendDialbackResult(Jid sfrom, String type) { + sendStanza("<db:result from='" + xmpp.getJid().toEscapedString() + "' to='" + sfrom + "' type='" + type + "'/>"); if (type.equals("valid")) { from.add(sfrom); logger.info("stream from {} {} ready", sfrom, streamID); @@ -232,11 +199,10 @@ public class ConnectionIn extends Connection implements Runnable { String cto = parser.getAttributeValue(null, "to"); if (StringUtils.isNotEmpty(cfrom) && StringUtils.isNotEmpty(cto)) { Jid jidto = Jid.of(cto); - if (jidto.getDomain().equals(xmpp.HOSTNAME)) { + if (jidto.getDomain().equals(xmpp.getJid().toEscapedString())) { Jid jidfrom = Jid.of(cfrom); - int size = from.size(); - for (int i = 0; i < size; i++) { - if (from.get(i).equals(jidfrom.getDomain())) { + for (Jid aFrom : from) { + if (aFrom.equals(Jid.of(jidfrom.getDomain()))) { return true; } } @@ -244,4 +210,7 @@ public class ConnectionIn extends Connection implements Runnable { } return false; } + public void setListener(ConnectionListener listener) { + this.listener = listener; + } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java new file mode 100644 index 00000000..ef5a948b --- /dev/null +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionListener.java @@ -0,0 +1,14 @@ +package com.juick.components.s2s; + +import com.juick.xmpp.extensions.StreamError; + +public interface ConnectionListener { + void starttls(ConnectionIn connection); + void proceed(StreamServerDialback connection); + void verify(StreamServerDialback connection, String from, String type, String sid); + void dialbackError(StreamServerDialback connection, StreamError error); + void finished(StreamServerDialback connection, boolean dirty); + void exception(StreamServerDialback connection, Exception ex); + void ready(StreamServerDialback connection); + boolean securing(StreamServerDialback connection); +} diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java b/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java index 589ed18a..48c4d72d 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/StreamServerDialback.java @@ -17,77 +17,69 @@ package com.juick.components.s2s; -import com.juick.components.XMPPServer; import com.juick.components.s2s.util.DialbackUtils; +import com.juick.xmpp.Stream; import com.juick.xmpp.extensions.StreamError; import com.juick.xmpp.extensions.StreamFeatures; import com.juick.xmpp.utils.XmlUtils; import org.apache.commons.text.RandomStringGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLSocket; import java.io.EOFException; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; +import java.io.InputStream; +import java.io.OutputStream; import java.net.SocketException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Arrays; import java.util.UUID; /** * @author ugnich */ -public class ConnectionOut extends Connection implements Runnable { +public class StreamServerDialback extends Stream { + protected static final Logger logger = LoggerFactory.getLogger(StreamServerDialback.class); + public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; + public static final String NS_DB = "jabber:server:dialback"; + private boolean secured = false; public boolean streamReady = false; - public String to; String checkSID = null; String dbKey = null; + private String streamID; + ConnectionListener listener; RandomStringGenerator generator = new RandomStringGenerator.Builder().withinRange('a', 'z').build(); - public ConnectionOut(XMPPServer xmpp, String hostname) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException { - super(xmpp); - to = hostname; - dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, xmpp.HOSTNAME, streamID); - } - - public ConnectionOut(XMPPServer xmpp, String hostname, String checkSID, String dbKey) throws Exception { - super(xmpp); - to = hostname; + public StreamServerDialback(Jid from, Jid to, InputStream is, OutputStream os, String checkSID, String dbKey) throws Exception { + super(from, to, is, os); + this.to = to; this.checkSID = checkSID; this.dbKey = dbKey; + if (dbKey == null) { + this.dbKey = DialbackUtils.generateDialbackKey(generator.generate(15), to, from, streamID); + } streamID = UUID.randomUUID().toString(); } - void sendOpenStream() throws IOException { - sendStanza("<?xml version='1.0'?><stream:stream xmlns='jabber:server' id='" + streamID + + public void sendOpenStream() throws IOException { + send("<?xml version='1.0'?><stream:stream xmlns='jabber:server' id='" + streamID + "' xmlns:stream='http://etherx.jabber.org/streams' xmlns:db='jabber:server:dialback' from='" + - xmpp.HOSTNAME + "' to='" + to + "' version='1.0'>"); + from.toEscapedString() + "' to='" + to.toEscapedString() + "' version='1.0'>"); } void processDialback() throws Exception { if (checkSID != null) { sendDialbackVerify(checkSID, dbKey); } - sendStanza("<db:result from='" + xmpp.HOSTNAME + "' to='" + to + "'>" + + send("<db:result from='" + from.toEscapedString() + "' to='" + to.toEscapedString() + "'>" + dbKey + "</db:result>"); } @Override - public void run() { - logger.info("stream to {} start", to); + public void handshake() { try { - socket = new Socket(); - InetSocketAddress address = DNSQueries.getServerAddress(to); - socket.connect(address); - restartParser(); + restartStream(); sendOpenStream(); @@ -107,21 +99,13 @@ public class ConnectionOut extends Connection implements Runnable { if (parser.getEventType() != XmlPullParser.START_TAG) { continue; } - logParser(); String tag = parser.getName(); if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { String type = parser.getAttributeValue(null, "type"); if (type != null && type.equals("valid")) { streamReady = true; - logger.info("stream to {} {} ready", to, streamID); - - String cache = xmpp.getFromCache(to); - if (cache != null) { - logger.info("stream to {} {} sending cache", to, streamID); - sendStanza(cache); - } - + listener.ready(this); } else { logger.info("stream to {} {} dialback fail", to, streamID); } @@ -130,62 +114,54 @@ public class ConnectionOut extends Connection implements Runnable { String from = parser.getAttributeValue(null, "from"); String type = parser.getAttributeValue(null, "type"); String sid = parser.getAttributeValue(null, "id"); - if (from != null && from.equals(to) && sid != null && !sid.isEmpty() && type != null) { - xmpp.getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(from, type)); - } + listener.verify(this, from, type, sid); XmlUtils.skip(parser); } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) { StreamFeatures features = StreamFeatures.parse(parser); - if (sc != null && !isSecured() && features.STARTTLS >= 0 && !Arrays.asList(xmpp.brokenSSLhosts).contains(to)) { - logger.info("stream to {} {} securing", to, streamID); - sendStanza("<starttls xmlns=\"" + NS_TLS + "\" />"); + if (listener != null && !secured && features.STARTTLS >= 0 + && listener.securing(this)) { + logger.info("stream to {} {} securing", to.toEscapedString(), streamID); + send("<starttls xmlns=\"" + NS_TLS + "\" />"); } else { processDialback(); } } else if (tag.equals("proceed") && parser.getNamespace().equals(NS_TLS)) { - try { - socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), - socket.getPort(), true); - ((SSLSocket) socket).startHandshake(); - setSecured(true); - logger.info("stream {} secured", streamID); - restartParser(); - sendOpenStream(); - } catch (SSLException sex) { - logger.error("s2s ssl error: {} {}, error {}", to, streamID, sex); - sendStanza("<failed xmlns\"" + NS_TLS + "\" />"); - xmpp.removeConnectionOut(this); - closeConnection(); - } - } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { + listener.proceed(this); + } else if (secured && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { streamID = parser.getAttributeValue(null, "id"); } else if (tag.equals("error")) { StreamError streamError = StreamError.parse(parser); - logger.warn("Stream error from {}: {}", streamID, streamError.getCondition()); - xmpp.removeConnectionOut(this); - closeConnection(); + listener.dialbackError(this, streamError); } else { String unhandledStanza = XmlUtils.parseToString(parser, true); logger.warn("Unhandled stanza from {} {} : {}", to, streamID, unhandledStanza); } } - - logger.warn("stream to {} {} finished", to, streamID); - xmpp.removeConnectionOut(ConnectionOut.this); - closeConnection(); + listener.finished(this, false); } catch (EOFException | SocketException eofex) { - logger.info("stream {} {} closed (dirty)", to, streamID); - xmpp.removeConnectionOut(ConnectionOut.this); - closeConnection(); + listener.finished(this, true); } catch (Exception e) { - logger.error("s2s out exception: {} {}, exception {}", to, streamID, e); - xmpp.removeConnectionOut(ConnectionOut.this); - closeConnection(); + listener.exception(this, e); } } public void sendDialbackVerify(String sid, String key) { - sendStanza("<db:verify from='" + xmpp.HOSTNAME + "' to='" + to + "' id='" + sid + "'>" + + send("<db:verify from='" + from.toEscapedString() + "' to='" + to + "' id='" + sid + "'>" + key + "</db:verify>"); } + public void setListener(ConnectionListener listener) { + this.listener = listener; + } + + public String getStreamID() { + return streamID; + } + + public boolean isSecured() { + return secured; + } + + public void setSecured(boolean secured) { + this.secured = secured; + } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java b/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java index 88626606..fc08c5d6 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/util/DialbackUtils.java @@ -19,6 +19,7 @@ package com.juick.components.s2s.util; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.codec.digest.HmacUtils; +import rocks.xmpp.addr.Jid; /** * Created by vitalyster on 05.12.2016. @@ -28,8 +29,8 @@ public class DialbackUtils { throw new IllegalStateException(); } - public static String generateDialbackKey(String secret, String to, String from, String id) { + public static String generateDialbackKey(String secret, Jid to, Jid from, String id) { return HmacUtils.hmacSha256Hex(DigestUtils.sha256(secret), - (to + " " + from + " " + id).getBytes()); + (to.toEscapedString() + " " + from.toEscapedString() + " " + id).getBytes()); } } diff --git a/juick-xmpp/src/main/java/com/juick/xmpp/helpers/JidConverter.java b/juick-xmpp/src/main/java/com/juick/xmpp/helpers/JidConverter.java new file mode 100644 index 00000000..253c50f8 --- /dev/null +++ b/juick-xmpp/src/main/java/com/juick/xmpp/helpers/JidConverter.java @@ -0,0 +1,13 @@ +package com.juick.xmpp.helpers; + +import org.springframework.core.convert.converter.Converter; +import org.springframework.lang.Nullable; +import rocks.xmpp.addr.Jid; + +public class JidConverter implements Converter<String, Jid> { + @Nullable + @Override + public Jid convert(String jidStr) { + return Jid.of(jidStr); + } +} |