diff options
author | Vitaly Takmazov | 2017-11-02 16:02:17 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2017-11-02 16:12:40 +0300 |
commit | ee20020d9c576a48173c13b68b03d349c0ec3e47 (patch) | |
tree | 7c59dcac74cde4e5b819a245dc0fbbb315020f10 | |
parent | 61c6a3fe338a5cbbef55081f8ac26011c053125d (diff) |
server-web: base websocket component with autoreconnect
13 files changed, 165 insertions, 90 deletions
diff --git a/juick-api/build.gradle b/juick-api/build.gradle index 608d4afb..a28060b7 100644 --- a/juick-api/build.gradle +++ b/juick-api/build.gradle @@ -14,7 +14,6 @@ dependencies { compile 'io.springfox:springfox-swagger2:2.7.0' compile 'io.springfox:springfox-swagger-ui:2.7.0' - compile "org.springframework:spring-websocket:${rootProject.springFrameworkVersion}" compile 'com.github.pengrad:java-telegram-bot-api:3.4.0' compile 'org.apache.commons:commons-email:1.5' diff --git a/juick-api/src/main/java/com/juick/api/ApiServer.java b/juick-api/src/main/java/com/juick/api/ApiServer.java index f4dd4d99..a8e2db85 100644 --- a/juick-api/src/main/java/com/juick/api/ApiServer.java +++ b/juick-api/src/main/java/com/juick/api/ApiServer.java @@ -17,13 +17,10 @@ package com.juick.api; import com.juick.User; -import com.juick.server.component.DisconnectedEvent; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationListener; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.web.socket.client.WebSocketConnectionManager; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.XmppException; @@ -34,7 +31,6 @@ import rocks.xmpp.core.stanza.model.Message; import rocks.xmpp.extensions.component.accept.ExternalComponent; import rocks.xmpp.extensions.oob.model.x.OobX; -import javax.annotation.Nonnull; import javax.annotation.PostConstruct; import javax.inject.Inject; import java.net.URI; @@ -43,7 +39,7 @@ import java.net.URISyntaxException; /** * @author Ugnich Anton */ -public class ApiServer implements ApplicationListener<DisconnectedEvent>, AutoCloseable { +public class ApiServer implements AutoCloseable { private static Logger logger = LoggerFactory.getLogger(ApiServer.class); private ExternalComponent xmpp; @@ -58,8 +54,6 @@ public class ApiServer implements ApplicationListener<DisconnectedEvent>, AutoCl private int xmppPort; @Value("${xmpp_disabled:false}") private boolean isXmppDisabled; - @Inject - private WebSocketConnectionManager webSocketConnectionManager; @PostConstruct public void init() { @@ -114,11 +108,4 @@ public class ApiServer implements ApplicationListener<DisconnectedEvent>, AutoCl logger.warn("attachment error", e1); } } - - @Override - public void onApplicationEvent(@Nonnull DisconnectedEvent event) { - logger.info("retrying..."); - webSocketConnectionManager.stop(); - webSocketConnectionManager.start(); - } } diff --git a/juick-api/src/main/java/com/juick/api/TelegramBotManager.java b/juick-api/src/main/java/com/juick/api/TelegramBotManager.java index 0516af5f..1611c23c 100644 --- a/juick-api/src/main/java/com/juick/api/TelegramBotManager.java +++ b/juick-api/src/main/java/com/juick/api/TelegramBotManager.java @@ -19,6 +19,7 @@ package com.juick.api; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.server.component.DisconnectedEvent; +import com.juick.server.component.MessageEvent; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import com.juick.service.TelegramService; @@ -35,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationListener; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.web.socket.CloseStatus; @@ -55,7 +57,7 @@ import static com.juick.formatters.PlainTextFormatter.formatUrl; * Created by vt on 12/05/16. */ @Component -public class TelegramBotManager extends TextWebSocketHandler { +public class TelegramBotManager implements ApplicationListener<MessageEvent> { private static final Logger logger = LoggerFactory.getLogger(TelegramBotManager.class); private TelegramBot bot; @@ -68,12 +70,6 @@ public class TelegramBotManager extends TextWebSocketHandler { private MessagesService messagesService; @Inject private SubscriptionService subscriptionService; - @Inject - private ObjectMapper jsonMapper; - @Inject - private ApplicationEventPublisher applicationEventPublisher; - - private WebSocketSession session; public static final String MSG_LINK = "🔗"; @@ -94,35 +90,11 @@ public class TelegramBotManager extends TextWebSocketHandler { } } - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - logger.info("WebSocket connected"); - this.session = session; - } - @Override - public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { - logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason()); - applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); - } - - @Scheduled(fixedRate = 30000, initialDelay = 30000) - public void ping() throws IOException { - if (session != null && session.isOpen()) { - logger.debug("Sending WebSocket ping"); - session.sendMessage(new PingMessage()); - } else { - applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); - } - } @Override - protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception { - com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class); - - if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled - logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg)); - + public void onApplicationEvent(MessageEvent event) { + com.juick.Message jmsg = event.getMessage(); String msgUrl = formatUrl(jmsg); if (jmsg.getRid() == 0) { String msg = String.format("[%s](%s) %s", MSG_LINK, msgUrl, formatPost(jmsg)); diff --git a/juick-api/src/main/java/com/juick/api/configuration/ApiAppConfiguration.java b/juick-api/src/main/java/com/juick/api/configuration/ApiAppConfiguration.java index f51528bf..69a2438d 100644 --- a/juick-api/src/main/java/com/juick/api/configuration/ApiAppConfiguration.java +++ b/juick-api/src/main/java/com/juick/api/configuration/ApiAppConfiguration.java @@ -19,6 +19,8 @@ package com.juick.api.configuration; import com.juick.api.ApiServer; import com.juick.api.TelegramBotManager; +import com.juick.server.component.JuickServerComponent; +import com.juick.server.component.JuickServerReconnectManager; import com.juick.server.configuration.BaseWebConfiguration; import com.juick.service.ImagesService; import com.juick.service.ImagesServiceImpl; @@ -44,36 +46,24 @@ import java.util.Collections; * Created by aalexeev on 11/12/16. */ @Configuration -@EnableScheduling @EnableAsync @EnableWebMvc @EnableSwagger2 @PropertySource("classpath:juick.conf") @ComponentScan(basePackages = "com.juick.api") public class ApiAppConfiguration extends BaseWebConfiguration { - @Inject - TelegramBotManager telegramBotManager; - - @Value("${websocket_url:ws://localhost:8080/ws/}") - private String WS_URI; - @Bean - public WebSocketConnectionManager connectionManager() { - WebSocketConnectionManager manager = new WebSocketConnectionManager(client(), telegramBotManager, WS_URI); - manager.setAutoStartup(true); - return manager; + public JuickServerComponent juickServerComponent() { + return new JuickServerComponent(); } - @Bean - public StandardWebSocketClient client() { - return new StandardWebSocketClient(); + public JuickServerReconnectManager juickServerReconnectManager() { + return new JuickServerReconnectManager(); } - @Bean public ApiServer apiServer() { return new ApiServer(); } - @Bean public Docket api() { return new Docket(DocumentationType.SWAGGER_2) diff --git a/juick-api/src/main/java/com/juick/api/configuration/ApiInitializer.java b/juick-api/src/main/java/com/juick/api/configuration/ApiInitializer.java index 59c69926..3380df10 100644 --- a/juick-api/src/main/java/com/juick/api/configuration/ApiInitializer.java +++ b/juick-api/src/main/java/com/juick/api/configuration/ApiInitializer.java @@ -18,6 +18,7 @@ package com.juick.api.configuration; import com.juick.configuration.DataConfiguration; +import com.juick.server.configuration.JuickServerComponentConfiguration; import com.juick.server.configuration.StorageConfiguration; import org.apache.commons.codec.CharEncoding; import org.springframework.web.filter.CharacterEncodingFilter; @@ -35,7 +36,8 @@ public class ApiInitializer extends AbstractAnnotationConfigDispatcherServletIni return new Class<?>[]{ ApiSecurityConfig.class, DataConfiguration.class, - StorageConfiguration.class + StorageConfiguration.class, + JuickServerComponentConfiguration.class }; } diff --git a/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostAppConfiguration.java b/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostAppConfiguration.java index eae12e6c..76a909ad 100644 --- a/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostAppConfiguration.java +++ b/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostAppConfiguration.java @@ -17,6 +17,8 @@ package com.juick.components.configuration; +import com.juick.server.component.JuickServerComponent; +import com.juick.server.component.JuickServerReconnectManager; import com.juick.server.configuration.BaseWebConfiguration; import com.juick.service.Crosspost; import org.springframework.beans.factory.annotation.Value; @@ -29,8 +31,6 @@ import org.springframework.http.client.InterceptingClientHttpRequestFactory; import org.springframework.http.client.support.BasicAuthorizationInterceptor; import org.springframework.web.client.RestTemplate; import org.springframework.web.servlet.config.annotation.EnableWebMvc; -import org.springframework.web.socket.client.WebSocketConnectionManager; -import org.springframework.web.socket.client.standard.StandardWebSocketClient; import java.util.Collections; import java.util.List; @@ -47,7 +47,14 @@ public class CrosspostAppConfiguration extends BaseWebConfiguration { private String apiUser; @Value("${api_password:secret}") private String apiSecret; - + @Bean + public JuickServerComponent juickServerComponent() { + return new JuickServerComponent(); + } + @Bean + public JuickServerReconnectManager juickServerReconnectManager() { + return new JuickServerReconnectManager(); + } @Bean public RestTemplate rest() { RestTemplate rest = new RestTemplate(); @@ -57,22 +64,10 @@ public class CrosspostAppConfiguration extends BaseWebConfiguration { rest.setRequestFactory(new InterceptingClientHttpRequestFactory(rest.getRequestFactory(), interceptors)); return rest; } - private static final String WS_URI = "wss://ws.juick.com/"; @Bean public Crosspost crosspost() { return new Crosspost(); } - @Bean - public WebSocketConnectionManager connectionManager() { - WebSocketConnectionManager manager = new WebSocketConnectionManager(client(), crosspost(), WS_URI); - manager.setAutoStartup(true); - return manager; - } - - @Bean - public StandardWebSocketClient client() { - return new StandardWebSocketClient(); - } } diff --git a/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostInitializer.java b/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostInitializer.java index 6a95c1f0..88280480 100644 --- a/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostInitializer.java +++ b/juick-crosspost/src/main/java/com/juick/components/configuration/CrosspostInitializer.java @@ -17,6 +17,7 @@ package com.juick.components.configuration; +import com.juick.server.configuration.JuickServerComponentConfiguration; import org.apache.commons.codec.CharEncoding; import org.springframework.web.filter.CharacterEncodingFilter; import org.springframework.web.servlet.support.AbstractAnnotationConfigDispatcherServletInitializer; @@ -30,7 +31,10 @@ public class CrosspostInitializer extends AbstractAnnotationConfigDispatcherServ @Override protected Class<?>[] getRootConfigClasses() { - return new Class<?>[]{ CrosspostAppConfiguration.class }; + return new Class<?>[]{ + CrosspostAppConfiguration.class, + JuickServerComponentConfiguration.class + }; } @Override diff --git a/juick-crosspost/src/main/java/com/juick/service/Crosspost.java b/juick-crosspost/src/main/java/com/juick/service/Crosspost.java index a8458439..6bcf7c81 100644 --- a/juick-crosspost/src/main/java/com/juick/service/Crosspost.java +++ b/juick-crosspost/src/main/java/com/juick/service/Crosspost.java @@ -16,8 +16,8 @@ */ package com.juick.service; -import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.Message; +import com.juick.server.component.MessageEvent; import com.juick.util.MessageUtils; import org.apache.commons.codec.CharEncoding; import org.apache.commons.io.IOUtils; @@ -26,11 +26,9 @@ import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationListener; import org.springframework.social.twitter.api.Twitter; import org.springframework.social.twitter.api.impl.TwitterTemplate; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.TextWebSocketHandler; import javax.inject.Inject; import javax.net.ssl.HttpsURLConnection; @@ -42,7 +40,7 @@ import java.nio.charset.StandardCharsets; /** * @author Ugnich Anton */ -public class Crosspost extends TextWebSocketHandler { +public class Crosspost implements ApplicationListener<MessageEvent> { final static String FBURL = "https://graph.facebook.com/me/feed"; final static String VKURL = "https://api.vk.com/method/wall.post"; @@ -55,12 +53,10 @@ public class Crosspost extends TextWebSocketHandler { private String twitter_consumer_key; @Value("${twitter_consumer_secret:}") private String twitter_consumer_secret; - @Inject - private ObjectMapper jsonMapper; @Override - protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - Message msg = jsonMapper.readValue(message.asBytes(), Message.class); + public void onApplicationEvent(MessageEvent event) { + Message msg = event.getMessage(); if (msg.getMid() > 0 && msg.getRid() == 0) { if (StringUtils.isNotEmpty(crosspostService.getTwitterName(msg.getUser().getUid()))) { if (msg.getTags().stream().noneMatch(t -> t.getName().equals("notwitter"))) { diff --git a/juick-server-web/build.gradle b/juick-server-web/build.gradle index b7ac8081..b4163a27 100644 --- a/juick-server-web/build.gradle +++ b/juick-server-web/build.gradle @@ -24,6 +24,7 @@ dependencies { compile "org.springframework:spring-webmvc:${rootProject.springFrameworkVersion}" compile "org.springframework:spring-context-support:${rootProject.springFrameworkVersion}" + compile "org.springframework:spring-websocket:${rootProject.springFrameworkVersion}" compile "org.springframework.security:spring-security-web:${rootProject.springSecurityVersion}" compile "org.springframework.security:spring-security-config:${rootProject.springSecurityVersion}" diff --git a/juick-server-web/src/main/java/com/juick/server/component/JuickServerComponent.java b/juick-server-web/src/main/java/com/juick/server/component/JuickServerComponent.java new file mode 100644 index 00000000..96b8c398 --- /dev/null +++ b/juick-server-web/src/main/java/com/juick/server/component/JuickServerComponent.java @@ -0,0 +1,56 @@ +package com.juick.server.component; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.PingMessage; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import javax.inject.Inject; +import java.io.IOException; + +public class JuickServerComponent extends TextWebSocketHandler { + private static Logger logger = LoggerFactory.getLogger(JuickServerComponent.class); + @Inject + private ApplicationEventPublisher applicationEventPublisher; + @Inject + private ObjectMapper jsonMapper; + + private WebSocketSession session; + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + logger.info("WebSocket connected"); + this.session = session; + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + logger.info("WebSocket disconnected with code {}: {}", status.getCode(), status.getReason()); + applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); + } + + @Override + protected void handleTextMessage(WebSocketSession session, TextMessage text) throws Exception { + com.juick.Message jmsg = jsonMapper.readValue(text.asBytes(), com.juick.Message.class); + + if (logger.isInfoEnabled()) // prevent writeValueAsString execution if logger disabled + logger.info("got jmsg: {}", jsonMapper.writeValueAsString(jmsg)); + applicationEventPublisher.publishEvent(new MessageEvent(this, jmsg)); + } + + @Scheduled(fixedRate = 30000, initialDelay = 30000) + public void ping() throws IOException { + if (session != null && session.isOpen()) { + logger.debug("Sending WebSocket ping"); + session.sendMessage(new PingMessage()); + } else { + applicationEventPublisher.publishEvent(new DisconnectedEvent(this)); + } + } +} diff --git a/juick-server-web/src/main/java/com/juick/server/component/JuickServerReconnectManager.java b/juick-server-web/src/main/java/com/juick/server/component/JuickServerReconnectManager.java new file mode 100644 index 00000000..a662e4fb --- /dev/null +++ b/juick-server-web/src/main/java/com/juick/server/component/JuickServerReconnectManager.java @@ -0,0 +1,22 @@ +package com.juick.server.component; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.client.WebSocketConnectionManager; + +import javax.inject.Inject; + +@Component +public class JuickServerReconnectManager implements ApplicationListener<DisconnectedEvent> { + private static Logger logger = LoggerFactory.getLogger(JuickServerReconnectManager.class); + @Inject + private WebSocketConnectionManager webSocketConnectionManager; + @Override + public void onApplicationEvent(DisconnectedEvent event) { + logger.info("retrying..."); + webSocketConnectionManager.stop(); + webSocketConnectionManager.start(); + } +} diff --git a/juick-server-web/src/main/java/com/juick/server/component/MessageEvent.java b/juick-server-web/src/main/java/com/juick/server/component/MessageEvent.java new file mode 100644 index 00000000..59537a79 --- /dev/null +++ b/juick-server-web/src/main/java/com/juick/server/component/MessageEvent.java @@ -0,0 +1,21 @@ +package com.juick.server.component; + +import com.juick.Message; +import org.springframework.context.ApplicationEvent; + +public class MessageEvent extends ApplicationEvent { + private Message message; + /** + * Create a new ApplicationEvent. + * + * @param source the object on which the event initially occurred (never {@code null}) + */ + public MessageEvent(Object source, Message message) { + super(source); + this.message = message; + } + + public Message getMessage() { + return message; + } +} diff --git a/juick-server-web/src/main/java/com/juick/server/configuration/JuickServerComponentConfiguration.java b/juick-server-web/src/main/java/com/juick/server/configuration/JuickServerComponentConfiguration.java new file mode 100644 index 00000000..7ddda36e --- /dev/null +++ b/juick-server-web/src/main/java/com/juick/server/configuration/JuickServerComponentConfiguration.java @@ -0,0 +1,30 @@ +package com.juick.server.configuration; + +import com.juick.server.component.JuickServerComponent; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.web.socket.client.WebSocketConnectionManager; +import org.springframework.web.socket.client.standard.StandardWebSocketClient; + +import javax.inject.Inject; + +@Configuration +@EnableScheduling +public class JuickServerComponentConfiguration { + @Value("${websocket_url:ws://localhost:8080/ws/}") + private String WS_URI; + @Inject + private JuickServerComponent juickServerComponent; + @Bean + public WebSocketConnectionManager connectionManager() { + WebSocketConnectionManager manager = new WebSocketConnectionManager(client(), juickServerComponent, WS_URI); + manager.setAutoStartup(true); + return manager; + } + @Bean + public StandardWebSocketClient client() { + return new StandardWebSocketClient(); + } +} |