diff options
Diffstat (limited to 'src/main/java/com/juick/ServerManager.java')
-rw-r--r-- | src/main/java/com/juick/ServerManager.java | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/src/main/java/com/juick/ServerManager.java b/src/main/java/com/juick/ServerManager.java new file mode 100644 index 00000000..f8f8b8c6 --- /dev/null +++ b/src/main/java/com/juick/ServerManager.java @@ -0,0 +1,184 @@ +/* + * Copyright (C) 2008-2020, Juick + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package com.juick; + +import com.juick.model.Message; +import com.juick.model.User; +import com.juick.model.AnonymousUser; +import com.juick.www.api.SystemActivity; +import com.juick.service.MessagesService; +import com.juick.service.SubscriptionService; +import com.juick.service.UserService; +import com.juick.service.component.*; +import com.juick.util.MessageUtils; +import org.apache.commons.collections4.ListUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import javax.annotation.Nonnull; +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.stream.Collectors; + +/** + * @author Ugnich Anton + */ +public class ServerManager implements NotificationListener { + private static final Logger logger = LoggerFactory.getLogger("Session"); + + @Inject + private MessagesService messagesService; + @Inject + private SubscriptionService subscriptionService; + @Inject + private UserService userService; + private final CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>(); + + @Value("${service_user:juick}") + private String serviceUsername; + + private User serviceUser; + + @PostConstruct + public void init() { + serviceUser = userService.getUserByName(serviceUsername); + } + + + private void onJuickPM(final User to, final Message jmsg) { + messageEvent(jmsg, Arrays.asList(to, jmsg.getUser())); + } + + private void onJuickMessagePost(final Message jmsg, List<User> subscribedUsers) { + messageEvent(jmsg, subscribedUsers); + messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); + } + + private void onJuickMessageReply(final Message jmsg, final List<User> subscribedUsers) { + messageEvent(jmsg, subscribedUsers); + messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); + } + + @Override + public void processSystemEvent(@Nonnull SystemEvent systemEvent) { + var activity = systemEvent.getActivity(); + var from = activity.getFrom(); + var message = activity.getMessage(); + var subscribers = activity.getTo(); + if (activity.getType().equals(SystemActivity.ActivityType.message)) { + processMessage(from, message, subscribers); + } else if (activity.getType().equals(SystemActivity.ActivityType.like)) { + if (from.equals(serviceUser)) { + processTop(message); + } + } + } + private void processMessage(User from, Message jmsg, List<User> subscribers) { + List<User> subscribedUsers = ListUtils.union(subscribers, Collections.singletonList(jmsg.getUser())); + if (jmsg.isService()) { + logger.info("Message read event from {} for {}", from.getUid(), jmsg.getMid()); + readEvent(jmsg, Collections.singletonList(serviceUser)); + return; + } + if (MessageUtils.isPM(jmsg)) { + onJuickPM(jmsg.getTo(), jmsg); + } else if (!MessageUtils.isReply(jmsg)) { + onJuickMessagePost(jmsg, subscribedUsers); + } else { + // to get quote and attachment + Message op = messagesService.getMessage(jmsg.getMid()).orElseThrow(IllegalStateException::new); + subscriptionService.getUsersSubscribedToComments(op, jmsg, true).stream() + .filter(u -> userService.isReplyToBL(u, jmsg)) + .forEach(b -> messagesService.setLastReadComment(b, jmsg.getMid(), jmsg.getRid())); + onJuickMessageReply(jmsg, subscribedUsers); + } + messageEvent(jmsg, Collections.singletonList(serviceUser)); + } + + @Override + public void processPingEvent(PingEvent pingEvent) { + + } + + private void processTop(Message msg) { + User topUser = msg.getUser(); + topEvent(msg, Arrays.asList(topUser, serviceUser)); + } + private void topEvent(Message msg, List<User> subscribers){ + sendSseEvent(msg, "top", subscribers); + } + + private void readEvent(Message msg, List<User> subscribers){ + sendSseEvent(msg, "read", subscribers); + } + + private void messageEvent(Message msg, List<User> subscribers) { + sendSseEvent(msg, "msg", subscribers); + } + + private void sendSseEvent(Message msg, String name, List<User> subscribers) { + List<EventSession> deadEmitters = new ArrayList<>(); + this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { + try { + SseEmitter.SseEventBuilder builder = SseEmitter.event() + .name(name) + .data(msg); + session.getEmitter().send(builder); + } catch (Exception e) { + deadEmitters.add(session); + } + }); + this.sessions.removeAll(deadEmitters); + } + + public static class EventSession { + private final User user; + private final SseEmitter emitter; + + public EventSession(User user, SseEmitter sseEmitter) { + this.user = user; + this.emitter = sseEmitter; + } + + public User getUser() { + return user; + } + + public SseEmitter getEmitter() { + return emitter; + } + } + + public CopyOnWriteArrayList<EventSession> getSessions() { + return sessions; + } + @Scheduled(fixedRate = 30000) + public void ping() { + Message ping = new Message(); + ping.setService(true); + sendSseEvent(ping, "ping", getSessions().stream().map(s -> s.user) + .distinct().collect(Collectors.toList())); + } +} |