aboutsummaryrefslogtreecommitdiff
path: root/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2016-07-07 15:52:04 +0300
committerGravatar Vitaly Takmazov2016-07-07 15:52:04 +0300
commitde4e52d1004f3a68bd8a863d3a507dc252f20cea (patch)
tree1ab5decb99da15c98ccba504fa54345ab336b1cc /juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
parent1def88c0685785aef858f72a1dabd5f44a4ba3e2 (diff)
reorganize project, part 3
Diffstat (limited to 'juick-ws/src/main/java/com/juick/ws/XMPPConnection.java')
-rw-r--r--juick-ws/src/main/java/com/juick/ws/XMPPConnection.java165
1 files changed, 165 insertions, 0 deletions
diff --git a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
new file mode 100644
index 00000000..4a80eec5
--- /dev/null
+++ b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java
@@ -0,0 +1,165 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.json.MessageSerializer;
+import com.juick.server.SubscriptionsQueries;
+import com.juick.xmpp.JID;
+import com.juick.xmpp.Message;
+import com.juick.xmpp.Stream;
+import com.juick.xmpp.StreamComponent;
+import com.juick.xmpp.extensions.JuickMessage;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * @author ugnich
+ */
+@Component
+public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
+ private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
+ @Inject
+ JdbcTemplate sql;
+ Stream xmpp;
+ String xmppPassword;
+ MessageSerializer ms;
+ WebsocketComponent wsHandler;
+
+ @Inject
+ public XMPPConnection(Environment env, WebsocketComponent wsHandler) {
+ this.wsHandler = wsHandler;
+ xmppPassword = env.getProperty("xmpp_password");
+ ms = new MessageSerializer();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Socket socket = new Socket("localhost", 5347);
+ xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp.addChildParser(new JuickMessage());
+ xmpp.addListener((Stream.StreamListener) this);
+ xmpp.addListener((Message.MessageListener) this);
+ xmpp.startParsing();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "XMPPConnection error", e);
+ }
+ }
+
+ @Override
+ public void onStreamReady() {
+ logger.info("XMPP stream ready");
+ }
+
+ @Override
+ public void onStreamFail(Exception ex) {
+ logger.log(Level.SEVERE, "XMPP stream failed", ex);
+ }
+
+ @Override
+ public void onMessage(com.juick.xmpp.Message msg) {
+ JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ logger.info("got jmsg: " + ms.serialize(jmsg).toString());
+ if (jmsg.getMID() == 0) {
+ int uid_to = 0;
+ try {
+ uid_to = Integer.parseInt(msg.to.Username);
+ } catch (Exception e) {
+ }
+ if (uid_to > 0) {
+ onJuickPM(uid_to, jmsg);
+ }
+ } else if (jmsg.getRID() == 0) {
+ onJuickMessagePost(jmsg);
+ } else {
+ onJuickMessageReply(jmsg);
+ }
+ }
+ }
+
+ MessageSerializer messageSerializer = new MessageSerializer();
+
+ private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ try {
+ logger.info("sending pm to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessagePost(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c ->
+ (!c.legacy && c.visitor.getUID() == 0) // anonymous users
+ || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ wsHandler.clients.stream().filter(c ->
+ c.legacy && c.allMessages) // legacy all posts
+ .forEach(c -> {
+ try {
+ logger.info("sending message to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessageReply(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> threadUsers =
+ SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c ->
+ (!c.legacy && c.visitor.getUID() == 0) // anonymous users
+ || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ wsHandler.clients.stream().filter(c ->
+ (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies
+ .forEach(c -> {
+ try {
+ logger.info("sending reply to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+}