/*
* Juick
* Copyright (C) 2013, Ugnich Anton
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.juick.components;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.juick.ExternalToken;
import com.juick.User;
import com.juick.service.component.DisconnectedEvent;
import com.juick.service.component.MessageEvent;
import com.juick.service.component.MessageReadEvent;
import com.juick.util.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PingMessage;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import javax.annotation.Nonnull;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* @author Ugnich Anton
*/
@Component
public class NotificationsManager extends TextWebSocketHandler {
private static Logger logger = LoggerFactory.getLogger(NotificationsManager.class);
@Inject
private RestTemplate rest;
@Inject
private ObjectMapper jsonMapper;
private final Set invalidGCMTokens = Collections.synchronizedSet(new HashSet<>());
private final Set invalidMPNSTokens = Collections.synchronizedSet(new HashSet<>());
private final Set invalidAPNSTokens = Collections.synchronizedSet(new HashSet<>());
@PostConstruct
public void init() {
closeFlag.set(false);
}
public void messageReceived(@Nonnull com.juick.Message jmsg) {
if (jmsg.isService()) {
serviceMessageReceived(jmsg);
return;
}
User pmTo = jmsg.getTo();
final List users = new ArrayList<>();
if (MessageUtils.isPM(jmsg)) {
users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d",
pmTo.getUid()),
HttpMethod.GET, null, new ParameterizedTypeReference>() {
}).getBody());
} else if (MessageUtils.isReply(jmsg)) {
users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d&mid=%d&rid=%d",
jmsg.getUser().getUid(), jmsg.getMid(), jmsg.getRid()),
HttpMethod.GET, null, new ParameterizedTypeReference>() {
}).getBody());
} else {
users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%s&mid=%s",
jmsg.getUser().getUid(), jmsg.getMid()),
HttpMethod.GET, null, new ParameterizedTypeReference>() {
}).getBody());
}
applicationEventPublisher.publishEvent(new MessageEvent(this, jmsg, users));
}
private void serviceMessageReceived(@Nonnull com.juick.Message jmsg) {
logger.info("Message read event from {} for {}", jmsg.getUser().getName(), jmsg.getMid());
List users = rest.exchange(String.format("http://api.juick.com/notifications?uid=%d",
jmsg.getUser().getUid()),
HttpMethod.GET, null, new ParameterizedTypeReference>() {
}).getBody();
users.forEach(user -> {
applicationEventPublisher.publishEvent(new MessageReadEvent(this, user, jmsg));
});
}
public void addInvalidGCMToken(String token) {
synchronized (invalidGCMTokens) {
invalidGCMTokens.add(token);
}
}
public Set getInvalidGCMTokens() {
return invalidGCMTokens;
}
public void cleanupGCMTokens() {
logger.info("removed {} GCM tokens", invalidGCMTokens.size());
synchronized (invalidGCMTokens) {
invalidGCMTokens.clear();
}
}
public void addInvalidMPNSToken(String token) {
synchronized (invalidMPNSTokens) {
invalidMPNSTokens.add(token);
}
}
public Set getInvalidMPNSTokens() {
return invalidMPNSTokens;
}
public void cleanupMPNSTokens() {
logger.info("removed {} MPNS tokens", invalidMPNSTokens.size());
synchronized (invalidMPNSTokens) {
invalidMPNSTokens.clear();
}
}
public Set getInvalidAPNSTokens() {
return invalidAPNSTokens;
}
@Inject
private ApplicationEventPublisher applicationEventPublisher;
private ConcurrentWebSocketSessionDecorator session;
private final AtomicBoolean closeFlag = new AtomicBoolean(false);
@Override
public void afterConnectionEstablished(WebSocketSession session) {
if (!closeFlag.get()) {
logger.info("WebSocket connected");
this.session = new ConcurrentWebSocketSessionDecorator(session, 60000, 65535);
}
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
if (!closeFlag.get()) {
logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason());
applicationEventPublisher.publishEvent(new DisconnectedEvent(this));
}
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception {
if (!closeFlag.get() && this.session.getDelegate().equals(session)) {
com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class);
if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled
logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg));
messageReceived(jmsg);
}
}
@Scheduled(fixedRate = 30000, initialDelay = 30000)
public void ping() throws IOException {
if (!closeFlag.get()) {
if (session != null && session.isOpen()) {
logger.debug("Sending WebSocket ping");
session.sendMessage(new PingMessage());
} else {
applicationEventPublisher.publishEvent(new DisconnectedEvent(this));
}
}
}
@PreDestroy
public void close() {
closeFlag.set(true);
}
@Scheduled(fixedRate = 600000)
public void cleanupTokens() {
if (!closeFlag.get()) {
logger.debug("initializing GCM tokens cleanup: {} tokens", getInvalidGCMTokens().size());
deleteTokens("gcm", new ArrayList<>(getInvalidGCMTokens()));
cleanupGCMTokens();
logger.debug("initializing MPNS tokens cleanup: {} tokens", getInvalidMPNSTokens().size());
deleteTokens("mpns", new ArrayList<>(getInvalidMPNSTokens()));
cleanupMPNSTokens();
logger.debug("initializing APNS tokens cleanup: {} tokens", getInvalidAPNSTokens().size());
deleteTokens("apns", new ArrayList<>(getInvalidAPNSTokens()));
cleanupMPNSTokens();
}
}
private void deleteTokens(String type, List devices) {
if (devices.size() > 0) {
List list = devices.stream()
.map(d -> new ExternalToken(null, type, d, null)).collect(Collectors.toList());
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
rest.exchange("http://api.juick.com/notifications",
HttpMethod.DELETE, new HttpEntity<>(list, headers), new ParameterizedTypeReference() {
});
}
}
}