aboutsummaryrefslogtreecommitdiff
path: root/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'juick-ws/src/main/java/com/juick/ws/XMPPConnection.java')
-rw-r--r--juick-ws/src/main/java/com/juick/ws/XMPPConnection.java152
1 files changed, 89 insertions, 63 deletions
diff --git a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
index 9e793a44..04ea5378 100644
--- a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
+++ b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
@@ -4,6 +4,7 @@ import com.juick.User;
import com.juick.json.MessageSerializer;
import com.juick.server.MessagesQueries;
import com.juick.server.SubscriptionsQueries;
+import com.juick.util.ThreadHelper;
import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
import com.juick.xmpp.Stream;
@@ -12,12 +13,13 @@ import com.juick.xmpp.extensions.JuickMessage;
import org.apache.commons.lang3.math.NumberUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.env.Environment;
import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
import org.springframework.web.socket.TextMessage;
-import javax.inject.Inject;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
@@ -25,52 +27,76 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/**
- *
* @author ugnich
*/
-@Component
-public class XMPPConnection implements Stream.StreamListener, Message.MessageListener {
+public class XMPPConnection implements InitializingBean, DisposableBean, Stream.StreamListener, Message.MessageListener {
private static final Logger logger = LoggerFactory.getLogger(XMPPConnection.class);
- @Inject
- JdbcTemplate jdbc;
- ExecutorService service;
- Stream xmpp;
- String xmppPassword;
- MessageSerializer ms;
- @Inject
- WebsocketComponent wsHandler;
-
- @Inject
- public XMPPConnection(Environment env, ExecutorService service) {
+ private final JdbcTemplate jdbc;
+ private final ExecutorService service;
+ private final WebsocketComponent wsHandler;
+ private final String xmppPassword;
+ private final MessageSerializer ms;
+ private final int xmppPort;
+ private final String wsJid;
+
+ private Stream xmpp;
+
+
+ public XMPPConnection(
+ final Environment env, final ExecutorService service, final WebsocketComponent wsHandler, final JdbcTemplate jdbc) {
+ Assert.notNull(env);
+ Assert.notNull(service);
+ Assert.notNull(wsHandler);
+ Assert.notNull(jdbc);
+
this.service = service;
+ this.wsHandler = wsHandler;
+ this.jdbc = jdbc;
+
xmppPassword = env.getProperty("xmpp_password");
+ xmppPort = NumberUtils.toInt(env.getProperty("xmpp_port"), 5347);
+ wsJid = env.getProperty("ws_jid", "ws.juick.local");
+
ms = new MessageSerializer();
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
try {
- Socket socket = new Socket("localhost", NumberUtils.toInt(env.getProperty("xmpp_port", ""), 5347));
- xmpp = new StreamComponent(new JID(env.getProperty("ws_jid", "ws.juick.local")), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ Socket socket = new Socket("localhost", xmppPort);
+ xmpp = new StreamComponent(new JID(wsJid), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
xmpp.addChildParser(new JuickMessage());
xmpp.addListener((Stream.StreamListener) this);
xmpp.addListener((Message.MessageListener) this);
+
service.submit(() -> xmpp.startParsing());
+
+ logger.info("XMPPConnection initialized");
} catch (IOException e) {
- logger.error("XMPPConnection error", e);
+ logger.error("XMPPConnection initialization error", e);
}
}
+ @Override
+ public void destroy() throws Exception {
+ ThreadHelper.shutdownAndAwaitTermination(service);
+
+ logger.info("XMPPConnection destroyed");
+ }
@Override
public void onStreamReady() {
- logger.info("XMPP stream ready");
+ logger.info("XMPP stream ready");
}
@Override
- public void onStreamFail(Exception ex) {
+ public void onStreamFail(final Exception ex) {
logger.error("XMPP stream failed", ex);
}
@Override
- public void onMessage(com.juick.xmpp.Message msg) {
+ public void onMessage(final com.juick.xmpp.Message msg) {
JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
if (jmsg != null) {
logger.info("got jmsg: " + ms.serialize(jmsg).toString());
@@ -95,10 +121,10 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis
MessageSerializer messageSerializer = new MessageSerializer();
- private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ private void onJuickPM(final int uid_to, final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
try {
logger.info("sending pm to " + c.visitor.getUID());
c.session.sendMessage(new TextMessage(json));
@@ -109,62 +135,62 @@ public class XMPPConnection implements Stream.StreamListener, Message.MessageLis
}
}
- private void onJuickMessagePost(com.juick.Message jmsg) {
+ private void onJuickMessagePost(final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(jdbc, jmsg.getUser().getUID(), jmsg.getMID())
.stream().map(User::getUID).collect(Collectors.toList());
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c ->
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
(!c.legacy && c.visitor.getUID() == 0) // anonymous users
- || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
+ || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
.forEach(c -> {
- try {
- logger.info("sending message to " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
- wsHandler.clients.stream().filter(c ->
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
c.legacy && c.allMessages) // legacy all posts
.forEach(c -> {
- try {
- logger.info("sending message to legacy client " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
+ try {
+ logger.info("sending message to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
}
}
- private void onJuickMessageReply(com.juick.Message jmsg) {
+ private void onJuickMessageReply(final com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
List<Integer> threadUsers =
SubscriptionsQueries.getUsersSubscribedToComments(jdbc, jmsg.getMID(), jmsg.getUser().getUID())
.stream().map(User::getUID).collect(Collectors.toList());
- synchronized (wsHandler.clients) {
- wsHandler.clients.stream().filter(c ->
+ synchronized (wsHandler.getClients()) {
+ wsHandler.getClients().stream().filter(c ->
(!c.legacy && c.visitor.getUID() == 0) // anonymous users
- || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
+ || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
.forEach(c -> {
- try {
- logger.info("sending reply to " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
- wsHandler.clients.stream().filter(c ->
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
+ wsHandler.getClients().stream().filter(c ->
(c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies
.forEach(c -> {
- try {
- logger.info("sending reply to legacy client " + c.visitor.getUID());
- c.session.sendMessage(new TextMessage(json));
- } catch (IOException e) {
- logger.warn("ws error", e);
- }
- });
+ try {
+ logger.info("sending reply to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.warn("ws error", e);
+ }
+ });
}
}
}