aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/com/juick/server/ServerManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/juick/server/ServerManager.java')
-rw-r--r--src/main/java/com/juick/server/ServerManager.java295
1 files changed, 295 insertions, 0 deletions
diff --git a/src/main/java/com/juick/server/ServerManager.java b/src/main/java/com/juick/server/ServerManager.java
new file mode 100644
index 00000000..ef848526
--- /dev/null
+++ b/src/main/java/com/juick/server/ServerManager.java
@@ -0,0 +1,295 @@
+/*
+ * Copyright (C) 2008-2017, Juick
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package com.juick.server;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.juick.Message;
+import com.juick.User;
+import com.juick.model.AnonymousUser;
+import com.juick.service.MessagesService;
+import com.juick.service.SubscriptionService;
+import com.juick.service.UserService;
+import com.juick.service.component.LikeEvent;
+import com.juick.service.component.MessageEvent;
+import com.juick.service.component.MessageReadEvent;
+import com.juick.service.component.NotificationListener;
+import com.juick.service.component.PingEvent;
+import com.juick.service.component.SubscribeEvent;
+import com.juick.service.component.TopEvent;
+import com.juick.util.MessageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.socket.TextMessage;
+
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+
+/**
+ * @author Ugnich Anton
+ */
+@Component
+public class ServerManager implements NotificationListener {
+ private static Logger logger = LoggerFactory.getLogger(ServerManager.class);
+
+ @Inject
+ private ObjectMapper jsonMapper;
+ @Inject
+ private MessagesService messagesService;
+ @Inject
+ private WebsocketManager wsHandler;
+ @Inject
+ private SubscriptionService subscriptionService;
+ @Inject
+ private UserService userService;
+ private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>();
+
+ @Value("${service_user:juick}")
+ private String serviceUsername;
+
+ private User serviceUser;
+
+ @PostConstruct
+ public void init() {
+ serviceUser = userService.getUserByName(serviceUsername);
+ }
+
+
+ private void onJuickPM(final User to, final com.juick.Message jmsg) {
+ try {
+ String json = jsonMapper.writeValueAsString(jmsg);
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
+ (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.equals(serviceUser))
+ .forEach(c -> {
+ try {
+ logger.debug("sending pm to {}", c.visitor.getUid());
+ c.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ }
+ } catch (JsonProcessingException e) {
+ logger.warn("Invalid JSON", e);
+ }
+ messageEvent(jmsg, Collections.singletonList(to));
+ }
+
+ private void onJuickMessagePost(final com.juick.Message jmsg, List<User> subscribedUsers) {
+ try {
+ String json = jsonMapper.writeValueAsString(jmsg);
+ List<Integer> uids = subscribedUsers
+ .stream().map(User::getUid).collect(Collectors.toList());
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
+ (!c.legacy && c.visitor.isAnonymous()) // anonymous users
+ || c.visitor.equals(serviceUser) // services
+ || (!c.legacy && uids.contains(c.visitor.getUid()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.debug("sending message to {}", c.visitor.getUid());
+ c.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
+ c.legacy && c.allMessages) // legacy all posts
+ .forEach(c -> {
+ try {
+ logger.debug("sending message to legacy client {}", c.visitor.getUid());
+ c.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ }
+ } catch (JsonProcessingException e) {
+ logger.warn("Invalid JSON", e);
+ }
+ messageEvent(jmsg, subscribedUsers);
+ messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
+ }
+
+ private void onJuickMessageReply(final com.juick.Message jmsg, final List<User> subscribedUsers) {
+ try {
+
+ String json = jsonMapper.writeValueAsString(jmsg);
+ com.juick.Message op = messagesService.getMessage(jmsg.getMid());
+ List<Integer> threadUsers =
+ subscribedUsers
+ .stream().map(User::getUid).collect(Collectors.toList());
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
+ (!c.legacy && c.visitor.isAnonymous()) // anonymous users
+ || c.visitor.equals(serviceUser) // services
+ || (!c.legacy && threadUsers.contains(c.visitor.getUid()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.debug("sending reply to {}", c.visitor.getUid());
+ c.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
+ (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMid())) // legacy replies
+ .forEach(c -> {
+ try {
+ logger.debug("sending reply to legacy client {}", c.visitor.getUid());
+ c.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ }
+ } catch (JsonProcessingException e) {
+ logger.warn("Invalid JSON", e);
+ }
+ messageEvent(jmsg, subscribedUsers);
+ messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
+ }
+
+ @Override
+ public void processMessageEvent(MessageEvent event) {
+ com.juick.Message jmsg = event.getMessage();
+ List<User> subscribedUsers = event.getUsers();
+ if (jmsg.isService()) {
+ readEvent(jmsg, Collections.singletonList(serviceUser));
+ return;
+ }
+ if (MessageUtils.isPM(jmsg)) {
+ onJuickPM(jmsg.getTo(), jmsg);
+ } else if (!MessageUtils.isReply(jmsg)) {
+ // to get full message with attachment, etc.
+ onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers);
+ } else {
+ // to get quote and attachment
+ Message op = messagesService.getMessage(jmsg.getMid());
+ com.juick.Message reply = messagesService.getReply(jmsg.getMid(), jmsg.getRid());
+ subscriptionService.getUsersSubscribedToComments(op, reply, true).stream()
+ .filter(u -> userService.isReplyToBL(u, reply))
+ .forEach(b -> messagesService.setLastReadComment(b, reply.getMid(), reply.getRid()));
+ onJuickMessageReply(reply, subscribedUsers);
+ }
+ messageEvent(jmsg, Collections.singletonList(serviceUser));
+ }
+
+ @Override
+ public void processSubscribeEvent(SubscribeEvent subscribeEvent) {
+
+ }
+
+ @Override
+ public void processLikeEvent(LikeEvent likeEvent) {
+
+ }
+
+ @Override
+ public void processPingEvent(PingEvent pingEvent) {
+
+ }
+
+ @Override
+ public void processMessageReadEvent(MessageReadEvent messageReadEvent) {
+ User user = messageReadEvent.getUser();
+ Message source = messageReadEvent.getMessage();
+
+ logger.info("Message read event from {} for {}", user.getUid(), source.getMid());
+ Message serviceMessage = new Message();
+ serviceMessage.setService(true);
+ serviceMessage.setUser(user);
+ serviceMessage.setMid(source.getMid());
+ serviceMessage.setUnread(false);
+ wsHandler.getClients().stream().filter(c ->
+ (!c.legacy && c.visitor == user) || c.visitor.equals(serviceUser)
+ ).forEach(u -> {
+ try {
+ u.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage)));
+ } catch (IOException e) {
+ logger.error("JSON error", e);
+ }
+ });
+ readEvent(serviceMessage, Collections.singletonList(serviceUser));
+ }
+
+ @Override
+ public void processTopEvent(TopEvent topEvent) {
+ User topUser = topEvent.getMessage().getUser();
+ topEvent(topEvent.getMessage(), Arrays.asList(topUser, serviceUser));
+ }
+
+ public void topEvent(Message msg, List<User> subscribers){
+ sendSseEvent(msg, "top", subscribers);
+ }
+
+ public void readEvent(Message msg, List<User> subscribers){
+ sendSseEvent(msg, "read", subscribers);
+ }
+
+ public void messageEvent(Message msg, List<User> subscribers){
+ sendSseEvent(msg, "msg", subscribers);
+ }
+
+ private void sendSseEvent(Message msg, String name, List<User> subscribers) {
+ List<EventSession> deadEmitters = new ArrayList<>();
+ this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> {
+ try {
+ SseEmitter.SseEventBuilder builder = SseEmitter.event()
+ .name(name)
+ .data(msg);
+ session.getEmitter().send(builder);
+ } catch (Exception e) {
+ deadEmitters.add(session);
+ }
+ });
+ this.sessions.removeAll(deadEmitters);
+ }
+
+ public static class EventSession {
+ private User user;
+ private SseEmitter emitter;
+
+ public EventSession(User user, SseEmitter sseEmitter) {
+ this.user = user;
+ this.emitter = sseEmitter;
+ }
+
+ public User getUser() {
+ return user;
+ }
+
+ public SseEmitter getEmitter() {
+ return emitter;
+ }
+ }
+
+ public CopyOnWriteArrayList<EventSession> getSessions() {
+ return sessions;
+ }
+}