diff options
Diffstat (limited to 'juick-ws')
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java | 20 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/XMPPConnection.java | 152 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java | 54 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java | 10 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java (renamed from juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java) | 60 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java (renamed from juick-ws/src/main/java/com/juick/ws/ApiController.java) | 5 | ||||
-rw-r--r-- | juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java (renamed from juick-ws/src/main/java/com/juick/ws/StatusController.java) | 6 |
7 files changed, 176 insertions, 131 deletions
diff --git a/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java b/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java index 2ab3a94c..4ce230a4 100644 --- a/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/juick-ws/src/main/java/com/juick/ws/WebsocketComponent.java @@ -9,6 +9,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.util.Assert; import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.WebSocketSession; import org.springframework.web.socket.handler.TextWebSocketHandler; @@ -24,13 +25,17 @@ import java.util.List; * Created by vitalyster on 28.06.2016. */ public class WebsocketComponent extends TextWebSocketHandler { + private static final Logger logger = LoggerFactory.getLogger(WebsocketComponent.class); - @Inject - JdbcTemplate jdbc; + private JdbcTemplate jdbc; - private static final Logger logger = LoggerFactory.getLogger(WebsocketComponent.class); + private final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>()); - final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>()); + + public WebsocketComponent(JdbcTemplate jdbc) { + Assert.notNull(jdbc); + this.jdbc = jdbc; + } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { @@ -102,6 +107,7 @@ public class WebsocketComponent extends TextWebSocketHandler { } } } + @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { synchronized (clients) { @@ -116,8 +122,12 @@ public class WebsocketComponent extends TextWebSocketHandler { } } - class SocketSubscribed { + public List<SocketSubscribed> getClients() { + return clients; + } + + class SocketSubscribed { WebSocketSession session; String clientName; User visitor; diff --git a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java index 9e793a44..04ea5378 100644 --- a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java +++ b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java @@ -4,6 +4,7 @@ import com.juick.User; import com.juick.json.MessageSerializer; import com.juick.server.MessagesQueries; import com.juick.server.SubscriptionsQueries; +import com.juick.util.ThreadHelper; import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.Stream; @@ -12,12 +13,13 @@ import com.juick.xmpp.extensions.JuickMessage; import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.core.env.Environment; import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.stereotype.Component; +import org.springframework.util.Assert; import org.springframework.web.socket.TextMessage; -import javax.inject.Inject; import java.io.IOException; import java.net.Socket; import java.util.List; @@ -25,52 +27,76 @@ import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; /** - * * @author ugnich */ -@Component -public class XMPPConnection implements Stream.StreamListener, Message.MessageListener { +public class XMPPConnection implements InitializingBean, DisposableBean, Stream.StreamListener, Message.MessageListener { private static final Logger logger = LoggerFactory.getLogger(XMPPConnection.class); - @Inject - JdbcTemplate jdbc; - ExecutorService service; - Stream xmpp; - String xmppPassword; - MessageSerializer ms; - @Inject - WebsocketComponent wsHandler; - - @Inject - public XMPPConnection(Environment env, ExecutorService service) { + private final JdbcTemplate jdbc; + private final ExecutorService service; + private final WebsocketComponent wsHandler; + private final String xmppPassword; + private final MessageSerializer ms; + private final int xmppPort; + private final String wsJid; + + private Stream xmpp; + + + public XMPPConnection( + final Environment env, final ExecutorService service, final WebsocketComponent wsHandler, final JdbcTemplate jdbc) { + Assert.notNull(env); + Assert.notNull(service); + Assert.notNull(wsHandler); + Assert.notNull(jdbc); + this.service = service; + this.wsHandler = wsHandler; + this.jdbc = jdbc; + xmppPassword = env.getProperty("xmpp_password"); + xmppPort = NumberUtils.toInt(env.getProperty("xmpp_port"), 5347); + wsJid = env.getProperty("ws_jid", "ws.juick.local"); + ms = new MessageSerializer(); + } + + @Override + public void afterPropertiesSet() throws Exception { try { - Socket socket = new Socket("localhost", NumberUtils.toInt(env.getProperty("xmpp_port", ""), 5347)); - xmpp = new StreamComponent(new JID(env.getProperty("ws_jid", "ws.juick.local")), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + Socket socket = new Socket("localhost", xmppPort); + xmpp = new StreamComponent(new JID(wsJid), socket.getInputStream(), socket.getOutputStream(), xmppPassword); xmpp.addChildParser(new JuickMessage()); xmpp.addListener((Stream.StreamListener) this); xmpp.addListener((Message.MessageListener) this); + service.submit(() -> xmpp.startParsing()); + + logger.info("XMPPConnection initialized"); } catch (IOException e) { - logger.error("XMPPConnection error", e); + logger.error("XMPPConnection initialization error", e); } } + @Override + public void destroy() throws Exception { + ThreadHelper.shutdownAndAwaitTermination(service); + + logger.info("XMPPConnection destroyed"); + } @Override public void onStreamReady() { - logger.info("XMPP stream ready"); + logger.info("XMPP stream ready"); } @Override - public void onStreamFail(Exception ex) { + public void onStreamFail(final Exception ex) { logger.error("XMPP stream failed", ex); } @Override - public void onMessage(com.juick.xmpp.Message msg) { + public void onMessage(final com.juick.xmpp.Message msg) { JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); if (jmsg != null) { logger.info("got jmsg: " + ms.serialize(jmsg).toString()); @@ -95,10 +121,10 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis MessageSerializer messageSerializer = new MessageSerializer(); - private void onJuickPM(int uid_to, com.juick.Message jmsg) { + private void onJuickPM(final int uid_to, final com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - synchronized (wsHandler.clients) { - wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { try { logger.info("sending pm to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -109,62 +135,62 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis } } - private void onJuickMessagePost(com.juick.Message jmsg) { + private void onJuickMessagePost(final com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(jdbc, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); - synchronized (wsHandler.clients) { - wsHandler.clients.stream().filter(c -> + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> (!c.legacy && c.visitor.getUID() == 0) // anonymous users - || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions + || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions .forEach(c -> { - try { - logger.info("sending message to " + c.visitor.getUID()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); - wsHandler.clients.stream().filter(c -> + try { + logger.info("sending message to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> c.legacy && c.allMessages) // legacy all posts .forEach(c -> { - try { - logger.info("sending message to legacy client " + c.visitor.getUID()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); + try { + logger.info("sending message to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); } } - private void onJuickMessageReply(com.juick.Message jmsg) { + private void onJuickMessageReply(final com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); List<Integer> threadUsers = SubscriptionsQueries.getUsersSubscribedToComments(jdbc, jmsg.getMID(), jmsg.getUser().getUID()) .stream().map(User::getUID).collect(Collectors.toList()); - synchronized (wsHandler.clients) { - wsHandler.clients.stream().filter(c -> + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> (!c.legacy && c.visitor.getUID() == 0) // anonymous users - || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions + || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions .forEach(c -> { - try { - logger.info("sending reply to " + c.visitor.getUID()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); - wsHandler.clients.stream().filter(c -> + try { + logger.info("sending reply to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies .forEach(c -> { - try { - logger.info("sending reply to legacy client " + c.visitor.getUID()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); + try { + logger.info("sending reply to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); } } } diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java new file mode 100644 index 00000000..9e2e4f6f --- /dev/null +++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketAppConfiguration.java @@ -0,0 +1,54 @@ +package com.juick.ws.configuration; + +import com.juick.configuration.DataConfiguration; +import com.juick.ws.WebsocketComponent; +import com.juick.ws.XMPPConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.PropertySource; +import org.springframework.core.env.Environment; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.ServletWebSocketHandlerRegistry; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * Created by aalexeev on 11/12/16. + */ +@Configuration +@EnableWebSocket +@PropertySource("classpath:juick.conf") +@Import(DataConfiguration.class) +public class WebsocketAppConfiguration implements WebSocketConfigurer { + @Inject + private Environment env; + @Inject + private JdbcTemplate jdbc; + + @Bean + public WebsocketComponent wsHandler() { + return new WebsocketComponent(jdbc); + } + + @Bean + public XMPPConnection ws() { + return new XMPPConnection(env, service(), wsHandler(), jdbc); + } + + @Bean + public ExecutorService service() { + return Executors.newCachedThreadPool(); + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + ((ServletWebSocketHandlerRegistry) registry).setOrder(2); + registry.addHandler(wsHandler(), "/**").setAllowedOrigins("*"); + } +} diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java index 4829710f..d4e797ad 100644 --- a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java +++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java @@ -10,14 +10,15 @@ import javax.servlet.Filter; * Created by vt on 09/02/16. */ public class WebsocketInitializer extends AbstractAnnotationConfigDispatcherServletInitializer { + @Override protected Class<?>[] getRootConfigClasses() { - return new Class[]{DataConfiguration.class}; + return new Class<?>[]{WebsocketAppConfiguration.class, DataConfiguration.class}; } @Override protected Class<?>[] getServletConfigClasses() { - return new Class[]{WebsocketConfiguration.class}; + return new Class<?>[]{WebsocketMvcConfiguration.class}; } @Override @@ -31,4 +32,9 @@ public class WebsocketInitializer extends AbstractAnnotationConfigDispatcherServ characterEncodingFilter.setEncoding("UTF-8"); return new Filter[]{characterEncodingFilter}; } + + @Override + protected String getServletName() { + return "Web socket dispather servlet"; + } } diff --git a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java index 0a2a91fa..a8a88dcc 100644 --- a/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/juick-ws/src/main/java/com/juick/ws/configuration/WebsocketMvcConfiguration.java @@ -2,71 +2,33 @@ package com.juick.ws.configuration; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; -import com.juick.ws.WebsocketComponent; -import com.juick.ws.XMPPConnection; import com.mitchellbosecke.pebble.PebbleEngine; import com.mitchellbosecke.pebble.loader.Loader; import com.mitchellbosecke.pebble.loader.ServletLoader; import com.mitchellbosecke.pebble.spring4.PebbleViewResolver; import com.mitchellbosecke.pebble.spring4.extension.SpringExtension; -import org.springframework.beans.factory.config.PlaceholderConfigurerSupport; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.PropertySource; -import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; -import org.springframework.core.env.Environment; import org.springframework.http.converter.HttpMessageConverter; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.servlet.ViewResolver; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport; import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.ServletWebSocketHandlerRegistry; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; -import javax.inject.Inject; -import javax.servlet.ServletContext; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * Created by vitalyster on 28.06.2016. */ @Configuration -@EnableWebSocket -@ComponentScan(basePackages = {"com.juick"}) -@PropertySource(value = {"classpath:juick.conf", "file:${user.home}/juick.conf"}) -public class WebsocketConfiguration extends WebMvcConfigurationSupport implements WebSocketConfigurer { - @Inject - private Environment env; - @Inject - private JdbcTemplate jdbc; - @Inject - private ServletContext servletContext; - - @Bean - public static PlaceholderConfigurerSupport propertySourcesPlaceholderConfigurer() { - PlaceholderConfigurerSupport configurer = new PropertySourcesPlaceholderConfigurer(); - - configurer.setFileEncoding("utf-8"); - configurer.setOrder(1); - return configurer; - } - - @Bean - public WebsocketComponent wsHandler() { - return new WebsocketComponent(); - } - +@ComponentScan(basePackages = {"com.juick.ws.controllers"}) +public class WebsocketMvcConfiguration extends WebMvcConfigurationSupport { @Bean public Loader templateLoader() { - return new ServletLoader(servletContext); + return new ServletLoader(getServletContext()); } @Bean @@ -91,16 +53,6 @@ public class WebsocketConfiguration extends WebMvcConfigurationSupport implement return viewResolver; } - @Bean - public XMPPConnection ws() { - return new XMPPConnection(env, service()); - } - - @Bean - public ExecutorService service() { - return Executors.newCachedThreadPool(); - } - @Override public RequestMappingHandlerMapping requestMappingHandlerMapping() { RequestMappingHandlerMapping mapping = super.requestMappingHandlerMapping(); @@ -109,12 +61,6 @@ public class WebsocketConfiguration extends WebMvcConfigurationSupport implement } @Override - public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - ((ServletWebSocketHandlerRegistry) registry).setOrder(2); - registry.addHandler(wsHandler(), "/**").setAllowedOrigins("*"); - } - - @Override protected void addResourceHandlers(ResourceHandlerRegistry registry) { registry.setOrder(0); registry.addResourceHandler("/scripts.js").addResourceLocations("/"); diff --git a/juick-ws/src/main/java/com/juick/ws/ApiController.java b/juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java index 5203de57..2f57b0b6 100644 --- a/juick-ws/src/main/java/com/juick/ws/ApiController.java +++ b/juick-ws/src/main/java/com/juick/ws/controllers/ApiController.java @@ -1,6 +1,7 @@ -package com.juick.ws; +package com.juick.ws.controllers; import com.juick.server.helpers.Status; +import com.juick.ws.WebsocketComponent; import org.springframework.http.MediaType; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -19,6 +20,6 @@ public class ApiController { @RequestMapping(value = "/api/status", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public Status status() { - return new Status(String.valueOf(wsHandler.clients.size())); + return new Status(String.valueOf(wsHandler.getClients().size())); } } diff --git a/juick-ws/src/main/java/com/juick/ws/StatusController.java b/juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java index f7e80106..2fe55eeb 100644 --- a/juick-ws/src/main/java/com/juick/ws/StatusController.java +++ b/juick-ws/src/main/java/com/juick/ws/controllers/StatusController.java @@ -1,5 +1,7 @@ -package com.juick.ws; +package com.juick.ws.controllers; +import com.juick.ws.WebsocketComponent; +import com.juick.ws.XMPPConnection; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -22,7 +24,7 @@ public class StatusController { @RequestMapping(method = RequestMethod.GET, headers = "Connection!=upgrade", value = "/") public ModelAndView status() { ModelAndView modelAndView = new ModelAndView(); - modelAndView.addObject("clients", wsHandler.clients.size()); + modelAndView.addObject("clients", wsHandler.getClients().size()); modelAndView.setViewName("index"); return modelAndView; } |