From 208de1f5f5ecac70fd4dfbc3d5c46ae786fad8a5 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 17 Jul 2018 16:53:43 +0300 Subject: XMPP: cache router stanzas until component connected --- .../main/java/com/juick/server/XMPPConnection.java | 1 + .../server/xmpp/router/StreamComponentServer.java | 2 +- .../juick/server/xmpp/router/StreamHandler.java | 2 +- .../com/juick/server/xmpp/router/XMPPRouter.java | 36 ++++++++++++++++++++-- 4 files changed, 37 insertions(+), 4 deletions(-) diff --git a/juick-server/src/main/java/com/juick/server/XMPPConnection.java b/juick-server/src/main/java/com/juick/server/XMPPConnection.java index b63a4d47..344860fb 100644 --- a/juick-server/src/main/java/com/juick/server/XMPPConnection.java +++ b/juick-server/src/main/java/com/juick/server/XMPPConnection.java @@ -250,6 +250,7 @@ public class XMPPConnection implements StanzaListener, NotificationListener { } }); router.addSessionStatusListener(event -> { + logger.info("event: " + event.getStatus(), event.getThrowable()); if (event.getStatus().equals(XmppSession.Status.AUTHENTICATED)) { logger.info("Authenticated, broadcasting..."); broadcastPresence(null); diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java index 5e2f6f82..a7759afa 100644 --- a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java @@ -53,6 +53,6 @@ public class StreamComponentServer extends Stream { return; } send(new Handshake().toString()); - streamHandler.ready(); + streamHandler.ready(this); } } diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java index 43836c2d..048c61ec 100644 --- a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java @@ -6,7 +6,7 @@ import rocks.xmpp.addr.Jid; * Created by vitalyster on 01.02.2017. */ public interface StreamHandler { - void ready(); + void ready(StreamComponentServer componentServer); void fail(final Exception ex); boolean filter(Jid from, Jid to); void stanzaReceived(String stanza); diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java index 6edecf05..5a6ee055 100644 --- a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java @@ -2,6 +2,7 @@ package com.juick.server.xmpp.router; import com.juick.server.XMPPServer; import com.juick.server.xmpp.s2s.BasicXmppSession; +import com.juick.server.xmpp.s2s.CacheEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -30,10 +31,12 @@ import java.io.StringWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @Component @@ -44,6 +47,7 @@ public class XMPPRouter implements StreamHandler { private ExecutorService service; private final List connections = Collections.synchronizedList(new ArrayList<>()); + private final List outCache = new CopyOnWriteArrayList<>(); private ServerSocket listener; @@ -135,6 +139,20 @@ public class XMPPRouter implements StreamHandler { if (c.isLoggedIn()) { connOut = c; break; + } else { + logger.info("bouncing stanza to {} component until it will be ready", hostname); + boolean haveCache = false; + for (CacheEntry entry : outCache) { + if (entry.hostname != null && entry.hostname.equals(hostname)) { + entry.xml += xml; + entry.updated = Instant.now(); + haveCache = true; + break; + } + } + if (!haveCache) { + outCache.add(new CacheEntry(Jid.of(hostname), xml)); + } } } } @@ -172,9 +190,23 @@ public class XMPPRouter implements StreamHandler { } } - @Override - public void ready() { + public String getFromCache(Jid to) { + final String[] cache = new String[1]; + outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { + cache[0] = c.xml; + outCache.remove(c); + }); + return cache[0]; + } + @Override + public void ready(StreamComponentServer componentServer) { + logger.info("component {} ready", componentServer.to); + String cache = getFromCache(componentServer.to); + if (cache != null) { + logger.debug("sending cache to {}", componentServer.to); + componentServer.send(cache); + } } @Override -- cgit v1.2.3