/* * Juick * Copyright (C) 2013, Ugnich Anton * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as * published by the Free Software Foundation, either version 3 of the * License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ package com.juick.components; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.android.gcm.server.*; import com.juick.ExternalToken; import com.juick.User; import com.juick.formatters.PlainTextFormatter; import com.juick.server.component.DisconnectedEvent; import com.juick.util.MessageUtils; import com.turo.pushy.apns.ApnsClient; import com.turo.pushy.apns.PushNotificationResponse; import com.turo.pushy.apns.util.ApnsPayloadBuilder; import com.turo.pushy.apns.util.SimpleApnsPushNotification; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringEscapeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.PingMessage; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator; import org.springframework.web.socket.handler.TextWebSocketHandler; import javax.annotation.Nonnull; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** * @author Ugnich Anton */ @Component public class Notifications extends TextWebSocketHandler implements NotificationClientListener { private static Logger logger = LoggerFactory.getLogger(Notifications.class); @Inject private RestTemplate rest; @Inject private Sender GCMSender; @Inject private ObjectMapper jsonMapper; private final Set invalidGCMTokens = Collections.synchronizedSet(new HashSet<>()); private final Set invalidMPNSTokens = Collections.synchronizedSet(new HashSet<>()); private final Set invalidAPNSTokens = Collections.synchronizedSet(new HashSet<>()); @Inject private MPNSClient mpnsClient; @Inject private ApnsClient apns; @Value("${ios_app_id:}") private String topic; @PostConstruct public void init() { mpnsClient.setListener(this); closeFlag.set(false); } public void messageReceived(@Nonnull com.juick.Message jmsg) { if (jmsg.isService()) { serviceMessageReceived(jmsg); return; } User pmTo = jmsg.getTo(); final List users = new ArrayList<>(); if (MessageUtils.isPM(jmsg)) { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d", pmTo.getUid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } else if (MessageUtils.isReply(jmsg)) { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%d&mid=%d&rid=%d", jmsg.getUser().getUid(), jmsg.getMid(), jmsg.getRid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } else { users.addAll(rest.exchange(String.format("http://api.juick.com/notifications?uid=%s&mid=%s", jmsg.getUser().getUid(), jmsg.getMid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody()); } // GCM List regids = users.stream().flatMap(u -> u.getTokens().stream()).filter(d -> d.getType().equals("gcm")) .map(ExternalToken::getToken).collect(Collectors.toList()); if (!regids.isEmpty()) { try { String json = jsonMapper.writeValueAsString(jmsg); logger.info(json); Message message = new Message.Builder().addData("message", json).build(); MulticastResult result = GCMSender.send(message, regids, 3); List results = result.getResults(); for (int i = 0; i < results.size(); i++) { Result currentResult = results.get(i); logger.info("RES {}: {}", i, currentResult); List errorCodes = Arrays.asList(Constants.ERROR_MISMATCH_SENDER_ID, Constants.ERROR_NOT_REGISTERED); if (errorCodes.contains(currentResult.getErrorCodeName())) { // assuming results are in order of regids // http://stackoverflow.com/a/11594531/1097384 String currentId = regids.get(i); logger.info("{} is scheduled to remove", currentId); addInvalidGCMToken(currentId); } } } catch (IOException ex) { logger.error(ex.getMessage(), ex); } catch (IllegalArgumentException err) { logger.warn("Android: Invalid API Key", err); } } else { logger.info("GMS: no recipients"); } /*** WinPhone ***/ List urls = users.stream().flatMap(u -> u.getTokens().stream()).filter(d -> d.getType().equals("mpns")) .map(ExternalToken::getToken).collect(Collectors.toList()); if (urls.isEmpty()) { logger.info("WNS: no recipients"); } else { try { String text1 = "@" + jmsg.getUser().getName(); if (!jmsg.getTags().isEmpty()) { text1 += ":" + StringEscapeUtils.escapeXml11(MessageUtils.getTagsString(jmsg)); } String text2 = StringEscapeUtils.escapeXml11(StringUtils.defaultString(jmsg.getText())); String xml = "" + "" + "" + "" + "" + "" + text1 + "" + "" + text2 + "" + "" + "" + "" + "" + "" + ""; logger.trace(xml); for (String url : urls) { logger.info("WNS: {}", url); mpnsClient.sendNotification(url, xml); } } catch (IOException | IllegalStateException ex) { logger.error("WNS: ", ex); } } /*** iOS ***/ ApnsPayloadBuilder apnsPayloadBuilder = new ApnsPayloadBuilder(); apnsPayloadBuilder.addCustomProperty("mid", jmsg.getMid()); apnsPayloadBuilder.addCustomProperty("uname", jmsg.getUser().getName()); String post = PlainTextFormatter.formatPost(jmsg); String[] parts = post.split("\n", 2); apnsPayloadBuilder.setAlertTitle(parts[0]).setAlertBody(parts[1]); users.forEach( user -> { apnsPayloadBuilder.setBadgeNumber(user.getUnreadCount()); String payload = apnsPayloadBuilder.buildWithDefaultMaximumLength(); user.getTokens().stream().filter(t -> t.getType().equals("apns")) .map(ExternalToken::getToken).forEach(token -> processAPNSResponse(token, apns.sendNotification( new SimpleApnsPushNotification(token, topic, payload)))); }); } private void serviceMessageReceived(@Nonnull com.juick.Message jmsg) { logger.info("Message read event from {} for {}", jmsg.getUser().getName(), jmsg.getMid()); // iOS List users = rest.exchange(String.format("http://api.juick.com/notifications?uid=%d", jmsg.getUser().getUid()), HttpMethod.GET, null, new ParameterizedTypeReference>() { }).getBody(); ApnsPayloadBuilder apnsPayloadBuilder = new ApnsPayloadBuilder(); users.forEach(user -> { apnsPayloadBuilder.setBadgeNumber(user.getUnreadCount()); String payload = apnsPayloadBuilder.buildWithDefaultMaximumLength(); user.getTokens().stream().filter(t -> t.getType().equals("apns")) .map(ExternalToken::getToken).forEach(token -> processAPNSResponse(token, apns.sendNotification( new SimpleApnsPushNotification(token, topic, payload)))); }); } private void processAPNSResponse(String token, Future> notification) { try { final PushNotificationResponse pushNotificationResponse = notification.get(); if (pushNotificationResponse.isAccepted()) { logger.info("APNS accepted: {}", token); } else { String reason = pushNotificationResponse.getRejectionReason(); logger.info("APNS rejected: {}", reason); if (reason.equals("BadDeviceToken")) { invalidAPNSTokens.add(token); } } } catch (final ExecutionException | InterruptedException ex) { logger.info("APNS exception", ex); } } public void addInvalidGCMToken(String token) { synchronized (invalidGCMTokens) { invalidGCMTokens.add(token); } } public Set getInvalidGCMTokens() { return invalidGCMTokens; } public void cleanupGCMTokens() { logger.info("removed {} GCM tokens", invalidGCMTokens.size()); synchronized (invalidGCMTokens) { invalidGCMTokens.clear(); } } public void addInvalidMPNSToken(String token) { synchronized (invalidMPNSTokens) { invalidMPNSTokens.add(token); } } public Set getInvalidMPNSTokens() { return invalidMPNSTokens; } public void cleanupMPNSTokens() { logger.info("removed {} MPNS tokens", invalidMPNSTokens.size()); synchronized (invalidMPNSTokens) { invalidMPNSTokens.clear(); } } @Override public void invalidToken(String type, String token) { switch (type) { case "mpns": addInvalidMPNSToken(token); break; default: break; } } public Set getInvalidAPNSTokens() { return invalidAPNSTokens; } @Inject private ApplicationEventPublisher applicationEventPublisher; private ConcurrentWebSocketSessionDecorator session; private final AtomicBoolean closeFlag = new AtomicBoolean(false); @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { logger.info("WebSocket connected"); this.session = new ConcurrentWebSocketSessionDecorator(session, 60000, 65535); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason()); applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); } @Override protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception { com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class); if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg)); if (!closeFlag.get()) { messageReceived(jmsg); } } @Scheduled(fixedRate = 30000, initialDelay = 30000) public void ping() throws IOException { if (session != null && session.isOpen()) { logger.debug("Sending WebSocket ping"); session.sendMessage(new PingMessage()); } else if (!closeFlag.get()) { applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); } } @PreDestroy public void close() { apns.close(); closeFlag.set(true); } @Scheduled(fixedRate = 600000) public void cleanupTokens() { logger.debug("initializing GCM tokens cleanup: {} tokens", getInvalidGCMTokens().size()); deleteTokens("gcm", new ArrayList<>(getInvalidGCMTokens())); cleanupGCMTokens(); logger.debug("initializing MPNS tokens cleanup: {} tokens", getInvalidMPNSTokens().size()); deleteTokens("mpns", new ArrayList<>(getInvalidMPNSTokens())); cleanupMPNSTokens(); logger.debug("initializing APNS tokens cleanup: {} tokens", getInvalidAPNSTokens().size()); deleteTokens("apns", new ArrayList<>(getInvalidAPNSTokens())); cleanupMPNSTokens(); } private void deleteTokens(String type, List devices) { if (devices.size() > 0) { List list = devices.stream() .map(d -> new ExternalToken(null, type, d, null)).collect(Collectors.toList()); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON_UTF8); rest.exchange("http://api.juick.com/notifications", HttpMethod.DELETE, new HttpEntity<>(list, headers), new ParameterizedTypeReference() { }); } } }