aboutsummaryrefslogtreecommitdiff
path: root/juick-ws/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'juick-ws/src/main/java')
-rw-r--r--juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java20
-rw-r--r--juick-ws/src/main/java/com/juick/ws/XMPPConnection.java152
-rw-r--r--juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java54
-rw-r--r--juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java10
-rw-r--r--juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java (renamed from juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java)60
-rw-r--r--juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java (renamed from juick-ws/src/main/java/com/juick/ws/ApiController.java)5
-rw-r--r--juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java (renamed from juick-ws/src/main/java/com/juick/ws/StatusController.java)6
7 files changed, 176 insertions, 131 deletions
diff --git a/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java b/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java
index 2ab3a94c9..4ce230a4e 100644
--- a/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java
+++ b/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java
@@ -9,6 +9,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.util.Assert;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@@ -24,13 +25,17 @@ import java.util.List;
* Created by vitalyster on 28.06.2016.
*/
public class WebsocketComponent extends TextWebSocketHandler {
+ private static final Logger logger = LoggerFactory.getLogger(WebsocketComponent.class);
- @Inject
- JdbcTemplate jdbc;
+ private JdbcTemplate jdbc;
- private static final Logger logger = LoggerFactory.getLogger(WebsocketComponent.class);
+ private final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
- final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
+
+ public WebsocketComponent(JdbcTemplate jdbc) {
+ Assert.notNull(jdbc);
+ this.jdbc = jdbc;
+ }
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
@@ -102,6 +107,7 @@ public class WebsocketComponent extends TextWebSocketHandler {
}
}
}
+
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
synchronized (clients) {
@@ -116,8 +122,12 @@ public class WebsocketComponent extends TextWebSocketHandler {
}
}
- class SocketSubscribed {
+ public List<SocketSubscribed> getClients() {
+ return clients;
+ }
+
+ class SocketSubscribed {
WebSocketSession session;
String clientName;
User visitor;
diff --git a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
index 9e793a44d..04ea53781 100644
--- a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
+++ b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
@@ -4,6 +4,7 @@ import com.juick.User;
import com.juick.json.MessageSerializer;
import com.juick.server.MessagesQueries;
import com.juick.server.SubscriptionsQueries;
+import com.juick.util.ThreadHelper;
import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
import com.juick.xmpp.Stream;
@@ -12,12 +13,13 @@ import com.juick.xmpp.extensions.JuickMessage;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
import org.springframework.web.socket.TextMessage;
-import javax.inject.Inject;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
@@ -25,52 +27,76 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
- *
* @author ugnich
*/
-@Component
-public class XMPPConnection implements Stream.StreamListener, Message.MessageListener {
+public class XMPPConnection implements InitializingBean, DisposableBean, Stream.StreamListener, Message.MessageListener {
private static final Logger logger = LoggerFactory.getLogger(XMPPConnection.class);
- @Inject
- JdbcTemplate jdbc;
- ExecutorService service;
- Stream xmpp;
- String xmppPassword;
- MessageSerializer ms;
- @Inject
- WebsocketComponent wsHandler;
-
- @Inject
- public XMPPConnection(Environment env, ExecutorService service) {
+ private final JdbcTemplate jdbc;
+ private final ExecutorService service;
+ private final WebsocketComponent wsHandler;
+ private final String xmppPassword;
+ private final MessageSerializer ms;
+ private final int xmppPort;
+ private final String wsJid;
+
+ private Stream xmpp;
+
+
+ public XMPPConnection(
+ final Environment env, final ExecutorService service, final WebsocketComponent wsHandler, final JdbcTemplate jdbc) {
+ Assert.notNull(env);
+ Assert.notNull(service);
+ Assert.notNull(wsHandler);
+ Assert.notNull(jdbc);
+
this.service = service;
+ this.wsHandler = wsHandler;
+ this.jdbc = jdbc;
+
xmppPassword = env.getProperty("xmpp_password");
+ xmppPort = NumberUtils.toInt(env.getProperty("xmpp_port"), 5347);
+ wsJid = env.getProperty("ws_jid", "ws.juick.local");
+
ms = new MessageSerializer();
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
try {
- Socket socket = new Socket("localhost", NumberUtils.toInt(env.getProperty("xmpp_port", ""), 5347));
- xmpp = new StreamComponent(new JID(env.getProperty("ws_jid", "ws.juick.local")), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ Socket socket = new Socket("localhost", xmppPort);
+ xmpp = new StreamComponent(new JID(wsJid), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
xmpp.addChildParser(new JuickMessage());
xmpp.addListener((Stream.StreamListener) this);
xmpp.addListener((Message.MessageListener) this);
+
service.submit(() -> xmpp.startParsing());
+
+ logger.info("XMPPConnection initialized");
} catch (IOException e) {
- logger.error("XMPPConnection error", e);
+ logger.error("XMPPConnection initialization error", e);
}
}
+ @Override
+ public void destroy() throws Exception {
+ ThreadHelper.shutdownAndAwaitTermination(service);
+
+ logger.info("XMPPConnection destroyed");
+ }
@Override
public void onStreamReady() {
- logger.info("XMPP stream ready");
+ logger.info("XMPP stream ready");
}
@Override
- public void onStreamFail(Exception ex) {
+ public void onStreamFail(final Exception ex) {
logger.error("XMPP stream failed", ex);
}
@Override
- public void onMessage(com.juick.xmpp.Message msg) {
+ public void onMessage(final com.juick.xmpp.Message msg) {
JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
if (jmsg != null) {
logger.info("got jmsg: " + ms.serialize(jmsg).toString());
@@ -95,10 +121,10 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis
MessageSerializer messageSerializer = new MessageSerializer();
- private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ private void onJuickPM(final int uid_to, final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
try {
logger.info("sending pm to " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -109,62 +135,62 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis
}
}
- private void onJuickMessagePost(com.juick.Message jmsg) {
+ private void onJuickMessagePost(final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(jdbc, jmsg.getUser().getUID(), jmsg.getMID())
.stream().map(User::getUID).collect(Collectors.toList());
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c ->
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
(!c.legacy && c.visitor.getUID() == 0) // anonymous users
- || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
+ || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
.forEach(c -> {
- try {
- logger.info("sending message to " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
- wsHandler.clients.stream().filter(c ->
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
c.legacy && c.allMessages) // legacy all posts
.forEach(c -> {
- try {
- logger.info("sending message to legacy client " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
+ try {
+ logger.info("sending message to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
}
}
- private void onJuickMessageReply(com.juick.Message jmsg) {
+ private void onJuickMessageReply(final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
List<Integer> threadUsers =
SubscriptionsQueries.getUsersSubscribedToComments(jdbc, jmsg.getMID(), jmsg.getUser().getUID())
.stream().map(User::getUID).collect(Collectors.toList());
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c ->
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
(!c.legacy && c.visitor.getUID() == 0) // anonymous users
- || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
+ || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
.forEach(c -> {
- try {
- logger.info("sending reply to " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
- wsHandler.clients.stream().filter(c ->
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
(c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies
.forEach(c -> {
- try {
- logger.info("sending reply to legacy client " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
+ try {
+ logger.info("sending reply to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
}
}
}
diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java
new file mode 100644
index 000000000..9e2e4f6f5
--- /dev/null
+++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java
@@ -0,0 +1,54 @@
+package com.juick.ws.configuration;
+
+import com.juick.configuration.DataConfiguration;
+import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.ServletWebSocketHandlerRegistry;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by aalexeev on 11/12/16.
+ */
+@Configuration
+@EnableWebSocket
+@PropertySource("classpath:juick.conf")
+@Import(DataConfiguration.class)
+public class WebsocketAppConfiguration implements WebSocketConfigurer {
+ @Inject
+ private Environment env;
+ @Inject
+ private JdbcTemplate jdbc;
+
+ @Bean
+ public WebsocketComponent wsHandler() {
+ return new WebsocketComponent(jdbc);
+ }
+
+ @Bean
+ public XMPPConnection ws() {
+ return new XMPPConnection(env, service(), wsHandler(), jdbc);
+ }
+
+ @Bean
+ public ExecutorService service() {
+ return Executors.newCachedThreadPool();
+ }
+
+ @Override
+ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ ((ServletWebSocketHandlerRegistry) registry).setOrder(2);
+ registry.addHandler(wsHandler(), "/**").setAllowedOrigins("*");
+ }
+}
diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java
index 4829710f3..d4e797ad2 100644
--- a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java
+++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java
@@ -10,14 +10,15 @@ import javax.servlet.Filter;
* Created by vt on 09/02/16.
*/
public class WebsocketInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {
+
@Override
protected Class<?>[] getRootConfigClasses() {
- return new Class[]{DataConfiguration.class};
+ return new Class<?>[]{WebsocketAppConfiguration.class, DataConfiguration.class};
}
@Override
protected Class<?>[] getServletConfigClasses() {
- return new Class[]{WebsocketConfiguration.class};
+ return new Class<?>[]{WebsocketMvcConfiguration.class};
}
@Override
@@ -31,4 +32,9 @@ public class WebsocketInitializer extends AbstractAnnotationConfigDispatcherServ
characterEncodingFilter.setEncoding("UTF-8");
return new Filter[]{characterEncodingFilter};
}
+
+ @Override
+ protected String getServletName() {
+ return "Web socket dispather servlet";
+ }
}
diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java
index 0a2a91fa6..a8a88dcc0 100644
--- a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
+++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java
@@ -2,71 +2,33 @@ package com.juick.ws.configuration;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
-import com.juick.ws.WebsocketComponent;
-import com.juick.ws.XMPPConnection;
import com.mitchellbosecke.pebble.PebbleEngine;
import com.mitchellbosecke.pebble.loader.Loader;
import com.mitchellbosecke.pebble.loader.ServletLoader;
import com.mitchellbosecke.pebble.spring4.PebbleViewResolver;
import com.mitchellbosecke.pebble.spring4.extension.SpringExtension;
-import org.springframework.beans.factory.config.PlaceholderConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
-import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
-import org.springframework.core.env.Environment;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
-import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.web.servlet.ViewResolver;
import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
-import org.springframework.web.socket.config.annotation.EnableWebSocket;
-import org.springframework.web.socket.config.annotation.ServletWebSocketHandlerRegistry;
-import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
-import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
-import javax.inject.Inject;
-import javax.servlet.ServletContext;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* Created by vitalyster on 28.06.2016.
*/
@Configuration
-@EnableWebSocket
-@ComponentScan(basePackages = {"com.juick"})
-@PropertySource(value = {"classpath:juick.conf", "file:${user.home}/juick.conf"})
-public class WebsocketConfiguration extends WebMvcConfigurationSupport implements WebSocketConfigurer {
- @Inject
- private Environment env;
- @Inject
- private JdbcTemplate jdbc;
- @Inject
- private ServletContext servletContext;
-
- @Bean
- public static PlaceholderConfigurerSupport propertySourcesPlaceholderConfigurer() {
- PlaceholderConfigurerSupport configurer = new PropertySourcesPlaceholderConfigurer();
-
- configurer.setFileEncoding("utf-8");
- configurer.setOrder(1);
- return configurer;
- }
-
- @Bean
- public WebsocketComponent wsHandler() {
- return new WebsocketComponent();
- }
-
+@ComponentScan(basePackages = {"com.juick.ws.controllers"})
+public class WebsocketMvcConfiguration extends WebMvcConfigurationSupport {
@Bean
public Loader templateLoader() {
- return new ServletLoader(servletContext);
+ return new ServletLoader(getServletContext());
}
@Bean
@@ -91,16 +53,6 @@ public class WebsocketConfiguration extends WebMvcConfigurationSupport implement
return viewResolver;
}
- @Bean
- public XMPPConnection ws() {
- return new XMPPConnection(env, service());
- }
-
- @Bean
- public ExecutorService service() {
- return Executors.newCachedThreadPool();
- }
-
@Override
public RequestMappingHandlerMapping requestMappingHandlerMapping() {
RequestMappingHandlerMapping mapping = super.requestMappingHandlerMapping();
@@ -109,12 +61,6 @@ public class WebsocketConfiguration extends WebMvcConfigurationSupport implement
}
@Override
- public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
- ((ServletWebSocketHandlerRegistry) registry).setOrder(2);
- registry.addHandler(wsHandler(), "/**").setAllowedOrigins("*");
- }
-
- @Override
protected void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.setOrder(0);
registry.addResourceHandler("/scripts.js").addResourceLocations("/");
diff --git a/juick-ws/src/main/java/com/juick/ws/ApiController.java b/juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java
index 5203de576..2f57b0b64 100644
--- a/juick-ws/src/main/java/com/juick/ws/ApiController.java
+++ b/juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java
@@ -1,6 +1,7 @@
-package com.juick.ws;
+package com.juick.ws.controllers;
import com.juick.server.helpers.Status;
+import com.juick.ws.WebsocketComponent;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -19,6 +20,6 @@ public class ApiController {
@RequestMapping(value = "/api/status", method = RequestMethod.GET,
produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
public Status status() {
- return new Status(String.valueOf(wsHandler.clients.size()));
+ return new Status(String.valueOf(wsHandler.getClients().size()));
}
}
diff --git a/juick-ws/src/main/java/com/juick/ws/StatusController.java b/juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java
index f7e80106a..2fe55eeb9 100644
--- a/juick-ws/src/main/java/com/juick/ws/StatusController.java
+++ b/juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java
@@ -1,5 +1,7 @@
-package com.juick.ws;
+package com.juick.ws.controllers;
+import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
@@ -22,7 +24,7 @@ public class StatusController {
@RequestMapping(method = RequestMethod.GET, headers = "Connection!=upgrade", value = "/")
public ModelAndView status() {
ModelAndView modelAndView = new ModelAndView();
- modelAndView.addObject("clients", wsHandler.clients.size());
+ modelAndView.addObject("clients", wsHandler.getClients().size());
modelAndView.setViewName("index");
return modelAndView;
}