From de4e52d1004f3a68bd8a863d3a507dc252f20cea Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 7 Jul 2016 15:52:04 +0300 Subject: reorganize project, part 3 --- .../src/main/java/com/juick/ws/XMPPConnection.java | 165 +++++++++++++++++++++ 1 file changed, 165 insertions(+) create mode 100644 juick-ws/src/main/java/com/juick/ws/XMPPConnection.java (limited to 'juick-ws/src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java new file mode 100644 index 00000000..4a80eec5 --- /dev/null +++ b/juick-ws/src/main/java/com/juick/ws/XMPPConnection.java @@ -0,0 +1,165 @@ +package com.juick.ws; + +import com.juick.User; +import com.juick.json.MessageSerializer; +import com.juick.server.SubscriptionsQueries; +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.Stream; +import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.extensions.JuickMessage; +import org.springframework.core.env.Environment; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.Socket; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * + * @author ugnich + */ +@Component +public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { + private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); + @Inject + JdbcTemplate sql; + Stream xmpp; + String xmppPassword; + MessageSerializer ms; + WebsocketComponent wsHandler; + + @Inject + public XMPPConnection(Environment env, WebsocketComponent wsHandler) { + this.wsHandler = wsHandler; + xmppPassword = env.getProperty("xmpp_password"); + ms = new MessageSerializer(); + } + + @Override + public void run() { + try { + Socket socket = new Socket("localhost", 5347); + xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp.addChildParser(new JuickMessage()); + xmpp.addListener((Stream.StreamListener) this); + xmpp.addListener((Message.MessageListener) this); + xmpp.startParsing(); + } catch (IOException e) { + logger.log(Level.SEVERE, "XMPPConnection error", e); + } + } + + @Override + public void onStreamReady() { + logger.info("XMPP stream ready"); + } + + @Override + public void onStreamFail(Exception ex) { + logger.log(Level.SEVERE, "XMPP stream failed", ex); + } + + @Override + public void onMessage(com.juick.xmpp.Message msg) { + JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + logger.info("got jmsg: " + ms.serialize(jmsg).toString()); + if (jmsg.getMID() == 0) { + int uid_to = 0; + try { + uid_to = Integer.parseInt(msg.to.Username); + } catch (Exception e) { + } + if (uid_to > 0) { + onJuickPM(uid_to, jmsg); + } + } else if (jmsg.getRID() == 0) { + onJuickMessagePost(jmsg); + } else { + onJuickMessageReply(jmsg); + } + } + } + + MessageSerializer messageSerializer = new MessageSerializer(); + + private void onJuickPM(int uid_to, com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + synchronized (wsHandler.clients) { + wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + try { + logger.info("sending pm to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessagePost(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) + .stream().map(User::getUID).collect(Collectors.toList()); + synchronized (wsHandler.clients) { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { + try { + logger.info("sending message to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + wsHandler.clients.stream().filter(c -> + c.legacy && c.allMessages) // legacy all posts + .forEach(c -> { + try { + logger.info("sending message to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessageReply(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + List threadUsers = + SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) + .stream().map(User::getUID).collect(Collectors.toList()); + synchronized (wsHandler.clients) { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { + try { + logger.info("sending reply to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + wsHandler.clients.stream().filter(c -> + (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies + .forEach(c -> { + try { + logger.info("sending reply to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } +} -- cgit v1.2.3