From 8fa7260a5bbf332312fa3ba77e2a60d8b60054ac Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Sun, 7 Oct 2018 23:58:00 +0300 Subject: Server Side Events: server and webapp --- juick-server/src/main/assets/scripts.js | 71 +++++----------------- .../main/java/com/juick/server/ServerManager.java | 61 +++++++++++++++++-- .../main/java/com/juick/server/api/Service.java | 16 +++++ 3 files changed, 88 insertions(+), 60 deletions(-) (limited to 'juick-server') diff --git a/juick-server/src/main/assets/scripts.js b/juick-server/src/main/assets/scripts.js index 1f2efd33..995a396a 100644 --- a/juick-server/src/main/assets/scripts.js +++ b/juick-server/src/main/assets/scripts.js @@ -122,27 +122,17 @@ function i18n(key, lang = undefined) { || key; } -var ws, - pageTitle; +var es, pageTitle; -function initWS() { - let url = new URL('/ws/', window.location.href); - url.protocol = url.protocol.replace('http', 'ws'); +function initES() { + let url = new URL('/api/events', window.location.href); let hash = document.getElementById('body').getAttribute('data-hash'); if (hash) { url += '?hash=' + hash; - } else { - let content = document.getElementById('content'); - if (content) { - let pageMID = content.getAttribute('data-mid'); - if (pageMID) { - url += pageMID; - } - } } - ws = new WebSocket(url); - ws.onopen = function() { + es = new EventSource(url); + es.onopen = function() { console.log('online'); if (!document.querySelector('#wsthread')) { var d = document.createElement('div'); @@ -152,47 +142,18 @@ function initWS() { pageTitle = document.title; } }; - ws.onclose = function() { - console.log('offline'); - ws = false; - setTimeout(function() { - initWS(); - }, 2000); - }; - ws.onmessage = function(msg) { - if (msg.data == ' ') { - ws.send(' '); - } else { - try { - var jsonMsg = JSON.parse(msg.data); - console.log('data: ' + msg.data); - if (jsonMsg.service) { - return; - } - wsIncomingReply(jsonMsg); - } catch (err) { - console.log(err); + es.onmessage = function(msg) { + try { + var jsonMsg = JSON.parse(msg.data); + console.log('data: ' + msg.data); + if (jsonMsg.service) { + return; } + wsIncomingReply(jsonMsg); + } catch (err) { + console.log(err); } }; - var keepAlive = setInterval(wsSendKeepAlive, 90000); - window.addEventListener('beforeunload', () => { - clearInterval(keepAlive); - ws.close(); - }); -} - -function wsSendKeepAlive() { - if (ws) { - ws.send(' '); - } -} - -function wsShutdown() { - if (ws) { - ws.onclose = function() { }; - ws.close(); - } } function wsIncomingReply(msg) { @@ -896,9 +857,7 @@ ready(function() { }); } }); - initWS(); - - window.addEventListener('pagehide', wsShutdown); + initES(); killy.embedAll(); var elSelector = 'header', diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index 81703492..5f805c3d 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -20,20 +20,31 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.Message; import com.juick.User; -import com.juick.service.component.*; +import com.juick.model.AnonymousUser; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import com.juick.service.UserService; +import com.juick.service.component.LikeEvent; +import com.juick.service.component.MessageEvent; +import com.juick.service.component.MessageReadEvent; +import com.juick.service.component.NotificationListener; +import com.juick.service.component.PingEvent; +import com.juick.service.component.SubscribeEvent; +import com.juick.service.component.TopEvent; import com.juick.util.MessageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.socket.TextMessage; import javax.inject.Inject; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; /** @@ -56,12 +67,14 @@ public class ServerManager implements NotificationListener { @Value("${service_user:juick}") private String serviceUser; - private void onJuickPM(final int uid_to, final com.juick.Message jmsg) { + private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList<>(); + + private void onJuickPM(final User to, final com.juick.Message jmsg) { try { String json = jsonMapper.writeValueAsString(jmsg); synchronized (wsHandler.getClients()) { wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser)) + (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser)) .forEach(c -> { try { logger.debug("sending pm to {}", c.visitor.getUid()); @@ -74,6 +87,7 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, Collections.singletonList(to)); } private void onJuickMessagePost(final com.juick.Message jmsg, List subscribedUsers) { @@ -108,6 +122,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } private void onJuickMessageReply(final com.juick.Message jmsg, final List subscribedUsers) { @@ -145,6 +161,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } @Override @@ -155,7 +173,7 @@ public class ServerManager implements NotificationListener { return; } if (MessageUtils.isPM(jmsg)) { - onJuickPM(jmsg.getTo().getUid(), jmsg); + onJuickPM(jmsg.getTo(), jmsg); } else if (!MessageUtils.isReply(jmsg)) { // to get full message with attachment, etc. onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers); @@ -212,4 +230,39 @@ public class ServerManager implements NotificationListener { public void processTopEvent(TopEvent topEvent) { } + + public void sendSseEvent(Message msg, List subscribers) { + List deadEmitters = new ArrayList<>(); + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + session.getEmitter().send(msg); + } + catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } + + public static class EventSession { + private User user; + private SseEmitter emitter; + + public EventSession(User user, SseEmitter sseEmitter) { + this.user = user; + this.emitter = sseEmitter; + } + + public User getUser() { + return user; + } + + public SseEmitter getEmitter() { + return emitter; + } + } + + public CopyOnWriteArrayList getSessions() { + return sessions; + } } diff --git a/juick-server/src/main/java/com/juick/server/api/Service.java b/juick-server/src/main/java/com/juick/server/api/Service.java index 41f05f97..2351c076 100644 --- a/juick-server/src/main/java/com/juick/server/api/Service.java +++ b/juick-server/src/main/java/com/juick/server/api/Service.java @@ -3,6 +3,7 @@ package com.juick.server.api; import com.juick.User; import com.juick.server.CommandsManager; import com.juick.server.EmailManager; +import com.juick.server.ServerManager; import com.juick.server.util.HttpForbiddenException; import com.juick.server.util.UserUtils; import com.juick.service.EmailService; @@ -17,8 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import springfox.documentation.annotations.ApiIgnore; import javax.inject.Inject; @@ -50,6 +53,8 @@ public class Service { private String tmpDir; @Value("${banned_emails:}") private String[] ignoredEmails; + @Inject + private ServerManager serverManager; Session session = Session.getDefaultInstance(new Properties()); @@ -131,4 +136,15 @@ public class Service { throw new HttpForbiddenException(); } } + @GetMapping("/api/events") + public SseEmitter handle() { + logger.info("{} connected", UserUtils.getCurrentUser().getName()); + SseEmitter emitter = new SseEmitter(86400000L); + serverManager.getSessions().add(new ServerManager.EventSession(UserUtils.getCurrentUser(), emitter)); + + emitter.onCompletion(() -> serverManager.getSessions().remove(emitter)); + emitter.onTimeout(() -> serverManager.getSessions().remove(emitter)); + + return emitter; + } } -- cgit v1.2.3