aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'juick-xmpp/src/main/java/com/juick/components/XMPPServer.java')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/XMPPServer.java281
1 files changed, 99 insertions, 182 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
index e1ca72ad..1df7d575 100644
--- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
+++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java
@@ -18,20 +18,15 @@
package com.juick.components;
import com.juick.components.s2s.*;
-import com.juick.service.*;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.core.env.Environment;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
import org.xmlpull.v1.XmlPullParserException;
-import rocks.xmpp.addr.Jid;
-import rocks.xmpp.core.session.Extension;
-import rocks.xmpp.core.session.XmppSessionConfiguration;
-import rocks.xmpp.core.session.debug.LogbackDebugger;
import rocks.xmpp.core.stanza.model.Stanza;
import rocks.xmpp.util.XmppUtils;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.xml.bind.JAXBException;
import javax.xml.stream.XMLStreamException;
@@ -40,180 +35,126 @@ import java.io.IOException;
import java.io.StringWriter;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
/**
* @author ugnich
*/
+@Component
public class XMPPServer implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(XMPPServer.class);
+ @Inject
public ExecutorService service;
-
+ @Value("${hostname}")
public String HOSTNAME;
+ @Value("${s2s_port:5269}")
+ private int s2sPort;
+ @Value("${keystore}")
public String keystore;
+ @Value("${keystore_password}")
public String keystorePassword;
- public List<String> brokenSSLhosts;
- public List<String> bannedHosts;
+ @Value("${broken_ssl_hosts}")
+ public String[] brokenSSLhosts;
+ @Value("${banned_hosts}")
+ public String[] bannedHosts;
- private final List<ConnectionIn> inConnections = Collections.synchronizedList(new ArrayList<>());
- private final List<ConnectionOut> outConnections = Collections.synchronizedList(new ArrayList<>());
- private final List<CacheEntry> outCache = Collections.synchronizedList(new ArrayList<>());
- private final List<StanzaListener> stanzaListeners = Collections.synchronizedList(new ArrayList<>());
+ private final List<ConnectionIn> inConnections = new CopyOnWriteArrayList<>();
+ private final List<ConnectionOut> outConnections = new CopyOnWriteArrayList<>();
+ private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>();
+ private final List<StanzaListener> stanzaListeners = new CopyOnWriteArrayList<>();
- @Inject
- private XMPPConnection router;
- @Inject
- public MessagesService messagesService;
- @Inject
- public UserService userService;
- @Inject
- public TagService tagService;
- @Inject
- public PMQueriesService pmQueriesService;
- @Inject
- public SubscriptionService subscriptionService;
- @Inject
- public ShowQueriesService showQueriesService;
-
- private Jid jid;
-
private ServerSocket listener;
+ @Inject
private BasicXmppSession session;
- public XMPPServer(Environment env, ExecutorService service) {
- this.service = service;
- XmppSessionConfiguration configuration = XmppSessionConfiguration.builder()
- .extensions(Extension.of(com.juick.Message.class))
- .debugger(LogbackDebugger.class)
- .build();
+ @PostConstruct
+ public void init() {
logger.info("component initialized");
- try {
- HOSTNAME = env.getProperty("hostname");
- session = BasicXmppSession.create(HOSTNAME, configuration);
- int s2sPort = NumberUtils.toInt(env.getProperty("s2s_port"), 5269);
- keystore = env.getProperty("keystore");
- keystorePassword = env.getProperty("keystore_password");
- brokenSSLhosts = Arrays.asList(env.getProperty("broken_ssl_hosts", StringUtils.EMPTY).split(","));
- bannedHosts = Arrays.asList(env.getProperty("banned_hosts", StringUtils.EMPTY).split(","));
- jid = Jid.of(env.getProperty("xmppbot_jid"));
-
- service.submit(() -> {
- try {
- listener = new ServerSocket(s2sPort);
- logger.info("s2s listener ready");
- while (true) {
- if (Thread.currentThread().isInterrupted()) break;
- Socket socket = listener.accept();
- ConnectionIn client = new ConnectionIn(this, socket);
- addConnectionIn(client);
- service.submit(client);
- }
- } catch (IOException e) {
- logger.warn("io exception", e);
- Thread.currentThread().interrupt();
- } catch (Exception ex) {
- logger.warn("s2s error", ex);
+ service.submit(() -> {
+ try {
+ listener = new ServerSocket(s2sPort);
+ logger.info("s2s listener ready");
+ while (!listener.isClosed()) {
+ if (Thread.currentThread().isInterrupted()) break;
+ Socket socket = listener.accept();
+ ConnectionIn client = new ConnectionIn(this, socket);
+ addConnectionIn(client);
+ service.submit(client);
}
- logger.info("s2s interrupted");
- });
-
- } catch (Exception e) {
- logger.error("XMPPComponent error", e);
- }
+ } catch (SocketException e) {
+ // shutdown
+ } catch (IOException | CertificateException | NoSuchAlgorithmException | UnrecoverableKeyException | KeyStoreException | XmlPullParserException | KeyManagementException e) {
+ logger.warn("xmpp exception", e);
+ }
+ });
}
@Override
public void close() throws Exception {
- synchronized (getOutConnections()) {
- for (Iterator<ConnectionOut> i = getOutConnections().iterator(); i.hasNext(); ) {
- ConnectionOut c = i.next();
- c.closeConnection();
- i.remove();
- }
- }
-
- synchronized (getInConnections()) {
- for (Iterator<ConnectionIn> i = getInConnections().iterator(); i.hasNext(); ) {
- ConnectionIn c = i.next();
- c.closeConnection();
- i.remove();
- }
+ if (!listener.isClosed()) {
+ listener.close();
}
+ outConnections.forEach(c -> {
+ c.closeConnection();
+ outConnections.remove(c);
+ });
+ inConnections.forEach(c -> {
+ c.closeConnection();
+ inConnections.remove(c);
+ });
if (!listener.isClosed()) {
listener.close();
}
+ service.shutdown();
logger.info("XMPP server destroyed");
}
public void addConnectionIn(ConnectionIn c) {
- synchronized (getInConnections()) {
- getInConnections().add(c);
- }
+ inConnections.add(c);
}
public void addConnectionOut(ConnectionOut c) {
- synchronized (getOutConnections()) {
- getOutConnections().add(c);
- }
+ outConnections.add(c);
}
public void removeConnectionIn(ConnectionIn c) {
- synchronized (getInConnections()) {
- getInConnections().remove(c);
- }
+ inConnections.remove(c);
}
public void removeConnectionOut(ConnectionOut c) {
- synchronized (getOutConnections()) {
- getOutConnections().remove(c);
- }
+ outConnections.remove(c);
}
public String getFromCache(String hostname) {
- CacheEntry ret = null;
- synchronized (getOutCache()) {
- for (Iterator<CacheEntry> i = getOutCache().iterator(); i.hasNext(); ) {
- CacheEntry c = i.next();
- if (c.hostname != null && c.hostname.equals(hostname)) {
- ret = c;
- i.remove();
- break;
- }
- }
- }
- return (ret != null) ? ret.xml : null;
+ final String[] cache = new String[1];
+ outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(hostname)).findFirst().ifPresent(c -> {
+ cache[0] = c.xml;
+ outCache.remove(c);
+ });
+ return cache[0];
}
- public ConnectionOut getConnectionOut(String hostname, boolean needReady) {
- synchronized (getOutConnections()) {
- for (ConnectionOut c : getOutConnections()) {
- if (c.to != null && c.to.equals(hostname) && (!needReady || c.streamReady)) {
- return c;
- }
- }
- }
- return null;
+ public Optional<ConnectionOut> getConnectionOut(String hostname, boolean needReady) {
+ return outConnections.stream().filter(c -> c.to != null &&
+ c.to.equals(hostname) && (!needReady || c.streamReady)).findFirst();
}
- public ConnectionIn getConnectionIn(String streamID) {
- synchronized (getInConnections()) {
- for (ConnectionIn c : getInConnections()) {
- if (c.streamID != null && c.streamID.equals(streamID)) {
- return c;
- }
- }
- }
- return null;
+ public Optional<ConnectionIn> getConnectionIn(String streamID) {
+ return inConnections.stream().filter(c -> c.streamID != null && c.streamID.equals(streamID)).findFirst();
}
public void sendOut(Stanza s) {
@@ -236,16 +177,14 @@ public class XMPPServer implements AutoCloseable {
boolean haveAnyConn = false;
ConnectionOut connOut = null;
- synchronized (getOutConnections()) {
- for (ConnectionOut c : getOutConnections()) {
- if (c.to != null && c.to.equals(hostname)) {
- if (c.streamReady) {
- connOut = c;
- break;
- } else {
- haveAnyConn = true;
- break;
- }
+ for (ConnectionOut c : outConnections) {
+ if (c.to != null && c.to.equals(hostname)) {
+ if (c.streamReady) {
+ connOut = c;
+ break;
+ } else {
+ haveAnyConn = true;
+ break;
}
}
}
@@ -255,23 +194,22 @@ public class XMPPServer implements AutoCloseable {
}
boolean haveCache = false;
- synchronized (getOutCache()) {
- for (CacheEntry c : getOutCache()) {
- if (c.hostname != null && c.hostname.equals(hostname)) {
- c.xml += xml;
- c.tsUpdated = System.currentTimeMillis();
- haveCache = true;
- break;
- }
- }
- if (!haveCache) {
- getOutCache().add(new CacheEntry(hostname, xml));
+ for (CacheEntry c : outCache) {
+ if (c.hostname != null && c.hostname.equals(hostname)) {
+ c.xml += xml;
+ c.tsUpdated = System.currentTimeMillis();
+ haveCache = true;
+ break;
}
}
+ if (!haveCache) {
+ outCache.add(new CacheEntry(hostname, xml));
+ }
if (!haveAnyConn) {
try {
ConnectionOut connectionOut = new ConnectionOut(this, hostname);
+ addConnectionOut(connectionOut);
service.submit(connectionOut);
} catch (CertificateException | UnrecoverableKeyException | NoSuchAlgorithmException | XmlPullParserException | KeyStoreException | KeyManagementException | IOException e) {
logger.error("s2s out error", e);
@@ -279,55 +217,34 @@ public class XMPPServer implements AutoCloseable {
}
}
- public XMPPConnection getRouter() {
- return router;
- }
-
- public List<ConnectionIn> getInConnections() {
- return inConnections;
- }
-
- public List<ConnectionOut> getOutConnections() {
- return outConnections;
- }
-
- public List<CacheEntry> getOutCache() {
- return outCache;
- }
-
public void startDialback(String from, String streamId, String dbKey) throws Exception {
- ConnectionOut c = getConnectionOut(from, false);
- if (c != null) {
- c.sendDialbackVerify(streamId, dbKey);
+ Optional<ConnectionOut> c = getConnectionOut(from, false);
+ if (c.isPresent()) {
+ c.get().sendDialbackVerify(streamId, dbKey);
} else {
- c = new ConnectionOut(this, from, streamId, dbKey);
- service.submit(c);
+ ConnectionOut newConnection = new ConnectionOut(this, from, streamId, dbKey);
+ addConnectionOut(newConnection);
+ service.submit(newConnection);
}
}
- public List<StanzaListener> getStanzaListeners() {
- return stanzaListeners;
- }
-
public void addStanzaListener(StanzaListener listener) {
- synchronized (stanzaListeners) {
- stanzaListeners.add(listener);
- }
+ stanzaListeners.add(listener);
}
public void onStanzaReceived(Stanza xmlValue) {
stanzaListeners.forEach(l -> l.stanzaReceived(xmlValue));
}
- public Jid getJid() {
- return jid;
- }
-
public BasicXmppSession getSession() {
return session;
}
- public void setSession(BasicXmppSession session) {
- this.session = session;
+ public List<ConnectionIn> getInConnections() {
+ return inConnections;
+ }
+
+ public List<ConnectionOut> getOutConnections() {
+ return outConnections;
}
}