aboutsummaryrefslogtreecommitdiff
path: root/juick-server/src/main
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2018-10-09 13:28:06 +0300
committerGravatar Vitaly Takmazov2018-10-09 13:28:06 +0300
commit475c877892ca414bcbaf812a75d53cf455fe4539 (patch)
tree50dc5f745a388c2101644432ca54bd86e446f7d3 /juick-server/src/main
parent8fa7260a5bbf332312fa3ba77e2a60d8b60054ac (diff)
SSE: service user support and synchronization
Diffstat (limited to 'juick-server/src/main')
-rw-r--r--juick-server/src/main/java/com/juick/server/ServerManager.java36
1 files changed, 24 insertions, 12 deletions
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java
index 5f805c3d..4f25929b 100644
--- a/juick-server/src/main/java/com/juick/server/ServerManager.java
+++ b/juick-server/src/main/java/com/juick/server/ServerManager.java
@@ -39,6 +39,7 @@ import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.socket.TextMessage;
+import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.io.IOException;
import java.util.ArrayList;
@@ -64,17 +65,25 @@ public class ServerManager implements NotificationListener {
private SubscriptionService subscriptionService;
@Inject
private UserService userService;
+ private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>();
+
@Value("${service_user:juick}")
- private String serviceUser;
+ private String serviceUsername;
+
+ private User serviceUser;
+
+ @PostConstruct
+ public void init() {
+ serviceUser = userService.getUserByName(serviceUsername);
+ }
- private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>();
private void onJuickPM(final User to, final com.juick.Message jmsg) {
try {
String json = jsonMapper.writeValueAsString(jmsg);
synchronized (wsHandler.getClients()) {
wsHandler.getClients().stream().filter(c ->
- (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser))
+ (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.equals(serviceUser))
.forEach(c -> {
try {
logger.debug("sending pm to {}", c.visitor.getUid());
@@ -186,6 +195,7 @@ public class ServerManager implements NotificationListener {
.forEach(b -> messagesService.setLastReadComment(b, reply.getMid(), reply.getRid()));
onJuickMessageReply(reply, subscribedUsers);
}
+ sendSseEvent(jmsg, Collections.singletonList(serviceUser));
}
@Override
@@ -224,6 +234,7 @@ public class ServerManager implements NotificationListener {
}
});
});
+ sendSseEvent(source, Collections.singletonList(serviceUser));
}
@Override
@@ -233,15 +244,16 @@ public class ServerManager implements NotificationListener {
public void sendSseEvent(Message msg, List<User> subscribers) {
List<EventSession> deadEmitters = new ArrayList<>();
- this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> {
- try {
- session.getEmitter().send(msg);
- }
- catch (Exception e) {
- deadEmitters.add(session);
- }
- });
- this.sessions.removeAll(deadEmitters);
+ synchronized (this.getSessions()) {
+ this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> {
+ try {
+ session.getEmitter().send(msg);
+ } catch (Exception e) {
+ deadEmitters.add(session);
+ }
+ });
+ this.sessions.removeAll(deadEmitters);
+ }
}
public static class EventSession {