diff options
Diffstat (limited to 'juick-notifications/src/main/java/com/juick/components/Notifications.java')
-rw-r--r-- | juick-notifications/src/main/java/com/juick/components/Notifications.java | 88 |
1 files changed, 80 insertions, 8 deletions
diff --git a/juick-notifications/src/main/java/com/juick/components/Notifications.java b/juick-notifications/src/main/java/com/juick/components/Notifications.java index ce89de32..439c4bd7 100644 --- a/juick-notifications/src/main/java/com/juick/components/Notifications.java +++ b/juick-notifications/src/main/java/com/juick/components/Notifications.java @@ -22,6 +22,7 @@ import com.google.android.gcm.server.*; import com.juick.ExternalToken; import com.juick.User; import com.juick.formatters.PlainTextFormatter; +import com.juick.server.component.DisconnectedEvent; import com.juick.util.MessageUtils; import com.turo.pushy.apns.ApnsClient; import com.turo.pushy.apns.PushNotificationResponse; @@ -32,10 +33,20 @@ import org.apache.commons.text.StringEscapeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpEntity; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PingMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; import javax.annotation.Nonnull; import javax.annotation.PostConstruct; @@ -45,13 +56,14 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** * @author Ugnich Anton */ @Component -public class Notifications implements NotificationClientListener { +public class Notifications extends TextWebSocketHandler implements NotificationClientListener { private static Logger logger = LoggerFactory.getLogger(Notifications.class); @Inject @@ -76,6 +88,7 @@ public class Notifications implements NotificationClientListener { @PostConstruct public void init() throws IOException { mpnsClient.setListener(this); + closeFlag.set(false); } public void messageReceived(@Nonnull com.juick.Message jmsg) { User pmTo = jmsg.getTo(); @@ -202,13 +215,6 @@ public class Notifications implements NotificationClientListener { } } - @PreDestroy - public void close() throws Exception { - apns.close(); - - logger.info("ExternalComponent on notifications destroyed"); - } - public void addInvalidGCMToken(String token) { synchronized (invalidGCMTokens) { invalidGCMTokens.add(token); @@ -252,4 +258,70 @@ public class Notifications implements NotificationClientListener { public Set<String> getInvalidAPNSTokens() { return invalidAPNSTokens; } + @Inject + private ApplicationEventPublisher applicationEventPublisher; + + private WebSocketSession session; + private final AtomicBoolean closeFlag = new AtomicBoolean(false); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + logger.info("WebSocket connected"); + this.session = session; + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason()); + applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception { + com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class); + + if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled + logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg)); + if (!closeFlag.get()) { + messageReceived(jmsg); + } + } + + @Scheduled(fixedRate = 30000, initialDelay = 30000) + public void ping() throws IOException { + if (session != null && session.isOpen()) { + logger.debug("Sending WebSocket ping"); + session.sendMessage(new PingMessage()); + } else if (!closeFlag.get()) { + applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); + } + } + @PreDestroy + public void close() { + apns.close(); + closeFlag.set(true); + } + @Scheduled(fixedRate = 600000) + public void cleanupTokens() { + logger.debug("initializing GCM tokens cleanup: {} tokens", getInvalidGCMTokens().size()); + deleteTokens("gcm", new ArrayList<>(getInvalidGCMTokens())); + cleanupGCMTokens(); + logger.debug("initializing MPNS tokens cleanup: {} tokens", getInvalidMPNSTokens().size()); + deleteTokens("mpns", new ArrayList<>(getInvalidMPNSTokens())); + cleanupMPNSTokens(); + logger.debug("initializing APNS tokens cleanup: {} tokens", getInvalidAPNSTokens().size()); + deleteTokens("apns", new ArrayList<>(getInvalidAPNSTokens())); + cleanupMPNSTokens(); + } + private void deleteTokens(String type, List<String> devices) { + if (devices.size() > 0) { + List<ExternalToken> list = devices.stream() + .map(d -> new ExternalToken(null, type, d, null)).collect(Collectors.toList()); + HttpHeaders headers = new HttpHeaders(); + headers.setContentType(MediaType.APPLICATION_JSON_UTF8); + rest.exchange("http://api.juick.com/notifications", + HttpMethod.DELETE, new HttpEntity<>(list, headers), new ParameterizedTypeReference<Void>() { + }); + } + } } |