/* * Juick * Copyright (C) 2013, Ugnich Anton * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package com.juick.components; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.ExternalToken; import com.juick.User; import com.juick.service.component.DisconnectedEvent; import com.juick.service.component.MessageEvent; import com.juick.service.component.MessageReadEvent; import com.juick.util.MessageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PingMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import org.springframework.web.socket.handler.TextWebSocketHandler; import javax.annotation.Nonnull; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import java.io.IOException; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** * @author Ugnich Anton */ @Component public class NotificationsManager extends TextWebSocketHandler { private static Logger logger = LoggerFactory.getLogger(NotificationsManager.class); @Inject private RestTemplate rest; @Inject private ObjectMapper jsonMapper; private final Set invalidGCMTokens = Collections.synchronizedSet(new HashSet<>()); private final Set invalidMPNSTokens = Collections.synchronizedSet(new HashSet<>()); private final Set invalidAPNSTokens = Collections.synchronizedSet(new HashSet<>()); @PostConstruct public void init() { closeFlag.set(false); } public void messageReceived(@Nonnull com.juick.Message jmsg) { if (jmsg.isService()) { serviceMessageReceived(jmsg); return; } User pmTo = jmsg.getTo(); final List users = new ArrayList<>(); if (MessageUtils.isPM(jmsg)) { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d", pmTo.getUid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } else if (MessageUtils.isReply(jmsg)) { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d&mid=%d&rid=%d", jmsg.getUser().getUid(), jmsg.getMid(), jmsg.getRid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } else { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%s&mid=%s", jmsg.getUser().getUid(), jmsg.getMid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } applicationEventPublisher.publishEvent(new MessageEvent(this, jmsg, users)); } private void serviceMessageReceived(@Nonnull com.juick.Message jmsg) { logger.info("Message read event from {} for {}", jmsg.getUser().getName(), jmsg.getMid()); List users = rest.exchange(String.format("http://api.juick.com/notifications?uid=%d", jmsg.getUser().getUid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody(); users.forEach(user -> { applicationEventPublisher.publishEvent(new MessageReadEvent(this, user, jmsg)); }); } public void addInvalidGCMToken(String token) { synchronized (invalidGCMTokens) { invalidGCMTokens.add(token); } } public Set getInvalidGCMTokens() { return invalidGCMTokens; } public void cleanupGCMTokens() { logger.info("removed {} GCM tokens", invalidGCMTokens.size()); synchronized (invalidGCMTokens) { invalidGCMTokens.clear(); } } public void addInvalidMPNSToken(String token) { synchronized (invalidMPNSTokens) { invalidMPNSTokens.add(token); } } public Set getInvalidMPNSTokens() { return invalidMPNSTokens; } public void cleanupMPNSTokens() { logger.info("removed {} MPNS tokens", invalidMPNSTokens.size()); synchronized (invalidMPNSTokens) { invalidMPNSTokens.clear(); } } public Set getInvalidAPNSTokens() { return invalidAPNSTokens; } @Inject private ApplicationEventPublisher applicationEventPublisher; private ConcurrentWebSocketSessionDecorator session; private final AtomicBoolean closeFlag = new AtomicBoolean(false); @Override public void afterConnectionEstablished(WebSocketSession session) { if (!closeFlag.get()) { logger.info("WebSocket connected"); this.session = new ConcurrentWebSocketSessionDecorator(session, 60000, 65535); } } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { if (!closeFlag.get()) { logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason()); applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); } } @Override protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception { if (!closeFlag.get() && this.session.getDelegate().equals(session)) { com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class); if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg)); messageReceived(jmsg); } } @Scheduled(fixedRate = 30000, initialDelay = 30000) public void ping() throws IOException { if (!closeFlag.get()) { if (session != null && session.isOpen()) { logger.debug("Sending WebSocket ping"); session.sendMessage(new PingMessage()); } else { applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); } } } @PreDestroy public void close() { closeFlag.set(true); } @Scheduled(fixedRate = 600000) public void cleanupTokens() { if (!closeFlag.get()) { logger.debug("initializing GCM tokens cleanup: {} tokens", getInvalidGCMTokens().size()); deleteTokens("gcm", new ArrayList<>(getInvalidGCMTokens())); cleanupGCMTokens(); logger.debug("initializing MPNS tokens cleanup: {} tokens", getInvalidMPNSTokens().size()); deleteTokens("mpns", new ArrayList<>(getInvalidMPNSTokens())); cleanupMPNSTokens(); logger.debug("initializing APNS tokens cleanup: {} tokens", getInvalidAPNSTokens().size()); deleteTokens("apns", new ArrayList<>(getInvalidAPNSTokens())); cleanupMPNSTokens(); } } private void deleteTokens(String type, List devices) { if (devices.size() > 0) { List list = devices.stream() .map(d -> new ExternalToken(null, type, d, null)).collect(Collectors.toList()); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON_UTF8); rest.exchange("http://api.juick.com/notifications", HttpMethod.DELETE, new HttpEntity<>(list, headers), new ParameterizedTypeReference() { }); } } }