/* * Copyright (C) 2008-2017, Juick * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package com.juick.components; import com.juick.components.s2s.*; import com.juick.service.UserService; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.StreamError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.net.ssl.*; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.*; import java.net.*; import java.security.KeyStore; import java.security.KeyStoreException; import java.security.SecureRandom; import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import static com.juick.components.s2s.Connection.NS_TLS; /** * @author ugnich */ @Component public class XMPPServer implements ConnectionListener, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); @Inject public ExecutorService service; @Value("${hostname}") private Jid jid; @Value("${s2s_port:5269}") private int s2sPort; @Value("${keystore}") public String keystore; @Value("${keystore_password}") public String keystorePassword; @Value("${broken_ssl_hosts}") public String[] brokenSSLhosts; @Value("${banned_hosts}") public String[] bannedHosts; private final List inConnections = new CopyOnWriteArrayList<>(); private final Map outConnections = new ConcurrentHashMap<>(); private final List outCache = new CopyOnWriteArrayList<>(); private final List stanzaListeners = new CopyOnWriteArrayList<>(); SSLContext sc; private TrustManager[] trustAllCerts = new TrustManager[]{ new X509TrustManager() { public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) { } public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) { } public java.security.cert.X509Certificate[] getAcceptedIssuers() { return null; } } }; private ServerSocket listener; @Inject private BasicXmppSession session; @Inject private UserService userService; @PostConstruct public void init() throws KeyStoreException { KeyStore ks = KeyStore.getInstance("JKS"); try (InputStream ksIs = new FileInputStream(keystore)) { ks.load(ksIs, keystorePassword.toCharArray()); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory .getDefaultAlgorithm()); kmf.init(ks, keystorePassword.toCharArray()); sc = SSLContext.getInstance("TLSv1.2"); sc.init(kmf.getKeyManagers(), trustAllCerts, new SecureRandom()); } catch (Exception e) { logger.warn("tls unavailable"); } service.submit(() -> { try { listener = new ServerSocket(s2sPort); logger.info("s2s listener ready"); while (!listener.isClosed()) { if (Thread.currentThread().isInterrupted()) break; Socket socket = listener.accept(); ConnectionIn client = new ConnectionIn(this, socket); addConnectionIn(client); service.submit(client); } } catch (SocketException e) { // shutdown } catch (IOException | XmlPullParserException e) { logger.warn("xmpp exception", e); } }); } @Override public void close() throws Exception { if (!listener.isClosed()) { listener.close(); } outConnections.forEach((c, s) -> { c.logoff(); outConnections.remove(s); }); inConnections.forEach(c -> { c.closeConnection(); inConnections.remove(c); }); if (!listener.isClosed()) { listener.close(); } service.shutdown(); logger.info("XMPP server destroyed"); } public void addConnectionIn(ConnectionIn c) { c.setListener(this); inConnections.add(c); } public void addConnectionOut(Socket socket, StreamServerDialback c) { c.setListener(this); outConnections.put(c, socket); } public void removeConnectionIn(ConnectionIn c) { inConnections.remove(c); } public void removeConnectionOut(StreamServerDialback c) { outConnections.remove(c); } public String getFromCache(Jid to) { final String[] cache = new String[1]; outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { cache[0] = c.xml; outCache.remove(c); }); return cache[0]; } public Optional getConnectionOut(Jid hostname, boolean needReady) { return outConnections.keySet().stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } public Optional getConnectionIn(String streamID) { return inConnections.stream().filter(c -> c.streamID != null && c.streamID.equals(streamID)).findFirst(); } public void sendOut(Stanza s) { try { StringWriter stanzaWriter = new StringWriter(); XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); session.createMarshaller().marshal(s, xmppStreamWriter); xmppStreamWriter.flush(); xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("s2s (out): {}", xml); sendOut(Jid.of(s.getTo().getDomain()), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } public void sendOut(Jid hostname, String xml) { boolean haveAnyConn = false; StreamServerDialback connOut = null; for (StreamServerDialback c : outConnections.keySet()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; break; } else { haveAnyConn = true; break; } } } if (connOut != null) { connOut.send(xml); return; } boolean haveCache = false; for (CacheEntry c : outCache) { if (c.hostname != null && c.hostname.equals(hostname)) { c.xml += xml; c.updated = Instant.now(); haveCache = true; break; } } if (!haveCache) { outCache.add(new CacheEntry(hostname, xml)); } if (!haveAnyConn) { createDialbackConnection(hostname.toEscapedString(), null, null); } } void createDialbackConnection(String to, String checkSID, String dbKey) { service.submit(() -> { try { Socket socket = new Socket(); socket.connect(DNSQueries.getServerAddress(to)); StreamServerDialback streamServerDialback = new StreamServerDialback(getJid(), Jid.of(to), socket.getInputStream(), socket.getOutputStream(), checkSID, dbKey); addConnectionOut(socket, streamServerDialback); streamServerDialback.addChildParser(new JuickMessage()); streamServerDialback.connect(); } catch (UnknownHostException | ConnectException e) { userService.getActiveJIDs().stream().filter(j -> Jid.of(j).getDomain().equals(to)) .forEach(j -> { userService.setActiveStatusForJID(j, UserService.ActiveStatus.Inactive); logger.info("{} is inactive now", j); }); } catch (Exception e) { logger.error("s2s out error", e); } }); } public void startDialback(Jid from, String streamId, String dbKey) throws Exception { Optional c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { createDialbackConnection(from.toEscapedString(), streamId, dbKey); } } public void addStanzaListener(StanzaListener listener) { stanzaListeners.add(listener); } public void onStanzaReceived(String xmlValue) { Stanza stanza = parse(xmlValue); stanzaListeners.forEach(l -> l.stanzaReceived(stanza)); } public BasicXmppSession getSession() { return session; } public List getInConnections() { return inConnections; } public Map getOutConnections() { return outConnections; } @Override public void starttls(ConnectionIn connection) { logger.info("stream {} securing", connection.streamID); connection.sendStanza(""); try { connection.setSocket(sc.getSocketFactory().createSocket(connection.getSocket(), connection.getSocket().getInetAddress().getHostAddress(), connection.getSocket().getPort(), true)); ((SSLSocket) connection.getSocket()).setUseClientMode(false); ((SSLSocket) connection.getSocket()).startHandshake(); connection.setSecured(true); logger.info("stream {} secured", connection.streamID); connection.restartParser(); } catch (XmlPullParserException | IOException sex) { logger.warn("stream {} ssl error {}", connection.streamID, sex); connection.sendStanza(""); removeConnectionIn(connection); connection.closeConnection(); } } @Override public void proceed(StreamServerDialback connection) { try { Socket socket = outConnections.get(connection); socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), socket.getPort(), true); ((SSLSocket) socket).startHandshake(); connection.setSecured(true); logger.info("stream {} secured", connection.getStreamID()); connection.setInputStream(socket.getInputStream()); connection.setOutputStream(socket.getOutputStream()); connection.restartStream(); connection.sendOpenStream(); } catch (XmlPullParserException | IOException sex) { logger.error("s2s ssl error: {} {}, error {}", connection.to, connection.getStreamID(), sex); connection.send(""); removeConnectionOut(connection); connection.logoff(); } } @Override public void verify(StreamServerDialback connection, String from, String type, String sid) { if (from != null && from.equals(connection.to.toEscapedString()) && sid != null && !sid.isEmpty() && type != null) { getConnectionIn(sid).ifPresent(c -> c.sendDialbackResult(Jid.of(from), type)); } } @Override public void dialbackError(StreamServerDialback connection, StreamError error) { logger.warn("Stream error from {}: {}", connection.getStreamID(), error.getCondition()); removeConnectionOut(connection); connection.logoff(); } @Override public void finished(StreamServerDialback connection, boolean dirty) { logger.warn("stream to {} {} finished, dirty={}", connection.to, connection.getStreamID(), dirty); removeConnectionOut(connection); connection.logoff(); } @Override public void exception(StreamServerDialback connection, Exception ex) { logger.error("s2s out exception: {} {}, exception {}", connection.to, connection.getStreamID(), ex); removeConnectionOut(connection); connection.logoff(); } @Override public void ready(StreamServerDialback connection) { logger.info("stream to {} {} ready", connection.to, connection.getStreamID()); String cache = getFromCache(connection.to); if (cache != null) { logger.info("stream to {} {} sending cache", connection.to, connection.getStreamID()); connection.send(cache); } } @Override public boolean securing(StreamServerDialback connection) { return !Arrays.asList(brokenSSLhosts).contains(connection.to.toEscapedString()); } public Stanza parse(String xml) { try { Unmarshaller unmarshaller = session.createUnmarshaller(); return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); } catch (JAXBException e) { logger.error("JAXB exception", e); } return null; } public Jid getJid() { return jid; } }