package com.juick.components; import com.juick.components.s2s.*; import com.juick.service.*; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.env.Environment; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.session.Extension; import rocks.xmpp.core.session.XmppSessionConfiguration; import rocks.xmpp.core.session.debug.LogbackDebugger; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.inject.Inject; import javax.xml.bind.JAXBException; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.IOException; import java.io.StringWriter; import java.net.ServerSocket; import java.net.Socket; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.UnrecoverableKeyException; import java.security.cert.CertificateException; import java.util.*; import java.util.concurrent.ExecutorService; /** * @author ugnich */ public class XMPPServer implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class); public ExecutorService service; public String HOSTNAME; public String keystore; public String keystorePassword; public List brokenSSLhosts; public List bannedHosts; private final List inConnections = Collections.synchronizedList(new ArrayList<>()); private final List outConnections = Collections.synchronizedList(new ArrayList<>()); private final List outCache = Collections.synchronizedList(new ArrayList<>()); private final List stanzaListeners = Collections.synchronizedList(new ArrayList<>()); @Inject private XMPPConnection router; @Inject public MessagesService messagesService; @Inject public UserService userService; @Inject public TagService tagService; @Inject public PMQueriesService pmQueriesService; @Inject public SubscriptionService subscriptionService; @Inject public ShowQueriesService showQueriesService; private Jid jid; private ServerSocket listener; private BasicXmppSession session; public XMPPServer(Environment env, ExecutorService service) { this.service = service; XmppSessionConfiguration configuration = XmppSessionConfiguration.builder() .extensions(Extension.of(com.juick.Message.class)) .debugger(LogbackDebugger.class) .build(); logger.info("component initialized"); try { HOSTNAME = env.getProperty("hostname"); session = BasicXmppSession.create(HOSTNAME, configuration); int s2sPort = NumberUtils.toInt(env.getProperty("s2s_port"), 5269); keystore = env.getProperty("keystore"); keystorePassword = env.getProperty("keystore_password"); brokenSSLhosts = Arrays.asList(env.getProperty("broken_ssl_hosts", StringUtils.EMPTY).split(",")); bannedHosts = Arrays.asList(env.getProperty("banned_hosts", StringUtils.EMPTY).split(",")); jid = Jid.of(env.getProperty("xmppbot_jid")); service.submit(() -> { try { listener = new ServerSocket(s2sPort); logger.info("s2s listener ready"); while (true) { if (Thread.currentThread().isInterrupted()) break; Socket socket = listener.accept(); ConnectionIn client = new ConnectionIn(this, socket); addConnectionIn(client); service.submit(client); } } catch (IOException e) { logger.warn("io exception", e); Thread.currentThread().interrupt(); } catch (Exception ex) { logger.warn("s2s error", ex); } logger.info("s2s interrupted"); }); } catch (Exception e) { logger.error("XMPPComponent error", e); } } @Override public void close() throws Exception { synchronized (getOutConnections()) { for (Iterator i = getOutConnections().iterator(); i.hasNext(); ) { ConnectionOut c = i.next(); c.closeConnection(); i.remove(); } } synchronized (getInConnections()) { for (Iterator i = getInConnections().iterator(); i.hasNext(); ) { ConnectionIn c = i.next(); c.closeConnection(); i.remove(); } } if (!listener.isClosed()) { listener.close(); } logger.info("XMPP server destroyed"); } public void addConnectionIn(ConnectionIn c) { synchronized (getInConnections()) { getInConnections().add(c); } } public void addConnectionOut(ConnectionOut c) { synchronized (getOutConnections()) { getOutConnections().add(c); } } public void removeConnectionIn(ConnectionIn c) { synchronized (getInConnections()) { getInConnections().remove(c); } } public void removeConnectionOut(ConnectionOut c) { synchronized (getOutConnections()) { getOutConnections().remove(c); } } public String getFromCache(String hostname) { CacheEntry ret = null; synchronized (getOutCache()) { for (Iterator i = getOutCache().iterator(); i.hasNext(); ) { CacheEntry c = i.next(); if (c.hostname != null && c.hostname.equals(hostname)) { ret = c; i.remove(); break; } } } return (ret != null) ? ret.xml : null; } public ConnectionOut getConnectionOut(String hostname, boolean needReady) { synchronized (getOutConnections()) { for (ConnectionOut c : getOutConnections()) { if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) { return c; } } } return null; } public ConnectionIn getConnectionIn(String streamID) { synchronized (getInConnections()) { for (ConnectionIn c : getInConnections()) { if (c.streamID != null && c.streamID.equals(streamID)) { return c; } } } return null; } public void sendOut(Stanza s) { try { StringWriter stanzaWriter = new StringWriter(); XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); session.createMarshaller().marshal(s, xmppStreamWriter); xmppStreamWriter.flush(); xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("s2s (out): {}", xml); sendOut(s.getTo().getDomain(), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } public void sendOut(String hostname, String xml) { boolean haveAnyConn = false; ConnectionOut connOut = null; synchronized (getOutConnections()) { for (ConnectionOut c : getOutConnections()) { if (c.to != null && c.to.equals(hostname)) { if (c.streamReady) { connOut = c; break; } else { haveAnyConn = true; break; } } } } if (connOut != null) { connOut.sendStanza(xml); return; } boolean haveCache = false; synchronized (getOutCache()) { for (CacheEntry c : getOutCache()) { if (c.hostname != null && c.hostname.equals(hostname)) { c.xml += xml; c.tsUpdated = System.currentTimeMillis(); haveCache = true; break; } } if (!haveCache) { getOutCache().add(new CacheEntry(hostname, xml)); } } if (!haveAnyConn) { try { ConnectionOut connectionOut = new ConnectionOut(this, hostname); service.submit(connectionOut); } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) { logger.error("s2s out error", e); } } } public XMPPConnection getRouter() { return router; } public List getInConnections() { return inConnections; } public List getOutConnections() { return outConnections; } public List getOutCache() { return outCache; } public void startDialback(String from, String streamId, String dbKey) throws Exception { ConnectionOut c = getConnectionOut(from, false); if (c != null) { c.sendDialbackVerify(streamId, dbKey); } else { c = new ConnectionOut(this, from, streamId, dbKey); service.submit(c); } } public List getStanzaListeners() { return stanzaListeners; } public void addStanzaListener(StanzaListener listener) { synchronized (stanzaListeners) { stanzaListeners.add(listener); } } public void onStanzaReceived(Stanza xmlValue) { stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue)); } public Jid getJid() { return jid; } public BasicXmppSession getSession() { return session; } public void setSession(BasicXmppSession session) { this.session = session; } }