aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2018-07-17 16:53:43 +0300
committerGravatar Vitaly Takmazov2018-07-17 16:53:43 +0300
commit208de1f5f5ecac70fd4dfbc3d5c46ae786fad8a5 (patch)
tree15a4472b3236f0b7522021c8d33e6c88d9acae1e
parentca2ed5206a2e882d3405217a5c75786561b02c3c (diff)
XMPP: cache router stanzas until component connected
-rw-r--r--juick-server/src/main/java/com/juick/server/XMPPConnection.java1
-rw-r--r--juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java2
-rw-r--r--juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java2
-rw-r--r--juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java36
4 files changed, 37 insertions, 4 deletions
diff --git a/juick-server/src/main/java/com/juick/server/XMPPConnection.java b/juick-server/src/main/java/com/juick/server/XMPPConnection.java
index b63a4d47..344860fb 100644
--- a/juick-server/src/main/java/com/juick/server/XMPPConnection.java
+++ b/juick-server/src/main/java/com/juick/server/XMPPConnection.java
@@ -250,6 +250,7 @@ public class XMPPConnection implements StanzaListener, NotificationListener {
}
});
router.addSessionStatusListener(event -> {
+ logger.info("event: " + event.getStatus(), event.getThrowable());
if (event.getStatus().equals(XmppSession.Status.AUTHENTICATED)) {
logger.info("Authenticated, broadcasting...");
broadcastPresence(null);
diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java
index 5e2f6f82..a7759afa 100644
--- a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java
+++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java
@@ -53,6 +53,6 @@ public class StreamComponentServer extends Stream {
return;
}
send(new Handshake().toString());
- streamHandler.ready();
+ streamHandler.ready(this);
}
}
diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java
index 43836c2d..048c61ec 100644
--- a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java
+++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java
@@ -6,7 +6,7 @@ import rocks.xmpp.addr.Jid;
* Created by vitalyster on 01.02.2017.
*/
public interface StreamHandler {
- void ready();
+ void ready(StreamComponentServer componentServer);
void fail(final Exception ex);
boolean filter(Jid from, Jid to);
void stanzaReceived(String stanza);
diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java
index 6edecf05..5a6ee055 100644
--- a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java
+++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java
@@ -2,6 +2,7 @@ package com.juick.server.xmpp.router;
import com.juick.server.XMPPServer;
import com.juick.server.xmpp.s2s.BasicXmppSession;
+import com.juick.server.xmpp.s2s.CacheEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@@ -30,10 +31,12 @@ import java.io.StringWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
+import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
@Component
@@ -44,6 +47,7 @@ public class XMPPRouter implements StreamHandler {
private ExecutorService service;
private final List<StreamComponentServer> connections = Collections.synchronizedList(new ArrayList<>());
+ private final List<CacheEntry> outCache = new CopyOnWriteArrayList<>();
private ServerSocket listener;
@@ -135,6 +139,20 @@ public class XMPPRouter implements StreamHandler {
if (c.isLoggedIn()) {
connOut = c;
break;
+ } else {
+ logger.info("bouncing stanza to {} component until it will be ready", hostname);
+ boolean haveCache = false;
+ for (CacheEntry entry : outCache) {
+ if (entry.hostname != null && entry.hostname.equals(hostname)) {
+ entry.xml += xml;
+ entry.updated = Instant.now();
+ haveCache = true;
+ break;
+ }
+ }
+ if (!haveCache) {
+ outCache.add(new CacheEntry(Jid.of(hostname), xml));
+ }
}
}
}
@@ -172,9 +190,23 @@ public class XMPPRouter implements StreamHandler {
}
}
- @Override
- public void ready() {
+ public String getFromCache(Jid to) {
+ final String[] cache = new String[1];
+ outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> {
+ cache[0] = c.xml;
+ outCache.remove(c);
+ });
+ return cache[0];
+ }
+ @Override
+ public void ready(StreamComponentServer componentServer) {
+ logger.info("component {} ready", componentServer.to);
+ String cache = getFromCache(componentServer.to);
+ if (cache != null) {
+ logger.debug("sending cache to {}", componentServer.to);
+ componentServer.send(cache);
+ }
}
@Override