/* * Copyright (C) 2008-2020, Juick * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. */ package com.juick; import com.juick.model.Message; import com.juick.model.User; import com.juick.model.AnonymousUser; import com.juick.www.api.SystemActivity; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import com.juick.service.UserService; import com.juick.service.component.*; import com.juick.util.MessageUtils; import org.apache.commons.collections4.ListUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.annotation.Nonnull; import javax.inject.Inject; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; /** * @author Ugnich Anton */ public class ServerManager implements NotificationListener { private static final Logger logger = LoggerFactory.getLogger("Session"); @Inject private MessagesService messagesService; @Inject private SubscriptionService subscriptionService; @Inject private UserService userService; private final CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>(); @Inject private User serviceUser; private void onJuickPM(final User to, final Message jmsg) { messageEvent(jmsg, Arrays.asList(to, jmsg.getUser())); } private void onJuickMessagePost(final Message jmsg, List<User> subscribedUsers) { messageEvent(jmsg, subscribedUsers); messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } private void onJuickMessageReply(final Message jmsg, final List<User> subscribedUsers) { messageEvent(jmsg, subscribedUsers); messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE)); } @Override public void processSystemEvent(@Nonnull SystemEvent systemEvent) { var activity = systemEvent.getActivity(); var from = activity.getFrom(); var message = activity.getMessage(); var subscribers = activity.getTo(); if (activity.getType().equals(SystemActivity.ActivityType.message)) { processMessage(from, message, subscribers); } else if (activity.getType().equals(SystemActivity.ActivityType.like)) { if (from.equals(serviceUser)) { processTop(message); } } } private void processMessage(User from, Message jmsg, List<User> subscribers) { List<User> subscribedUsers = ListUtils.union(subscribers, Collections.singletonList(jmsg.getUser())); if (jmsg.isService()) { logger.info("Message read event from {} for {}", from.getUid(), jmsg.getMid()); readEvent(jmsg, Collections.singletonList(serviceUser)); return; } if (MessageUtils.isPM(jmsg)) { onJuickPM(jmsg.getTo(), jmsg); } else if (!MessageUtils.isReply(jmsg)) { onJuickMessagePost(jmsg, subscribedUsers); } else { // to get quote and attachment Message op = messagesService.getMessage(jmsg.getMid()).orElseThrow(IllegalStateException::new); subscriptionService.getUsersSubscribedToComments(op, jmsg, true).stream() .filter(u -> userService.isReplyToBL(u, jmsg)) .forEach(b -> messagesService.setLastReadComment(b, jmsg.getMid(), jmsg.getRid())); onJuickMessageReply(jmsg, subscribedUsers); } messageEvent(jmsg, Collections.singletonList(serviceUser)); } @Override public void processPingEvent(PingEvent pingEvent) { } private void processTop(Message msg) { User topUser = msg.getUser(); topEvent(msg, Arrays.asList(topUser, serviceUser)); } private void topEvent(Message msg, List<User> subscribers){ sendSseEvent(msg, "top", subscribers); } private void readEvent(Message msg, List<User> subscribers){ sendSseEvent(msg, "read", subscribers); } private void messageEvent(Message msg, List<User> subscribers) { sendSseEvent(msg, "msg", subscribers); } private void sendSseEvent(Message msg, String name, List<User> subscribers) { List<EventSession> deadEmitters = new ArrayList<>(); this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> { try { SseEmitter.SseEventBuilder builder = SseEmitter.event() .name(name) .data(msg); session.getEmitter().send(builder); } catch (Exception e) { deadEmitters.add(session); } }); this.sessions.removeAll(deadEmitters); } public static class EventSession { private final User user; private final SseEmitter emitter; public EventSession(User user, SseEmitter sseEmitter) { this.user = user; this.emitter = sseEmitter; } public User getUser() { return user; } public SseEmitter getEmitter() { return emitter; } } public CopyOnWriteArrayList<EventSession> getSessions() { return sessions; } @Scheduled(fixedRate = 30000) public void ping() { Message ping = new Message(); ping.setService(true); sendSseEvent(ping, "ping", getSessions().stream().map(s -> s.user) .distinct().toList()); } }