diff options
Diffstat (limited to 'juick-server/src/main/java/com/juick/server/ServerManager.java')
-rw-r--r-- | juick-server/src/main/java/com/juick/server/ServerManager.java | 61 |
1 files changed, 57 insertions, 4 deletions
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index 81703492..5f805c3d 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -20,20 +20,31 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.Message; import com.juick.User; -import com.juick.service.component.*; +import com.juick.model.AnonymousUser; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import com.juick.service.UserService; +import com.juick.service.component.LikeEvent; +import com.juick.service.component.MessageEvent; +import com.juick.service.component.MessageReadEvent; +import com.juick.service.component.NotificationListener; +import com.juick.service.component.PingEvent; +import com.juick.service.component.SubscribeEvent; +import com.juick.service.component.TopEvent; import com.juick.util.MessageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.socket.TextMessage; import javax.inject.Inject; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; /** @@ -56,12 +67,14 @@ public class ServerManager implements NotificationListener { @Value("${service_user:juick}") private String serviceUser; - private void onJuickPM(final int uid_to, final com.juick.Message jmsg) { + private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>(); + + private void onJuickPM(final User to, final com.juick.Message jmsg) { try { String json = jsonMapper.writeValueAsString(jmsg); synchronized (wsHandler.getClients()) { wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser)) + (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.getName().equals(serviceUser)) .forEach(c -> { try { logger.debug("sending pm to {}", c.visitor.getUid()); @@ -74,6 +87,7 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, Collections.singletonList(to)); } private void onJuickMessagePost(final com.juick.Message jmsg, List<User> subscribedUsers) { @@ -108,6 +122,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } private void onJuickMessageReply(final com.juick.Message jmsg, final List<User> subscribedUsers) { @@ -145,6 +161,8 @@ public class ServerManager implements NotificationListener { } catch (JsonProcessingException e) { logger.warn("Invalid JSON", e); } + sendSseEvent(jmsg, subscribedUsers); + sendSseEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } @Override @@ -155,7 +173,7 @@ public class ServerManager implements NotificationListener { return; } if (MessageUtils.isPM(jmsg)) { - onJuickPM(jmsg.getTo().getUid(), jmsg); + onJuickPM(jmsg.getTo(), jmsg); } else if (!MessageUtils.isReply(jmsg)) { // to get full message with attachment, etc. onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers); @@ -212,4 +230,39 @@ public class ServerManager implements NotificationListener { public void processTopEvent(TopEvent topEvent) { } + + public void sendSseEvent(Message msg, List<User> subscribers) { + List<EventSession> deadEmitters = new ArrayList<>(); + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + session.getEmitter().send(msg); + } + catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } + + public static class EventSession { + private User user; + private SseEmitter emitter; + + public EventSession(User user, SseEmitter sseEmitter) { + this.user = user; + this.emitter = sseEmitter; + } + + public User getUser() { + return user; + } + + public SseEmitter getEmitter() { + return emitter; + } + } + + public CopyOnWriteArrayList<EventSession> getSessions() { + return sessions; + } } |