diff options
author | Vitaly Takmazov | 2018-10-07 23:58:00 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2018-10-09 13:10:00 +0300 |
commit | 8fa7260a5bbf332312fa3ba77e2a60d8b60054ac (patch) | |
tree | 5f1b8f3ee4a9a52332f6ddf433739b258b4b3bdf /juick-server/src/main/java | |
parent | b664517144d3a82fc1b7951f83751cec1174959d (diff) |
Server Side Events: server and webapp
Diffstat (limited to 'juick-server/src/main/java')
-rw-r--r-- | juick-server/src/main/java/com/juick/server/ServerManager.java | 61 | ||||
-rw-r--r-- | juick-server/src/main/java/com/juick/server/api/Service.java | 16 |
2 files changed, 73 insertions, 4 deletions
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index 81703492..5f805c3d 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -20,20 +20,31 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.Message; import com.juick.User; -import com.juick.service.component.*; +import com.juick.model.AnonymousUser; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import com.juick.service.UserService; +import com.juick.service.component.LikeEvent; +import com.juick.service.component.MessageEvent; +import com.juick.service.component.MessageReadEvent; +import com.juick.service.component.NotificationListener; +import com.juick.service.component.PingEvent; +import com.juick.service.component.SubscribeEvent; +import com.juick.service.component.TopEvent; import com.juick.util.MessageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.socket.TextMessage; import javax.inject.Inject; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; /** @@ -56,12 +67,14 @@ public class ServerManager implements NotificationListener { @Value("${service_user:juick}") private String serviceUser; - private void onJuickPM(final int uid_to, final com.juick.Message jmsg) { + private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>(); + + private void onJuickPM(final User to, final com.juick.Message jmsg) { try { String json = jsonMapper.writeValueAsString(jmsg); synchronized (wsHandler.getClients()) { wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser)) + (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser)) .forEach(c -> { try { logger.debug("sending pm to {}", c.visitor.getUid()); @@ -74,6 +87,7 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, Collections.singletonList(to)); } private void onJuickMessagePost(final com.juick.Message jmsg, List<User> subscribedUsers) { @@ -108,6 +122,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } private void onJuickMessageReply(final com.juick.Message jmsg, final List<User> subscribedUsers) { @@ -145,6 +161,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } @Override @@ -155,7 +173,7 @@ public class ServerManager implements NotificationListener { return; } if (MessageUtils.isPM(jmsg)) { - onJuickPM(jmsg.getTo().getUid(), jmsg); + onJuickPM(jmsg.getTo(), jmsg); } else if (!MessageUtils.isReply(jmsg)) { // to get full message with attachment, etc. onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers); @@ -212,4 +230,39 @@ public class ServerManager implements NotificationListener { public void processTopEvent(TopEvent topEvent) { } + + public void sendSseEvent(Message msg, List<User> subscribers) { + List<EventSession> deadEmitters = new ArrayList<>(); + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + session.getEmitter().send(msg); + } + catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } + + public static class EventSession { + private User user; + private SseEmitter emitter; + + public EventSession(User user, SseEmitter sseEmitter) { + this.user = user; + this.emitter = sseEmitter; + } + + public User getUser() { + return user; + } + + public SseEmitter getEmitter() { + return emitter; + } + } + + public CopyOnWriteArrayList<EventSession> getSessions() { + return sessions; + } } diff --git a/juick-server/src/main/java/com/juick/server/api/Service.java b/juick-server/src/main/java/com/juick/server/api/Service.java index 41f05f97..2351c076 100644 --- a/juick-server/src/main/java/com/juick/server/api/Service.java +++ b/juick-server/src/main/java/com/juick/server/api/Service.java @@ -3,6 +3,7 @@ package com.juick.server.api; import com.juick.User; import com.juick.server.CommandsManager; import com.juick.server.EmailManager; +import com.juick.server.ServerManager; import com.juick.server.util.HttpForbiddenException; import com.juick.server.util.UserUtils; import com.juick.service.EmailService; @@ -17,8 +18,10 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import springfox.documentation.annotations.ApiIgnore; import javax.inject.Inject; @@ -50,6 +53,8 @@ public class Service { private String tmpDir; @Value("${banned_emails:}") private String[] ignoredEmails; + @Inject + private ServerManager serverManager; Session session = Session.getDefaultInstance(new Properties()); @@ -131,4 +136,15 @@ public class Service { throw new HttpForbiddenException(); } } + @GetMapping("/api/events") + public SseEmitter handle() { + logger.info("{} connected", UserUtils.getCurrentUser().getName()); + SseEmitter emitter = new SseEmitter(86400000L); + serverManager.getSessions().add(new ServerManager.EventSession(UserUtils.getCurrentUser(), emitter)); + + emitter.onCompletion(() -> serverManager.getSessions().remove(emitter)); + emitter.onTimeout(() -> serverManager.getSessions().remove(emitter)); + + return emitter; + } } |