diff options
Diffstat (limited to 'src/main/java/com/juick/server/ServerManager.java')
-rw-r--r-- | src/main/java/com/juick/server/ServerManager.java | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/src/main/java/com/juick/server/ServerManager.java b/src/main/java/com/juick/server/ServerManager.java new file mode 100644 index 00000000..ef848526 --- /dev/null +++ b/src/main/java/com/juick/server/ServerManager.java @@ -0,0 +1,295 @@ +/* + * Copyright (C) 2008-2017, Juick + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package com.juick.server; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.juick.Message; +import com.juick.User; +import com.juick.model.AnonymousUser; +import com.juick.service.MessagesService; +import com.juick.service.SubscriptionService; +import com.juick.service.UserService; +import com.juick.service.component.LikeEvent; +import com.juick.service.component.MessageEvent; +import com.juick.service.component.MessageReadEvent; +import com.juick.service.component.NotificationListener; +import com.juick.service.component.PingEvent; +import com.juick.service.component.SubscribeEvent; +import com.juick.service.component.TopEvent; +import com.juick.util.MessageUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; +import org.springframework.web.socket.TextMessage; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +/** + * @author Ugnich Anton + */ +@Component +public class ServerManager implements NotificationListener { + private static Logger logger = LoggerFactory.getLogger(ServerManager.class); + + @Inject + private ObjectMapper jsonMapper; + @Inject + private MessagesService messagesService; + @Inject + private WebsocketManager wsHandler; + @Inject + private SubscriptionService subscriptionService; + @Inject + private UserService userService; + private CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>(); + + @Value("${service_user:juick}") + private String serviceUsername; + + private User serviceUser; + + @PostConstruct + public void init() { + serviceUser = userService.getUserByName(serviceUsername); + } + + + private void onJuickPM(final User to, final com.juick.Message jmsg) { + try { + String json = jsonMapper.writeValueAsString(jmsg); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.getUid() == to.getUid()) || c.visitor.equals(serviceUser)) + .forEach(c -> { + try { + logger.debug("sending pm to {}", c.visitor.getUid()); + c.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); + } + messageEvent(jmsg, Collections.singletonList(to)); + } + + private void onJuickMessagePost(final com.juick.Message jmsg, List<User> subscribedUsers) { + try { + String json = jsonMapper.writeValueAsString(jmsg); + List<Integer> uids = subscribedUsers + .stream().map(User::getUid).collect(Collectors.toList()); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.isAnonymous()) // anonymous users + || c.visitor.equals(serviceUser) // services + || (!c.legacy && uids.contains(c.visitor.getUid()))) // subscriptions + .forEach(c -> { + try { + logger.debug("sending message to {}", c.visitor.getUid()); + c.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> + c.legacy && c.allMessages) // legacy all posts + .forEach(c -> { + try { + logger.debug("sending message to legacy client {}", c.visitor.getUid()); + c.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); + } + messageEvent(jmsg, subscribedUsers); + messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); + } + + private void onJuickMessageReply(final com.juick.Message jmsg, final List<User> subscribedUsers) { + try { + + String json = jsonMapper.writeValueAsString(jmsg); + com.juick.Message op = messagesService.getMessage(jmsg.getMid()); + List<Integer> threadUsers = + subscribedUsers + .stream().map(User::getUid).collect(Collectors.toList()); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.isAnonymous()) // anonymous users + || c.visitor.equals(serviceUser) // services + || (!c.legacy && threadUsers.contains(c.visitor.getUid()))) // subscriptions + .forEach(c -> { + try { + logger.debug("sending reply to {}", c.visitor.getUid()); + c.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> + (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMid())) // legacy replies + .forEach(c -> { + try { + logger.debug("sending reply to legacy client {}", c.visitor.getUid()); + c.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); + } + messageEvent(jmsg, subscribedUsers); + messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); + } + + @Override + public void processMessageEvent(MessageEvent event) { + com.juick.Message jmsg = event.getMessage(); + List<User> subscribedUsers = event.getUsers(); + if (jmsg.isService()) { + readEvent(jmsg, Collections.singletonList(serviceUser)); + return; + } + if (MessageUtils.isPM(jmsg)) { + onJuickPM(jmsg.getTo(), jmsg); + } else if (!MessageUtils.isReply(jmsg)) { + // to get full message with attachment, etc. + onJuickMessagePost(messagesService.getMessage(jmsg.getMid()), subscribedUsers); + } else { + // to get quote and attachment + Message op = messagesService.getMessage(jmsg.getMid()); + com.juick.Message reply = messagesService.getReply(jmsg.getMid(), jmsg.getRid()); + subscriptionService.getUsersSubscribedToComments(op, reply, true).stream() + .filter(u -> userService.isReplyToBL(u, reply)) + .forEach(b -> messagesService.setLastReadComment(b, reply.getMid(), reply.getRid())); + onJuickMessageReply(reply, subscribedUsers); + } + messageEvent(jmsg, Collections.singletonList(serviceUser)); + } + + @Override + public void processSubscribeEvent(SubscribeEvent subscribeEvent) { + + } + + @Override + public void processLikeEvent(LikeEvent likeEvent) { + + } + + @Override + public void processPingEvent(PingEvent pingEvent) { + + } + + @Override + public void processMessageReadEvent(MessageReadEvent messageReadEvent) { + User user = messageReadEvent.getUser(); + Message source = messageReadEvent.getMessage(); + + logger.info("Message read event from {} for {}", user.getUid(), source.getMid()); + Message serviceMessage = new Message(); + serviceMessage.setService(true); + serviceMessage.setUser(user); + serviceMessage.setMid(source.getMid()); + serviceMessage.setUnread(false); + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor == user) || c.visitor.equals(serviceUser) + ).forEach(u -> { + try { + u.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage))); + } catch (IOException e) { + logger.error("JSON error", e); + } + }); + readEvent(serviceMessage, Collections.singletonList(serviceUser)); + } + + @Override + public void processTopEvent(TopEvent topEvent) { + User topUser = topEvent.getMessage().getUser(); + topEvent(topEvent.getMessage(), Arrays.asList(topUser, serviceUser)); + } + + public void topEvent(Message msg, List<User> subscribers){ + sendSseEvent(msg, "top", subscribers); + } + + public void readEvent(Message msg, List<User> subscribers){ + sendSseEvent(msg, "read", subscribers); + } + + public void messageEvent(Message msg, List<User> subscribers){ + sendSseEvent(msg, "msg", subscribers); + } + + private void sendSseEvent(Message msg, String name, List<User> subscribers) { + List<EventSession> deadEmitters = new ArrayList<>(); + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + SseEmitter.SseEventBuilder builder = SseEmitter.event() + .name(name) + .data(msg); + session.getEmitter().send(builder); + } catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } + + public static class EventSession { + private User user; + private SseEmitter emitter; + + public EventSession(User user, SseEmitter sseEmitter) { + this.user = user; + this.emitter = sseEmitter; + } + + public User getUser() { + return user; + } + + public SseEmitter getEmitter() { + return emitter; + } + } + + public CopyOnWriteArrayList<EventSession> getSessions() { + return sessions; + } +} |