From 7aaa3f9a29c280f01c677c918932620be45cdbd7 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 8 Nov 2018 21:38:27 +0300 Subject: Merge everything into single Spring Boot application --- src/main/java/com/juick/server/XMPPServer.java | 429 +++++++++++++++++++++++++ 1 file changed, 429 insertions(+) create mode 100644 src/main/java/com/juick/server/XMPPServer.java (limited to 'src/main/java/com/juick/server/XMPPServer.java') diff --git a/src/main/java/com/juick/server/XMPPServer.java b/src/main/java/com/juick/server/XMPPServer.java new file mode 100644 index 00000000..86ab6a78 --- /dev/null +++ b/src/main/java/com/juick/server/XMPPServer.java @@ -0,0 +1,429 @@ +/* + * Copyright (C) 2008-2017, Juick + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.juick.server; + +import com.juick.server.xmpp.router.StreamError; +import com.juick.server.xmpp.s2s.*; +import com.juick.service.UserService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; +import rocks.xmpp.core.stanza.model.Stanza; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.net.ssl.*; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import java.io.IOException; +import java.io.StringReader; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.security.InvalidAlgorithmParameterException; +import java.security.KeyStore; +import java.security.KeyStoreException; +import java.security.SecureRandom; +import java.security.cert.*; +import java.time.Duration; +import java.time.Instant; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author ugnich + */ +public class XMPPServer implements ConnectionListener { + private static final Logger logger = LoggerFactory.getLogger("com.juick.server.xmpp"); + + private static final int TIMEOUT_MINUTES = 15; + + @Inject + public ExecutorService service; + @Value("${hostname:localhost}") + private Jid jid; + @Value("${s2s_port:5269}") + private int s2sPort; + @Value("${broken_ssl_hosts:}") + public String[] brokenSSLhosts; + @Value("${banned_hosts:}") + public String[] bannedHosts; + + private final List inConnections = new CopyOnWriteArrayList<>(); + private final Map> outConnections = new ConcurrentHashMap<>(); + private final List outCache = new CopyOnWriteArrayList<>(); + private final List stanzaListeners = new CopyOnWriteArrayList<>(); + private final AtomicBoolean closeFlag = new AtomicBoolean(false); + + SSLContext sc; + CertificateFactory cf; + CertPathValidator cpv; + PKIXParameters params; + private TrustManager[] trustAllCerts = new TrustManager[]{ + new X509TrustManager() { + public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + + public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { + } + + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + } + }; + private boolean tlsConfigured = false; + + + private ServerSocket listener; + + @Inject + private BasicXmppSession session; + @Inject + private UserService userService; + @Inject + private KeystoreManager keystoreManager; + + @PostConstruct + public void init() throws KeyStoreException { + closeFlag.set(false); + try { + sc = SSLContext.getInstance("TLSv1.2"); + sc.init(keystoreManager.getKeymanagerFactory().getKeyManagers(), trustAllCerts, new SecureRandom()); + TrustManagerFactory trustManagerFactory = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + Set ca = new HashSet<>(); + trustManagerFactory.init((KeyStore)null); + Arrays.stream(trustManagerFactory.getTrustManagers()).forEach(t -> Arrays.stream(((X509TrustManager)t).getAcceptedIssuers()).forEach(cert -> ca.add(new TrustAnchor(cert, null)))); + params = new PKIXParameters(ca); + params.setRevocationEnabled(false); + cpv = CertPathValidator.getInstance("PKIX"); + cf = CertificateFactory.getInstance( "X.509" ); + tlsConfigured = true; + } catch (Exception e) { + logger.warn("tls unavailable"); + } + service.submit(() -> { + try { + listener = new ServerSocket(s2sPort); + logger.info("s2s listener ready"); + while (!listener.isClosed()) { + if (Thread.currentThread().isInterrupted()) break; + Socket socket = listener.accept(); + ConnectionIn client = new ConnectionIn(this, socket); + addConnectionIn(client); + service.submit(client); + } + } catch (SocketException e) { + // shutdown + } catch (IOException | XmlPullParserException e) { + logger.warn("xmpp exception", e); + } + }); + } + + public void addConnectionIn(ConnectionIn c) { + c.setListener(this); + inConnections.add(c); + } + + public void addConnectionOut(ConnectionOut c, Optional socket) { + c.setListener(this); + outConnections.put(c, socket); + } + + public void removeConnectionIn(ConnectionIn c) { + inConnections.remove(c); + } + + public void removeConnectionOut(ConnectionOut c) { + outConnections.remove(c); + } + + public String getFromCache(Jid to) { + final String[] cache = new String[1]; + outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { + cache[0] = c.xml; + outCache.remove(c); + }); + return cache[0]; + } + + public Optional getConnectionOut(Jid hostname, boolean needReady) { + return outConnections.keySet().stream().filter(c -> c.to != null && + c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); + } + + public Optional getConnectionIn(String streamID) { + return inConnections.stream().filter(c -> c.streamID != null && c.streamID.equals(streamID)).findFirst(); + } + + public void sendOut(Jid hostname, String xml) { + boolean haveAnyConn = false; + + ConnectionOut connOut = null; + for (ConnectionOut c : outConnections.keySet()) { + if (c.to != null && c.to.equals(hostname)) { + if (c.streamReady) { + connOut = c; + break; + } else { + haveAnyConn = true; + break; + } + } + } + if (connOut != null) { + connOut.send(xml); + return; + } + + boolean haveCache = false; + for (CacheEntry c : outCache) { + if (c.hostname != null && c.hostname.equals(hostname)) { + c.xml += xml; + c.updated = Instant.now(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(hostname, xml)); + } + + if (!haveAnyConn && !closeFlag.get()) { + try { + createDialbackConnection(hostname.toEscapedString(), null, null); + } catch (Exception e) { + logger.warn("dialback error", e); + } + } + } + + void createDialbackConnection(String to, String checkSID, String dbKey) throws Exception { + ConnectionOut connectionOut = new ConnectionOut(getJid(), Jid.of(to), null, null, checkSID, dbKey); + addConnectionOut(connectionOut, Optional.empty()); + service.submit(() -> { + try { + Socket socket = new Socket(); + socket.connect(DNSQueries.getServerAddress(to)); + connectionOut.setInputStream(socket.getInputStream()); + connectionOut.setOutputStream(socket.getOutputStream()); + addConnectionOut(connectionOut, Optional.of(socket)); + connectionOut.connect(); + } catch (IOException e) { + logger.info("dialback to " + to + " exception", e); + } + }); + } + + public void startDialback(Jid from, String streamId, String dbKey) throws Exception { + Optional c = getConnectionOut(from, false); + if (c.isPresent()) { + c.get().sendDialbackVerify(streamId, dbKey); + } else { + createDialbackConnection(from.toEscapedString(), streamId, dbKey); + } + } + + public void addStanzaListener(StanzaListener listener) { + stanzaListeners.add(listener); + } + + public void onStanzaReceived(String xmlValue) { + logger.info("S2S: {}", xmlValue); + Stanza stanza = parse(xmlValue); + stanzaListeners.forEach(l -> l.stanzaReceived(stanza)); + } + + public BasicXmppSession getSession() { + return session; + } + + public List getInConnections() { + return inConnections; + } + + public Map> getOutConnections() { + return outConnections; + } + + @Override + public boolean isTlsAvailable() { + return tlsConfigured; + } + + @Override + public void starttls(ConnectionIn connection) { + logger.debug("stream {} securing", connection.streamID); + connection.sendStanza(""); + try { + connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(), + connection.getSocket().getPort(), false)); + SSLSocket sslSocket = (SSLSocket) connection.getSocket(); + sslSocket.addHandshakeCompletedListener(handshakeCompletedEvent -> { + try { + CertPath certPath = cf.generateCertPath(Arrays.asList(handshakeCompletedEvent.getPeerCertificates())); + cpv.validate(certPath, params); + connection.setTrusted(true); + logger.info("connection from {} is trusted", connection.from); + } catch (SSLPeerUnverifiedException | CertificateException | CertPathValidatorException | InvalidAlgorithmParameterException e) { + logger.info("connection from {} is NOT trusted, falling back to dialback", connection.from); + } + }); + sslSocket.setUseClientMode(false); + sslSocket.setNeedClientAuth(true); + sslSocket.startHandshake(); + connection.setSecured(true); + logger.debug("stream from {} secured", connection.streamID); + connection.restartParser(); + } catch (XmlPullParserException | IOException sex) { + logger.warn("stream {} ssl error {}", connection.streamID, sex); + connection.sendStanza(""); + removeConnectionIn(connection); + connection.closeConnection(); + } + } + + @Override + public void proceed(ConnectionOut connection) { + try { + Socket socket = outConnections.get(connection).get(); + socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), + socket.getPort(), false); + SSLSocket sslSocket = (SSLSocket) socket; + sslSocket.addHandshakeCompletedListener(handshakeCompletedEvent -> { + try { + CertPath certPath = cf.generateCertPath(Arrays.asList(handshakeCompletedEvent.getPeerCertificates())); + cpv.validate(certPath, params); + connection.setTrusted(true); + logger.info("connection to {} is trusted", connection.to); + } catch (SSLPeerUnverifiedException | CertificateException | CertPathValidatorException | InvalidAlgorithmParameterException e) { + logger.info("connection to {} is NOT trusted, falling back to dialback", connection.to); + } + }); + sslSocket.setNeedClientAuth(true); + sslSocket.startHandshake(); + connection.setSecured(true); + logger.debug("stream to {} secured", connection.getStreamID()); + connection.setInputStream(socket.getInputStream()); + connection.setOutputStream(socket.getOutputStream()); + connection.restartStream(); + connection.sendOpenStream(); + } catch (NoSuchElementException | XmlPullParserException | IOException sex) { + logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); + connection.send(""); + removeConnectionOut(connection); + connection.logoff(); + } + } + + @Override + public void verify(ConnectionOut connection, String from, String type, String sid) { + if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { + getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); + } + } + + @Override + public void dialbackError(ConnectionOut connection, StreamError error) { + logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void finished(ConnectionOut connection, boolean dirty) { + logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void exception(ConnectionOut connection, Exception ex) { + logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); + removeConnectionOut(connection); + connection.logoff(); + } + + @Override + public void ready(ConnectionOut connection) { + logger.debug("stream to {} {} ready", connection.to, connection.getStreamID()); + String cache = getFromCache(connection.to); + if (cache != null) { + logger.debug("stream to {} {} sending cache", connection.to, connection.getStreamID()); + connection.send(cache); + } + } + + @Override + public boolean securing(ConnectionOut connection) { + return tlsConfigured && !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); + } + + public Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + + public Jid getJid() { + return jid; + } + @Scheduled(fixedDelay = 10000) + public void cleanUp() { + Instant now = Instant.now(); + outConnections.keySet().stream().filter(c -> Duration.between(c.getUpdated(), now).toMinutes() > TIMEOUT_MINUTES) + .forEach(c -> { + logger.info("closing idle outgoing connection to {}", c.to); + c.logoff(); + outConnections.remove(c); + }); + + inConnections.stream().filter(c -> Duration.between(c.updated, now).toMinutes() > TIMEOUT_MINUTES) + .forEach(c -> { + logger.info("closing idle incoming connection from {}", c.from); + c.closeConnection(); + inConnections.remove(c); + }); + } + @PreDestroy + public void preDestroy() throws IOException { + closeFlag.set(true); + if (listener != null && !listener.isClosed()) { + listener.close(); + } + service.shutdown(); + logger.info("XMPP server destroyed"); + } + + public int getServerPort() { + return s2sPort; + } +} -- cgit v1.2.3