From 448fc7e84732422011186a9a4633c345e9c6208e Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Mon, 9 Oct 2017 15:27:48 +0300 Subject: xmpp:ConnectionOut -> StreamServerDialback --- .../main/java/com/juick/components/XMPPServer.java | 224 +++++++++++++++++---- 1 file changed, 181 insertions(+), 43 deletions(-) (limited to 'juick-xmpp/src/main/java/com/juick/components/XMPPServer.java') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index 1df7d575..cb2bb681 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -18,46 +18,53 @@ package com.juick.components; import com.juick.components.s2s.*; +import com.juick.xmpp.extensions.JuickMessage; +import com.juick.xmpp.extensions.StreamError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.inject.Inject; +import javax.net.ssl.*; import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; -import java.io.IOException; -import java.io.StringWriter; +import java.io.*; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; -import java.security.KeyManagementException; +import java.security.KeyStore; import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.Iterator; +import java.security.SecureRandom; +import java.time.Instant; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; +import static com.juick.components.s2s.Connection.NS_TLS; + /** * @author ugnich */ @Component -public class XMPPServer implements AutoCloseable { +public class XMPPServer implements ConnectionListener, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); @Inject public ExecutorService service; @Value("${hostname}") - public String HOSTNAME; + private Jid jid; @Value("${s2s_port:5269}") private int s2sPort; @Value("${keystore}") @@ -70,10 +77,24 @@ public class XMPPServer implements AutoCloseable { public String[] bannedHosts; private final List inConnections = new CopyOnWriteArrayList<>(); - private final List outConnections = new CopyOnWriteArrayList<>(); + private final Map outConnections = new ConcurrentHashMap<>(); private final List outCache = new CopyOnWriteArrayList<>(); private final List stanzaListeners = new CopyOnWriteArrayList<>(); + SSLContext sc; + private TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } + } + }; + private ServerSocket listener; @@ -81,9 +102,18 @@ public class XMPPServer implements AutoCloseable { private BasicXmppSession session; @PostConstruct - public void init() { - - logger.info("component initialized"); + public void init() throws KeyStoreException { + KeyStore ks = KeyStore.getInstance("JKS"); + try (InputStream ksIs = new FileInputStream(keystore)) { + ks.load(ksIs, keystorePassword.toCharArray()); + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory + .getDefaultAlgorithm()); + kmf.init(ks, keystorePassword.toCharArray()); + sc = SSLContext.getInstance("TLSv1.2"); + sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom()); + } catch (Exception e) { + logger.warn("tls unavailable"); + } service.submit(() -> { try { listener = new ServerSocket(s2sPort); @@ -97,7 +127,7 @@ public class XMPPServer implements AutoCloseable { } } catch (SocketException e) { // shutdown - } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) { + } catch (IOException | XmlPullParserException e) { logger.warn("xmpp exception", e); } }); @@ -108,9 +138,9 @@ public class XMPPServer implements AutoCloseable { if (!listener.isClosed()) { listener.close(); } - outConnections.forEach(c -> { - c.closeConnection(); - outConnections.remove(c); + outConnections.forEach((c, s) -> { + c.logoff(); + outConnections.remove(s); }); inConnections.forEach(c -> { c.closeConnection(); @@ -124,32 +154,34 @@ public class XMPPServer implements AutoCloseable { } public void addConnectionIn(ConnectionIn c) { + c.setListener(this); inConnections.add(c); } - public void addConnectionOut(ConnectionOut c) { - outConnections.add(c); + public void addConnectionOut(Socket socket, StreamServerDialback c) { + c.setListener(this); + outConnections.put(c, socket); } public void removeConnectionIn(ConnectionIn c) { inConnections.remove(c); } - public void removeConnectionOut(ConnectionOut c) { + public void removeConnectionOut(StreamServerDialback c) { outConnections.remove(c); } - public String getFromCache(String hostname) { + public String getFromCache(Jid to) { final String[] cache = new String[1]; - outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> { + outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { cache[0] = c.xml; outCache.remove(c); }); return cache[0]; } - public Optional getConnectionOut(String hostname, boolean needReady) { - return outConnections.stream().filter(c -> c.to != null && + public Optional getConnectionOut(Jid hostname, boolean needReady) { + return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } @@ -167,17 +199,17 @@ public class XMPPServer implements AutoCloseable { xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("s2s (out): {}", xml); - sendOut(s.getTo().getDomain(), xml); + sendOut(Jid.of(s.getTo().getDomain()), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } - public void sendOut(String hostname, String xml) { + public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; - ConnectionOut connOut = null; - for (ConnectionOut c : outConnections) { + StreamServerDialback connOut = null; + for (StreamServerDialback c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; @@ -189,7 +221,7 @@ public class XMPPServer implements AutoCloseable { } } if (connOut != null) { - connOut.sendStanza(xml); + connOut.send(xml); return; } @@ -197,7 +229,7 @@ public class XMPPServer implements AutoCloseable { for (CacheEntry c : outCache) { if (c.hostname != null && c.hostname.equals(hostname)) { c.xml += xml; - c.tsUpdated = System.currentTimeMillis(); + c.updated = Instant.now(); haveCache = true; break; } @@ -207,24 +239,31 @@ public class XMPPServer implements AutoCloseable { } if (!haveAnyConn) { + createDialbackConnection(hostname.toEscapedString(), null, null); + } + } + + void createDialbackConnection(String to, String checkSID, String dbKey) { + service.submit(() -> { try { - ConnectionOut connectionOut = new ConnectionOut(this, hostname); - addConnectionOut(connectionOut); - service.submit(connectionOut); - } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) { + Socket socket = new Socket(); + socket.connect(DNSQueries.getServerAddress(to)); + StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); + addConnectionOut(socket, streamServerDialback); + streamServerDialback.addChildParser(new JuickMessage()); + streamServerDialback.connect(); + } catch (Exception e) { logger.error("s2s out error", e); } - } + }); } - public void startDialback(String from, String streamId, String dbKey) throws Exception { - Optional c = getConnectionOut(from, false); + public void startDialback(Jid from, String streamId, String dbKey) throws Exception { + Optional c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { - ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey); - addConnectionOut(newConnection); - service.submit(newConnection); + createDialbackConnection(from.toEscapedString(), streamId, dbKey); } } @@ -232,8 +271,9 @@ public class XMPPServer implements AutoCloseable { stanzaListeners.add(listener); } - public void onStanzaReceived(Stanza xmlValue) { - stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue)); + public void onStanzaReceived(String xmlValue) { + Stanza stanza = parse(xmlValue); + stanzaListeners.forEach(l -> l.stanzaReceived(stanza)); } public BasicXmppSession getSession() { @@ -244,7 +284,105 @@ public class XMPPServer implements AutoCloseable { return inConnections; } - public List getOutConnections() { + public Map getOutConnections() { return outConnections; } + + @Override + public void starttls(ConnectionIn connection) { + logger.info("stream {} securing", connection.streamID); + connection.sendStanza(""); + try { + connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(), + connection.getSocket().getPort(), true)); + ((SSLSocket) connection.getSocket()).setUseClientMode(false); + ((SSLSocket) connection.getSocket()).startHandshake(); + connection.setSecured(true); + logger.info("stream {} secured", connection.streamID); + connection.restartParser(); + } catch (XmlPullParserException | IOException sex) { + logger.warn("stream {} ssl error {}", connection.streamID, sex); + connection.sendStanza(""); + removeConnectionIn(connection); + connection.closeConnection(); + } + } + + @Override + public void proceed(StreamServerDialback connection) { + try { + Socket socket = outConnections.get(connection); + socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), + socket.getPort(), true); + ((SSLSocket) socket).startHandshake(); + connection.setSecured(true); + logger.info("stream {} secured", connection.getStreamID()); + connection.setInputStream(socket.getInputStream()); + connection.setOutputStream(socket.getOutputStream()); + connection.restartStream(); + connection.sendOpenStream(); + } catch (XmlPullParserException | IOException sex) { + logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); + connection.send(""); + removeConnectionOut(connection); + connection.logoff(); + } + } + + @Override + public void verify(StreamServerDialback connection, String from, String type, String sid) { + if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { + getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); + } + } + + @Override + public void dialbackError(StreamServerDialback connection, StreamError error) { + logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void finished(StreamServerDialback connection, boolean dirty) { + logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void exception(StreamServerDialback connection, Exception ex) { + logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void ready(StreamServerDialback connection) { + logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); + String cache = getFromCache(connection.to); + if (cache != null) { + logger.info("stream to {} {} sending cache", connection.to, connection.getStreamID()); + connection.send(cache); + } + } + + @Override + public boolean securing(StreamServerDialback connection) { + return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); + } + + protected Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + + public Jid getJid() { + return jid; + } } -- cgit v1.2.3