aboutsummaryrefslogblamecommitdiff
path: root/src/main/java/com/juick/ServerManager.java
blob: a33d7d9a9c4ed8a0cd0fb7247aa9e0c8c3fbbbba (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  
                                 












                                                                           
                  
 
                               
                                     
                                        
                                             
                                     
                                     
                                   
                                                 
                               
                                                           
                                                                        
 
                                
                           
                           
                        
                             
                      
                                                 
                                   
   
                       
   
                                                            
                                                                            
 
           
                                            
           

                                                    
                                                                                             
           
                             
                                                               
                                                              
     
                                                                                     
                                                                              
     
                                                                                            
                                                                              
     
             










                                                                                 
     
                                                                                  
                                                                                                             
                               
                                                                                           
                                                                    

                                      
                                          
                                                 
                                                      
                                          
                                                                                                           


                                                                                                       
         
                                                                   





                                                       

                                                           
     
                                                               

                                              
                                                                
                                               
 
                                                                    
                                              
     
 
                                                                                 
                                                            
                                                                                             


                                                                       



                                              

                                      
                                         
















                                                               



                                                                          
                                      
     
 
/*
 * Copyright (C) 2008-2020, Juick
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation, either version 3 of the
 * License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
package com.juick;

import com.juick.model.Message;
import com.juick.model.User;
import com.juick.model.AnonymousUser;
import com.juick.www.api.SystemActivity;
import com.juick.service.MessagesService;
import com.juick.service.SubscriptionService;
import com.juick.service.UserService;
import com.juick.service.component.*;
import com.juick.util.MessageUtils;
import org.apache.commons.collections4.ListUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

/**
 * @author Ugnich Anton
 */
public class ServerManager implements NotificationListener {
    private static final Logger logger = LoggerFactory.getLogger("Session");

    @Inject
    private MessagesService messagesService;
    @Inject
    private SubscriptionService subscriptionService;
    @Inject
    private UserService userService;
    private final CopyOnWriteArrayList<EventSession> sessions = new CopyOnWriteArrayList<>();
    @Inject
    private User serviceUser;

    private void onJuickPM(final User to, final Message jmsg) {
        messageEvent(jmsg, Arrays.asList(to, jmsg.getUser()));
    }

    private void onJuickMessagePost(final Message jmsg, List<User> subscribedUsers) {
        messageEvent(jmsg, subscribedUsers);
        messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
    }

    private void onJuickMessageReply(final Message jmsg, final List<User> subscribedUsers) {
        messageEvent(jmsg, subscribedUsers);
        messageEvent(jmsg, Collections.singletonList(AnonymousUser.INSTANCE));
    }

    @Override
    public void processSystemEvent(@Nonnull SystemEvent systemEvent) {
        var activity = systemEvent.getActivity();
        var from = activity.getFrom();
        var message = activity.getMessage();
        var subscribers = activity.getTo();
        if (activity.getType().equals(SystemActivity.ActivityType.message)) {
            processMessage(from, message, subscribers);
        } else if (activity.getType().equals(SystemActivity.ActivityType.like)) {
            if (from.equals(serviceUser)) {
                processTop(message);
            }
        }
    }
    private void processMessage(User from, Message jmsg, List<User> subscribers) {
        List<User> subscribedUsers = ListUtils.union(subscribers, Collections.singletonList(jmsg.getUser()));
        if (jmsg.isService()) {
            logger.info("Message read event from {} for {}", from.getUid(), jmsg.getMid());
            readEvent(jmsg, Collections.singletonList(serviceUser));
            return;
        }
        if (MessageUtils.isPM(jmsg)) {
            onJuickPM(jmsg.getTo(), jmsg);
        } else if (!MessageUtils.isReply(jmsg)) {
            onJuickMessagePost(jmsg, subscribedUsers);
        } else {
            // to get quote and attachment
            Message op = messagesService.getMessage(jmsg.getMid()).orElseThrow(IllegalStateException::new);
            subscriptionService.getUsersSubscribedToComments(op, jmsg, true).stream()
                    .filter(u -> userService.isReplyToBL(u, jmsg))
                    .forEach(b -> messagesService.setLastReadComment(b, jmsg.getMid(), jmsg.getRid()));
            onJuickMessageReply(jmsg, subscribedUsers);
        }
        messageEvent(jmsg, Collections.singletonList(serviceUser));
    }

    @Override
    public void processPingEvent(PingEvent pingEvent) {

    }

    private void processTop(Message msg) {
        User topUser = msg.getUser();
        topEvent(msg, Arrays.asList(topUser, serviceUser));
    }
    private void topEvent(Message msg, List<User> subscribers){
        sendSseEvent(msg, "top", subscribers);
    }

    private void readEvent(Message msg, List<User> subscribers){
        sendSseEvent(msg, "read", subscribers);
    }

    private void messageEvent(Message msg, List<User> subscribers) {
        sendSseEvent(msg, "msg", subscribers);
    }

    private void sendSseEvent(Message msg, String name, List<User> subscribers) {
        List<EventSession> deadEmitters = new ArrayList<>();
        this.sessions.stream().filter(s -> subscribers.contains(s.user)).forEach(session -> {
            try {
                SseEmitter.SseEventBuilder builder = SseEmitter.event()
                        .name(name)
                        .data(msg);
                session.getEmitter().send(builder);
            } catch (Exception e) {
                deadEmitters.add(session);
            }
        });
        this.sessions.removeAll(deadEmitters);
    }

    public static class EventSession {
        private final User user;
        private final SseEmitter emitter;

        public EventSession(User user, SseEmitter sseEmitter) {
            this.user = user;
            this.emitter = sseEmitter;
        }

        public User getUser() {
            return user;
        }

        public SseEmitter getEmitter() {
            return emitter;
        }
    }

    public CopyOnWriteArrayList<EventSession> getSessions() {
        return sessions;
    }
    @Scheduled(fixedRate = 30000)
    public void ping() {
        Message ping = new Message();
        ping.setService(true);
        sendSseEvent(ping, "ping", getSessions().stream().map(s -> s.user)
                .distinct().toList());
    }
}