/*
* Copyright (C) 2008-2017, Juick
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.juick.components;
import com.juick.components.s2s.*;
import com.juick.service.*;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.xmlpull.v1.XmlPullParserException;
import rocks.xmpp.addr.Jid;
import rocks.xmpp.core.session.Extension;
import rocks.xmpp.core.session.XmppSessionConfiguration;
import rocks.xmpp.core.session.debug.LogbackDebugger;
import rocks.xmpp.core.stanza.model.Stanza;
import rocks.xmpp.util.XmppUtils;
import javax.inject.Inject;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
import java.util.*;
import java.util.concurrent.ExecutorService;
/**
 * Server-to-server (s2s) endpoint of the XMPP component.
 *
 * <p>On construction the server reads its settings from the supplied {@link Environment},
 * then submits a task to the executor that opens a {@link ServerSocket} on the s2s port and
 * hands every accepted socket to a new {@link ConnectionIn}. Outgoing stanzas are delivered
 * through {@link ConnectionOut} instances; while no ready outgoing stream exists for a host,
 * the serialized XML is buffered in a per-host {@link CacheEntry}.
 *
 * <p>The connection, cache and listener registries are {@link Collections#synchronizedList
 * synchronized lists}; code that iterates them must hold the list's monitor for the duration
 * of the iteration.
 *
 * @author ugnich
 */
public class XMPPServer implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class);

    /** Executor that runs the accept loop and every connection handler. */
    public ExecutorService service;
    /** Value of the {@code hostname} property. */
    public String HOSTNAME;
    /** Value of the {@code keystore} property. */
    public String keystore;
    /** Value of the {@code keystore_password} property. */
    public String keystorePassword;
    /** Entries of the comma-separated {@code broken_ssl_hosts} property (trimmed, blanks dropped). */
    public List<String> brokenSSLhosts;
    /** Entries of the comma-separated {@code banned_hosts} property (trimmed, blanks dropped). */
    public List<String> bannedHosts;

    private final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>());
    private final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>());
    private final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>());
    private final List<StanzaListener> stanzaListeners = Collections.synchronizedList(new ArrayList<>());

    @Inject
    private XMPPConnection router;
    @Inject
    public MessagesService messagesService;
    @Inject
    public UserService userService;
    @Inject
    public TagService tagService;
    @Inject
    public PMQueriesService pmQueriesService;
    @Inject
    public SubscriptionService subscriptionService;
    @Inject
    public ShowQueriesService showQueriesService;

    private Jid jid;
    // Assigned by the accept task on an executor thread and read by close() on another
    // thread, hence volatile. Stays null until the socket has been bound.
    private volatile ServerSocket listener;
    private BasicXmppSession session;

    /**
     * Reads the configuration and schedules the s2s accept loop on the executor.
     *
     * <p>Properties read: {@code hostname}, {@code s2s_port} (5269 when absent or not a
     * number), {@code keystore}, {@code keystore_password}, {@code broken_ssl_hosts},
     * {@code banned_hosts} and {@code xmppbot_jid}. Any exception raised during set-up is
     * logged and not rethrown.
     *
     * @param env     source of the configuration properties
     * @param service executor used for the accept loop and for connection handlers
     */
    public XMPPServer(Environment env, ExecutorService service) {
        this.service = service;
        XmppSessionConfiguration configuration = XmppSessionConfiguration.builder()
                .extensions(Extension.of(com.juick.Message.class))
                .debugger(LogbackDebugger.class)
                .build();
        logger.info("component initialized");
        try {
            HOSTNAME = env.getProperty("hostname");
            session = BasicXmppSession.create(HOSTNAME, configuration);
            int s2sPort = NumberUtils.toInt(env.getProperty("s2s_port"), 5269);
            keystore = env.getProperty("keystore");
            keystorePassword = env.getProperty("keystore_password");
            brokenSSLhosts = parseHostList(env.getProperty("broken_ssl_hosts", StringUtils.EMPTY));
            bannedHosts = parseHostList(env.getProperty("banned_hosts", StringUtils.EMPTY));
            jid = Jid.of(env.getProperty("xmppbot_jid"));
            service.submit(() -> acceptConnections(s2sPort));
        } catch (Exception e) {
            logger.error("XMPPComponent error", e);
        }
    }

    /**
     * Splits a comma-separated host list into trimmed, non-empty entries.
     *
     * <p>A plain {@code split(",")} turns an empty property into a list holding one empty
     * string and keeps surrounding whitespace; both are filtered out here.
     */
    private static List<String> parseHostList(String csv) {
        List<String> hosts = new ArrayList<>();
        if (csv == null) {
            return hosts;
        }
        for (String token : csv.split(",")) {
            String host = token.trim();
            if (!host.isEmpty()) {
                hosts.add(host);
            }
        }
        return hosts;
    }

    /**
     * Binds the listening socket and accepts incoming connections until the thread is
     * interrupted or the socket fails or is closed. Each accepted socket is wrapped in a
     * {@link ConnectionIn}, registered, and submitted to the executor.
     */
    private void acceptConnections(int port) {
        try {
            ServerSocket serverSocket = new ServerSocket(port);
            listener = serverSocket;
            logger.info("s2s listener ready");
            while (!Thread.currentThread().isInterrupted()) {
                Socket socket = serverSocket.accept();
                ConnectionIn client = new ConnectionIn(this, socket);
                addConnectionIn(client);
                service.submit(client);
            }
        } catch (IOException e) {
            // Also reached when close() shuts the listener down while accept() is blocked.
            logger.warn("io exception", e);
            Thread.currentThread().interrupt();
        } catch (Exception ex) {
            logger.warn("s2s error", ex);
        }
        logger.info("s2s interrupted");
    }

    /**
     * Closes and unregisters every outgoing and incoming connection, then closes the
     * listening socket. The listener is closed even when closing a connection throws, and
     * is skipped when it was never bound.
     *
     * @throws Exception if closing a connection or the listening socket fails
     */
    @Override
    public void close() throws Exception {
        try {
            synchronized (outConnections) {
                for (Iterator<ConnectionOut> i = outConnections.iterator(); i.hasNext(); ) {
                    ConnectionOut c = i.next();
                    c.closeConnection();
                    i.remove();
                }
            }
            synchronized (inConnections) {
                for (Iterator<ConnectionIn> i = inConnections.iterator(); i.hasNext(); ) {
                    ConnectionIn c = i.next();
                    c.closeConnection();
                    i.remove();
                }
            }
        } finally {
            ServerSocket serverSocket = listener;
            if (serverSocket != null && !serverSocket.isClosed()) {
                serverSocket.close();
            }
        }
        logger.info("XMPP server destroyed");
    }

    /** Registers an incoming connection. */
    public void addConnectionIn(ConnectionIn c) {
        synchronized (inConnections) {
            inConnections.add(c);
        }
    }

    /** Registers an outgoing connection. */
    public void addConnectionOut(ConnectionOut c) {
        synchronized (outConnections) {
            outConnections.add(c);
        }
    }

    /** Unregisters an incoming connection; a connection that is not registered is ignored. */
    public void removeConnectionIn(ConnectionIn c) {
        synchronized (inConnections) {
            inConnections.remove(c);
        }
    }

    /** Unregisters an outgoing connection; a connection that is not registered is ignored. */
    public void removeConnectionOut(ConnectionOut c) {
        synchronized (outConnections) {
            outConnections.remove(c);
        }
    }

    /**
     * Removes the first cache entry for the given host and returns its buffered XML.
     *
     * @param hostname host whose pending output is requested
     * @return the buffered XML, or {@code null} when nothing is cached for the host
     */
    public String getFromCache(String hostname) {
        synchronized (outCache) {
            for (Iterator<CacheEntry> i = outCache.iterator(); i.hasNext(); ) {
                CacheEntry entry = i.next();
                if (entry.hostname != null && entry.hostname.equals(hostname)) {
                    i.remove();
                    return entry.xml;
                }
            }
        }
        return null;
    }

    /**
     * Finds the first registered outgoing connection addressed to the given host.
     *
     * @param hostname  target host
     * @param needReady when {@code true}, only connections whose stream is ready qualify
     * @return the matching connection, or {@code null} if there is none
     */
    public ConnectionOut getConnectionOut(String hostname, boolean needReady) {
        synchronized (outConnections) {
            for (ConnectionOut c : outConnections) {
                if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) {
                    return c;
                }
            }
        }
        return null;
    }

    /**
     * Finds the registered incoming connection with the given stream id.
     *
     * @param streamID stream identifier to look up
     * @return the matching connection, or {@code null} if there is none
     */
    public ConnectionIn getConnectionIn(String streamID) {
        synchronized (inConnections) {
            for (ConnectionIn c : inConnections) {
                if (c.streamID != null && c.streamID.equals(streamID)) {
                    return c;
                }
            }
        }
        return null;
    }

    /**
     * Marshals the stanza to XML and sends it to the domain of its recipient.
     *
     * <p>A stanza without a recipient cannot be routed; it is logged and dropped.
     * Marshalling failures are logged and the stanza is dropped.
     *
     * @param s stanza to deliver
     */
    public void sendOut(Stanza s) {
        Jid to = s.getTo();
        if (to == null) {
            logger.warn("s2s (out): stanza without recipient dropped");
            return;
        }
        try {
            StringWriter stanzaWriter = new StringWriter();
            XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter(
                    session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter));
            session.createMarshaller().marshal(s, xmppStreamWriter);
            xmppStreamWriter.flush();
            xmppStreamWriter.close();
            String xml = stanzaWriter.toString();
            logger.info("s2s (out): {}", xml);
            sendOut(to.getDomain(), xml);
        } catch (XMLStreamException | JAXBException e1) {
            logger.warn("jaxb exception", e1);
        }
    }

    /**
     * Sends serialized XML to a host.
     *
     * <p>Only the first registered connection to the host is considered. If its stream is
     * ready the XML is sent immediately. Otherwise the XML is appended to the host's cache
     * entry (created on demand), and if no connection to the host was registered at all, a
     * new {@link ConnectionOut} is created and submitted to the executor.
     *
     * @param hostname target host
     * @param xml      serialized stanza(s)
     */
    public void sendOut(String hostname, String xml) {
        boolean haveAnyConn = false;
        ConnectionOut connOut = null;
        synchronized (outConnections) {
            for (ConnectionOut c : outConnections) {
                if (c.to != null && c.to.equals(hostname)) {
                    if (c.streamReady) {
                        connOut = c;
                    } else {
                        haveAnyConn = true;
                    }
                    break;
                }
            }
        }
        if (connOut != null) {
            connOut.sendStanza(xml);
            return;
        }

        synchronized (outCache) {
            boolean haveCache = false;
            for (CacheEntry c : outCache) {
                if (c.hostname != null && c.hostname.equals(hostname)) {
                    c.xml += xml;
                    c.tsUpdated = System.currentTimeMillis();
                    haveCache = true;
                    break;
                }
            }
            if (!haveCache) {
                outCache.add(new CacheEntry(hostname, xml));
            }
        }

        if (!haveAnyConn) {
            try {
                ConnectionOut connectionOut = new ConnectionOut(this, hostname);
                service.submit(connectionOut);
            } catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) {
                logger.error("s2s out error", e);
            }
        }
    }

    /** @return the injected router connection */
    public XMPPConnection getRouter() {
        return router;
    }

    /**
     * @return the live synchronized list of incoming connections; hold its monitor while
     *         iterating
     */
    public List<ConnectionIn> getInConnections() {
        return inConnections;
    }

    /**
     * @return the live synchronized list of outgoing connections; hold its monitor while
     *         iterating
     */
    public List<ConnectionOut> getOutConnections() {
        return outConnections;
    }

    /**
     * @return the live synchronized list of per-host output cache entries; hold its monitor
     *         while iterating
     */
    public List<CacheEntry> getOutCache() {
        return outCache;
    }

    /**
     * Starts dialback verification with the given host: reuses the first registered
     * outgoing connection to it (ready or not), otherwise creates a new one carrying the
     * verification data and submits it to the executor.
     *
     * @param from     host to verify against
     * @param streamId stream id passed to the verification
     * @param dbKey    dialback key passed to the verification
     * @throws Exception if sending the verification or creating the connection fails
     */
    public void startDialback(String from, String streamId, String dbKey) throws Exception {
        ConnectionOut c = getConnectionOut(from, false);
        if (c != null) {
            c.sendDialbackVerify(streamId, dbKey);
        } else {
            c = new ConnectionOut(this, from, streamId, dbKey);
            service.submit(c);
        }
    }

    /**
     * @return the live synchronized list of stanza listeners; hold its monitor while
     *         iterating
     */
    public List<StanzaListener> getStanzaListeners() {
        return stanzaListeners;
    }

    /** Registers a listener that is notified of every received stanza. */
    public void addStanzaListener(StanzaListener listener) {
        synchronized (stanzaListeners) {
            stanzaListeners.add(listener);
        }
    }

    /** Passes a received stanza to every registered listener, in registration order. */
    public void onStanzaReceived(Stanza xmlValue) {
        stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue));
    }

    /** @return the JID built from the {@code xmppbot_jid} property */
    public Jid getJid() {
        return jid;
    }

    /** @return the session used to marshal outgoing stanzas */
    public BasicXmppSession getSession() {
        return session;
    }

    /** Replaces the session used to marshal outgoing stanzas. */
    public void setSession(BasicXmppSession session) {
        this.session = session;
    }
}