aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/com/juick/ws/XMPPConnection.java33
-rw-r--r--src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java14
2 files changed, 25 insertions, 22 deletions
diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java
index ae70d956..d5cb4eba 100644
--- a/src/main/java/com/juick/ws/XMPPConnection.java
+++ b/src/main/java/com/juick/ws/XMPPConnection.java
@@ -17,8 +17,6 @@ import javax.inject.Inject;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -30,21 +28,18 @@ import java.util.stream.Collectors;
@Component
public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
-
+ @Inject
JdbcTemplate sql;
Stream xmpp;
String xmppPassword;
MessageSerializer ms;
- WebsocketComponent ws;
- ExecutorService xmppThreadService = Executors.newSingleThreadExecutor();
+ WebsocketComponent wsHandler;
@Inject
- public XMPPConnection(Environment env, WebsocketComponent ws, JdbcTemplate sql) {
- this.sql = sql;
- this.ws = ws;
+ public XMPPConnection(Environment env, WebsocketComponent wsHandler) {
+ this.wsHandler = wsHandler;
xmppPassword = env.getProperty("xmpp_password");
ms = new MessageSerializer();
- xmppThreadService.submit(this);
}
@Override
@@ -97,8 +92,8 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
private void onJuickPM(int uid_to, com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- synchronized (ws.clients) {
- ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
try {
logger.info("sending pm to " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -114,10 +109,10 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
.stream().map(User::getUID).collect(Collectors.toList());
logger.info(String.format("%d users subscribed to %d", uids.size(), jmsg.getUser().getUID()));
- synchronized (ws.clients) {
- Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allMessages).count();
+ synchronized (wsHandler.clients) {
+ Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).count();
logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID()));
- ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> {
+ wsHandler.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> {
try {
logger.info("sending message to " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -125,7 +120,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
logger.log(Level.WARNING, "ws error", e);
}
});
- ws.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> {
+ wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> {
try {
logger.info("sending message to legacy client " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -143,10 +138,10 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
.stream().map(User::getUID).collect(Collectors.toList());
logger.info(String.format("%d users subscribed to %d", threadUsers.size(), jmsg.getMID()));
- synchronized (ws.clients) {
- Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allReplies).count();
+ synchronized (wsHandler.clients) {
+ Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allReplies).count();
logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID()));
- ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> {
+ wsHandler.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> {
try {
logger.info("sending reply to " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -154,7 +149,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
logger.log(Level.WARNING, "ws error", e);
}
});
- ws.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> {
+ wsHandler.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> {
try {
logger.info("sending reply to legacy client " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
index 0bb293f4..223f8d63 100644
--- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
+++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
@@ -1,6 +1,7 @@
package com.juick.ws.configuration;
import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@@ -10,12 +11,13 @@ import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
-import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Created by vitalyster on 28.06.2016.
@@ -28,12 +30,18 @@ import javax.inject.Inject;
public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
@Inject
Environment env;
-
+ ExecutorService xmppThread = Executors.newSingleThreadExecutor();
@Bean
- WebSocketHandler wsHandler() {
+ WebsocketComponent wsHandler() {
return new WebsocketComponent();
}
@Bean
+ XMPPConnection xmpp() {
+ XMPPConnection xmpp = new XMPPConnection(env, wsHandler());
+ xmppThread.submit(xmpp);
+ return xmpp;
+ }
+ @Bean
JdbcTemplate jdbc() {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver"));