aboutsummaryrefslogtreecommitdiff
path: root/juick-server/src/main/java
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2018-10-07 23:58:00 +0300
committerGravatar Vitaly Takmazov2018-10-09 13:10:00 +0300
commit8fa7260a5bbf332312fa3ba77e2a60d8b60054ac (patch)
tree5f1b8f3ee4a9a52332f6ddf433739b258b4b3bdf /juick-server/src/main/java
parentb664517144d3a82fc1b7951f83751cec1174959d (diff)
Server Side Events: server and webapp
Diffstat (limited to 'juick-server/src/main/java')
-rw-r--r--juick-server/src/main/java/com/juick/server/ServerManager.java61
-rw-r--r--juick-server/src/main/java/com/juick/server/api/Service.java16
2 files changed, 73 insertions, 4 deletions
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java
index 81703492..5f805c3d 100644
--- a/juick-server/src/main/java/com/juick/server/ServerManager.java
+++ b/juick-server/src/main/java/com/juick/server/ServerManager.java
@@ -20,20 +20,31 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.juick.Message;
import com.juick.User;
-import com.juick.service.component.*;
+import com.juick.model.AnonymousUser;
import com.juick.service.MessagesService;
import com.juick.service.SubscriptionService;
import com.juick.service.UserService;
+import com.juick.service.component.LikeEvent;
+import com.juick.service.component.MessageEvent;
+import com.juick.service.component.MessageReadEvent;
+import com.juick.service.component.NotificationListener;
+import com.juick.service.component.PingEvent;
+import com.juick.service.component.SubscribeEvent;
+import com.juick.service.component.TopEvent;
import com.juick.util.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.socket.TextMessage;
import javax.inject.Inject;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/**
@@ -56,12 +67,14 @@ public class ServerManager implements NotificationListener {
@Value("${service_user:juick}")
private String serviceUser;
- private void onJuickPM(final int uid_to, final com.juick.Message jmsg) {
+ private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>();
+
+ private void onJuickPM(final User to, final com.juick.Message jmsg) {
try {
String json = jsonMapper.writeValueAsString(jmsg);
synchronized (wsHandler.getClients()) {
wsHandler.getClients().stream().filter(c ->
- (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser))
+ (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser))
.forEach(c -> {
try {
logger.debug("sending pm to {}", c.visitor.getUid());
@@ -74,6 +87,7 @@ public class ServerManager implements NotificationListener {
} catch (JsonProcessingException e) {
logger.warn("Invalid JSON", e);
}
+ sendSseEvent(jmsg, Collections.singletonList(to));
}
private void onJuickMessagePost(final com.juick.Message jmsg, List<User> subscribedUsers) {
@@ -108,6 +122,8 @@ public class ServerManager implements NotificationListener {
} catch (JsonProcessingException e) {
logger.warn("Invalid JSON", e);
}
+ sendSseEvent(jmsg, subscribedUsers);
+ sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
}
private void onJuickMessageReply(final com.juick.Message jmsg, final List<User> subscribedUsers) {
@@ -145,6 +161,8 @@ public class ServerManager implements NotificationListener {
} catch (JsonProcessingException e) {
logger.warn("Invalid JSON", e);
}
+ sendSseEvent(jmsg, subscribedUsers);
+ sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
}
@Override
@@ -155,7 +173,7 @@ public class ServerManager implements NotificationListener {
return;
}
if (MessageUtils.isPM(jmsg)) {
- onJuickPM(jmsg.getTo().getUid(), jmsg);
+ onJuickPM(jmsg.getTo(), jmsg);
} else if (!MessageUtils.isReply(jmsg)) {
// to get full message with attachment, etc.
onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers);
@@ -212,4 +230,39 @@ public class ServerManager implements NotificationListener {
public void processTopEvent(TopEvent topEvent) {
}
+
+ public void sendSseEvent(Message msg, List<User> subscribers) {
+ List<EventSession> deadEmitters = new ArrayList<>();
+ this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> {
+ try {
+ session.getEmitter().send(msg);
+ }
+ catch (Exception e) {
+ deadEmitters.add(session);
+ }
+ });
+ this.sessions.removeAll(deadEmitters);
+ }
+
+ public static class EventSession {
+ private User user;
+ private SseEmitter emitter;
+
+ public EventSession(User user, SseEmitter sseEmitter) {
+ this.user = user;
+ this.emitter = sseEmitter;
+ }
+
+ public User getUser() {
+ return user;
+ }
+
+ public SseEmitter getEmitter() {
+ return emitter;
+ }
+ }
+
+ public CopyOnWriteArrayList<EventSession> getSessions() {
+ return sessions;
+ }
}
diff --git a/juick-server/src/main/java/com/juick/server/api/Service.java b/juick-server/src/main/java/com/juick/server/api/Service.java
index 41f05f97..2351c076 100644
--- a/juick-server/src/main/java/com/juick/server/api/Service.java
+++ b/juick-server/src/main/java/com/juick/server/api/Service.java
@@ -3,6 +3,7 @@ package com.juick.server.api;
import com.juick.User;
import com.juick.server.CommandsManager;
import com.juick.server.EmailManager;
+import com.juick.server.ServerManager;
import com.juick.server.util.HttpForbiddenException;
import com.juick.server.util.UserUtils;
import com.juick.service.EmailService;
@@ -17,8 +18,10 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import springfox.documentation.annotations.ApiIgnore;
import javax.inject.Inject;
@@ -50,6 +53,8 @@ public class Service {
private String tmpDir;
@Value("${banned_emails:}")
private String[] ignoredEmails;
+ @Inject
+ private ServerManager serverManager;
Session session = Session.getDefaultInstance(new Properties());
@@ -131,4 +136,15 @@ public class Service {
throw new HttpForbiddenException();
}
}
+ @GetMapping("/api/events")
+ public SseEmitter handle() {
+ logger.info("{} connected", UserUtils.getCurrentUser().getName());
+ SseEmitter emitter = new SseEmitter(86400000L);
+ serverManager.getSessions().add(new ServerManager.EventSession(UserUtils.getCurrentUser(), emitter));
+
+ emitter.onCompletion(() -> serverManager.getSessions().remove(emitter));
+ emitter.onTimeout(() -> serverManager.getSessions().remove(emitter));
+
+ return emitter;
+ }
}