aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/com/juick/ws/XMPPConnection.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/com/juick/ws/XMPPConnection.java')
-rw-r--r--src/main/java/com/juick/ws/XMPPConnection.java142
1 files changed, 142 insertions, 0 deletions
diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java
new file mode 100644
index 00000000..9ed3d0cd
--- /dev/null
+++ b/src/main/java/com/juick/ws/XMPPConnection.java
@@ -0,0 +1,142 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.json.MessageSerializer;
+import com.juick.server.SubscriptionsQueries;
+import com.juick.xmpp.JID;
+import com.juick.xmpp.Message;
+import com.juick.xmpp.Stream;
+import com.juick.xmpp.StreamComponent;
+import com.juick.xmpp.extensions.JuickMessage;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * @author ugnich
+ */
+@Component
+public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
+ private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
+
+ JdbcTemplate sql;
+ Stream xmpp;
+ String xmppPassword;
+ MessageSerializer ms;
+ @Inject
+ WebsocketComponent ws;
+
+ @Inject
+ public XMPPConnection(JdbcTemplate sql, String password) {
+ this.sql = sql;
+ xmppPassword = password;
+ ms = new MessageSerializer();
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Socket socket = new Socket("localhost", 5347);
+ xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp.addChildParser(new JuickMessage());
+ xmpp.addListener((Stream.StreamListener) this);
+ xmpp.addListener((Message.MessageListener) this);
+ xmpp.startParsing();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "XMPPConnection error", e);
+ }
+ }
+
+ @Override
+ public void onStreamReady() {
+ logger.info("XMPP stream ready");
+ }
+
+ @Override
+ public void onStreamFail(String msg) {
+ logger.severe("XMPP stream failed: " + msg);
+ }
+
+ @Override
+ public void onMessage(com.juick.xmpp.Message msg) {
+ JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ logger.info("got jmsg: " + ms.serialize(jmsg).toString());
+ if (jmsg.getMID() == 0) {
+ int uid_to = 0;
+ try {
+ uid_to = Integer.parseInt(msg.to.Username);
+ } catch (Exception e) {
+ }
+ if (uid_to > 0) {
+ onJuickPM(uid_to, jmsg);
+ }
+ } else if (jmsg.getRID() == 0) {
+ onJuickMessagePost(jmsg);
+ } else {
+ onJuickMessageReply(jmsg);
+ }
+ }
+ }
+
+ MessageSerializer messageSerializer = new MessageSerializer();
+
+ private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ try {
+ logger.info("sending pm to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessagePost(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessageReply(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+
+ List<Integer> threadUsers =
+ SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+}