From 475c877892ca414bcbaf812a75d53cf455fe4539 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 9 Oct 2018 13:28:06 +0300 Subject: SSE: service user support and synchronization --- .../main/java/com/juick/server/ServerManager.java | 36 ++++++++++++++-------- 1 file changed, 24 insertions(+), 12 deletions(-) (limited to 'juick-server') diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index 5f805c3d..4f25929b 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -39,6 +39,7 @@ import org.springframework.stereotype.Component; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.socket.TextMessage; +import javax.annotation.PostConstruct; import javax.inject.Inject; import java.io.IOException; import java.util.ArrayList; @@ -64,17 +65,25 @@ public class ServerManager implements NotificationListener { private SubscriptionService subscriptionService; @Inject private UserService userService; + private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList<>(); + @Value("${service_user:juick}") - private String serviceUser; + private String serviceUsername; + + private User serviceUser; + + @PostConstruct + public void init() { + serviceUser = userService.getUserByName(serviceUsername); + } - private CopyOnWriteArrayList sessions = new CopyOnWriteArrayList<>(); private void onJuickPM(final User to, final com.juick.Message jmsg) { try { String json = jsonMapper.writeValueAsString(jmsg); synchronized (wsHandler.getClients()) { wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser)) + (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.equals(serviceUser)) .forEach(c -> { try { logger.debug("sending pm to {}", c.visitor.getUid()); @@ -186,6 +195,7 @@ public class ServerManager implements NotificationListener { .forEach(b -> messagesService.setLastReadComment(b, reply.getMid(), reply.getRid())); onJuickMessageReply(reply, subscribedUsers); } + sendSseEvent(jmsg, Collections.singletonList(serviceUser)); } @Override @@ -224,6 +234,7 @@ public class ServerManager implements NotificationListener { } }); }); + sendSseEvent(source, Collections.singletonList(serviceUser)); } @Override @@ -233,15 +244,16 @@ public class ServerManager implements NotificationListener { public void sendSseEvent(Message msg, List subscribers) { List deadEmitters = new ArrayList<>(); - this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { - try { - session.getEmitter().send(msg); - } - catch (Exception e) { - deadEmitters.add(session); - } - }); - this.sessions.removeAll(deadEmitters); + synchronized (this.getSessions()) { + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + session.getEmitter().send(msg); + } catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } } public static class EventSession { -- cgit v1.2.3