/* * Copyright (C) 2008-2017, Juick * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package com.juick.components; import com.juick.components.s2s.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.inject.Inject; import javax.xml.bind.JAXBException; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.IOException; import java.io.StringWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; /** * @author ugnich */ @Component public class XMPPServer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); @Inject public ExecutorService service; @Value("${hostname}") public String HOSTNAME; @Value("${s2s_port:5269}") private int s2sPort; @Value("${keystore}") public String keystore; @Value("${keystore_password}") public String keystorePassword; @Value("${broken_ssl_hosts}") public String[] brokenSSLhosts; @Value("${banned_hosts}") public String[] bannedHosts; private final List inConnections = new CopyOnWriteArrayList<>(); private final List outConnections = new CopyOnWriteArrayList<>(); private final List outCache = new CopyOnWriteArrayList<>(); private final List stanzaListeners = new CopyOnWriteArrayList<>(); private ServerSocket listener; @Inject private BasicXmppSession session; @PostConstruct public void init() { logger.info("component initialized"); service.submit(() -> { try { listener = new ServerSocket(s2sPort); logger.info("s2s listener ready"); while (!listener.isClosed()) { if (Thread.currentThread().isInterrupted()) break; Socket socket = listener.accept(); ConnectionIn client = new ConnectionIn(this, socket); addConnectionIn(client); service.submit(client); } } catch (SocketException e) { // shutdown } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) { logger.warn("xmpp exception", e); } }); } @Override public void close() throws Exception { if (!listener.isClosed()) { listener.close(); } outConnections.forEach(c -> { c.closeConnection(); outConnections.remove(c); }); inConnections.forEach(c -> { c.closeConnection(); inConnections.remove(c); }); if (!listener.isClosed()) { listener.close(); } service.shutdown(); logger.info("XMPP server destroyed"); } public void addConnectionIn(ConnectionIn c) { inConnections.add(c); } public void addConnectionOut(ConnectionOut c) { outConnections.add(c); } public void removeConnectionIn(ConnectionIn c) { inConnections.remove(c); } public void removeConnectionOut(ConnectionOut c) { outConnections.remove(c); } public String getFromCache(String hostname) { final String[] cache = new String[1]; outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> { cache[0] = c.xml; outCache.remove(c); }); return cache[0]; } public Optional getConnectionOut(String hostname, boolean needReady) { return outConnections.stream().filter(c -> c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst(); } public Optional getConnectionIn(String streamID) { return inConnections.stream().filter(c -> c.streamID != null && c.streamID.equals(streamID)).findFirst(); } public void sendOut(Stanza s) { try { StringWriter stanzaWriter = new StringWriter(); XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); session.createMarshaller().marshal(s, xmppStreamWriter); xmppStreamWriter.flush(); xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("s2s (out): {}", xml); sendOut(s.getTo().getDomain(), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } public void sendOut(String hostname, String xml) { boolean haveAnyConn = false; ConnectionOut connOut = null; for (ConnectionOut c : outConnections) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; break; } else { haveAnyConn = true; break; } } } if (connOut != null) { connOut.sendStanza(xml); return; } boolean haveCache = false; for (CacheEntry c : outCache) { if (c.hostname != null && c.hostname.equals(hostname)) { c.xml += xml; c.tsUpdated = System.currentTimeMillis(); haveCache = true; break; } } if (!haveCache) { outCache.add(new CacheEntry(hostname, xml)); } if (!haveAnyConn) { try { ConnectionOut connectionOut = new ConnectionOut(this, hostname); addConnectionOut(connectionOut); service.submit(connectionOut); } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) { logger.error("s2s out error", e); } } } public void startDialback(String from, String streamId, String dbKey) throws Exception { Optional c = getConnectionOut(from, false); if (c.isPresent()) { c.get().sendDialbackVerify(streamId, dbKey); } else { ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey); addConnectionOut(newConnection); service.submit(newConnection); } } public void addStanzaListener(StanzaListener listener) { stanzaListeners.add(listener); } public void onStanzaReceived(Stanza xmlValue) { stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue)); } public BasicXmppSession getSession() { return session; } public List getInConnections() { return inConnections; } public List getOutConnections() { return outConnections; } }