diff options
Diffstat (limited to 'src/main/java/com/juick/ws/XMPPConnection.java')
-rw-r--r-- | src/main/java/com/juick/ws/XMPPConnection.java | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java new file mode 100644 index 00000000..9ed3d0cd --- /dev/null +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -0,0 +1,142 @@ +package com.juick.ws; + +import com.juick.User; +import com.juick.json.MessageSerializer; +import com.juick.server.SubscriptionsQueries; +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.Stream; +import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.extensions.JuickMessage; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.Socket; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * + * @author ugnich + */ +@Component +public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { + private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); + + JdbcTemplate sql; + Stream xmpp; + String xmppPassword; + MessageSerializer ms; + @Inject + WebsocketComponent ws; + + @Inject + public XMPPConnection(JdbcTemplate sql, String password) { + this.sql = sql; + xmppPassword = password; + ms = new MessageSerializer(); + new Thread(this).start(); + } + + @Override + public void run() { + try { + Socket socket = new Socket("localhost", 5347); + xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp.addChildParser(new JuickMessage()); + xmpp.addListener((Stream.StreamListener) this); + xmpp.addListener((Message.MessageListener) this); + xmpp.startParsing(); + } catch (IOException e) { + logger.log(Level.SEVERE, "XMPPConnection error", e); + } + } + + @Override + public void onStreamReady() { + logger.info("XMPP stream ready"); + } + + @Override + public void onStreamFail(String msg) { + logger.severe("XMPP stream failed: " + msg); + } + + @Override + public void onMessage(com.juick.xmpp.Message msg) { + JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + logger.info("got jmsg: " + ms.serialize(jmsg).toString()); + if (jmsg.getMID() == 0) { + int uid_to = 0; + try { + uid_to = Integer.parseInt(msg.to.Username); + } catch (Exception e) { + } + if (uid_to > 0) { + onJuickPM(uid_to, jmsg); + } + } else if (jmsg.getRID() == 0) { + onJuickMessagePost(jmsg); + } else { + onJuickMessageReply(jmsg); + } + } + } + + MessageSerializer messageSerializer = new MessageSerializer(); + + private void onJuickPM(int uid_to, com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + try { + logger.info("sending pm to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessagePost(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) + .stream().map(User::getUID).collect(Collectors.toList()); + + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending message to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessageReply(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + + List<Integer> threadUsers = + SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) + .stream().map(User::getUID).collect(Collectors.toList()); + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending reply to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } +} |