package com.juick.jabber.ws;

import com.juick.User;
import com.juick.json.MessageSerializer;
import com.juick.server.MessagesQueries;
import com.juick.server.SubscriptionsQueries;
import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
import com.juick.xmpp.Stream;
import com.juick.xmpp.StreamComponent;
import com.juick.xmpp.extensions.JuickMessage;
import org.springframework.jdbc.core.JdbcTemplate;

import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
 * Connects to a local XMPP server as the component {@code ws.juick.com},
 * listens for incoming Juick messages and forwards each one, serialized as
 * JSON inside a WebSocket text frame, to the matching clients registered in
 * {@code Main.clients}.
 *
 * @author ugnich
 */
public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {

    JdbcTemplate sql;
    Stream xmpp;
    String xmppPassword;
    MessageSerializer messageSerializer = new MessageSerializer();

    /**
     * @param sql      JDBC template passed to the subscription and privacy queries
     * @param password password handed to the XMPP component stream
     */
    public XMPPConnection(JdbcTemplate sql, String password) {
        this.sql = sql;
        xmppPassword = password;
    }

    /**
     * Opens a socket to {@code localhost:5347}, builds the component stream on
     * top of it, registers this object as stream and message listener and
     * starts parsing. I/O failures are reported on {@code System.err}.
     */
    @Override
    public void run() {
        Socket socket = null;
        try {
            socket = new Socket("localhost", 5347);
            xmpp = new StreamComponent(new JID("", "ws.juick.com", ""),
                    socket.getInputStream(), socket.getOutputStream(), xmppPassword);
            xmpp.addChildParser(new JuickMessage());
            // The casts pick the listener type explicitly, as this class implements both.
            xmpp.addListener((Stream.StreamListener) this);
            xmpp.addListener((Message.MessageListener) this);
            xmpp.startParsing();
        } catch (IOException e) {
            System.err.println("XMPPConnection: " + e);
            // Only the failure path releases the socket; on success the stream
            // keeps using it (whether startParsing() blocks is not visible here).
            closeQuietly(socket);
        }
    }

    @Override
    public void onStreamReady() {
        System.err.println("Stream ready");
    }

    @Override
    public void onStreamFail(String msg) {
        System.err.println("Stream failed: " + msg);
    }

    /**
     * Dispatches an incoming stanza carrying a Juick payload: MID == 0 is
     * treated as a private message, RID == 0 as a new post, anything else as
     * a reply. Stanzas without a Juick child are ignored.
     */
    @Override
    public void onMessage(com.juick.xmpp.Message msg) {
        JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
        if (jmsg == null) {
            return;
        }
        System.err.println("MID=" + jmsg.getMID() + "; RID=" + jmsg.getRID());
        if (jmsg.getMID() == 0) {
            int uidTo = parseRecipientUid(msg);
            if (uidTo > 0) {
                onJuickPM(uidTo, jmsg);
            }
        } else if (jmsg.getRID() == 0) {
            onJuickMessagePost(jmsg);
        } else {
            onJuickMessageReply(jmsg);
        }
    }

    /**
     * Reads the numeric user id from the username part of the stanza's
     * recipient address.
     *
     * @return the parsed id, or 0 when the address is missing or not a number
     */
    private static int parseRecipientUid(com.juick.xmpp.Message msg) {
        if (msg.to == null || msg.to.Username == null) {
            return 0;
        }
        try {
            return Integer.parseInt(msg.to.Username);
        } catch (NumberFormatException ignored) {
            // A non-numeric recipient is not a user id; the caller skips on 0.
            return 0;
        }
    }

    /** Sends a private message to clients viewing as {@code uidTo} with no other subscription set. */
    private void onJuickPM(int uidTo, com.juick.Message jmsg) {
        ByteBuffer frame = buildTextFrame(messageSerializer.serialize(jmsg).toString());
        broadcast(frame, s -> s.VUID == uidTo && s.MID == 0 && !s.allMessages && !s.allReplies);
    }

    /**
     * Sends a new post to clients that are not bound to a thread or to the
     * all-replies feed and that either follow the public feed / the author
     * (non-negative privacy only) or are among the subscribed users.
     */
    private void onJuickMessagePost(com.juick.Message jmsg) {
        ByteBuffer frame = buildTextFrame(messageSerializer.serialize(jmsg).toString());
        // Resolved before taking the client lock so the query does not run under it.
        List<Integer> uids = SubscriptionsQueries
                .getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
                .stream().map(User::getUID).collect(Collectors.toList());
        broadcast(frame, s -> s.MID == 0 && !s.allReplies
                && ((jmsg.Privacy >= 0 && (s.allMessages || s.UID == jmsg.getUser().getUID()))
                || uids.contains(s.VUID)));
    }

    /**
     * Sends a reply to clients watching its thread and, when the thread's
     * privacy is non-negative, to clients on the all-replies feed.
     */
    private void onJuickMessageReply(com.juick.Message jmsg) {
        ByteBuffer frame = buildTextFrame(messageSerializer.serialize(jmsg).toString());
        int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.getMID());
        broadcast(frame, s -> (privacy >= 0 && s.allReplies) || s.MID == jmsg.getMID());
    }

    /**
     * Offers {@code frame} to every registered client accepted by
     * {@code wanted}, removing clients whose send reports failure. Runs under
     * the {@code Main.clients} monitor.
     */
    private static void broadcast(ByteBuffer frame, Predicate<SocketSubscribed> wanted) {
        synchronized (Main.clients) {
            // NOTE(review): the same buffer instance is handed to every client;
            // presumably sendByteBuffer resets its position - confirm.
            for (Iterator<SocketSubscribed> it = Main.clients.iterator(); it.hasNext();) {
                SocketSubscribed client = it.next();
                if (wanted.test(client) && !client.sendByteBuffer(frame)) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Wraps {@code json}, encoded as UTF-8, into a single unmasked WebSocket
     * text frame (first byte {@code 0x81}).
     *
     * <p>The length is written in the shortest form that fits: one byte up to
     * 125, marker 126 plus 16 bits up to 65535, marker 127 plus 64 bits above
     * that. The 64-bit form was previously missing, so large payloads had
     * their length truncated to 16 bits.
     *
     * @return a buffer positioned at the start of the frame, ready to be read
     */
    private static ByteBuffer buildTextFrame(String json) {
        ByteBuffer payload = StandardCharsets.UTF_8.encode(json);
        int length = payload.remaining();

        int headerSize;
        if (length <= 125) {
            headerSize = 2;
        } else if (length <= 0xFFFF) {
            headerSize = 4;
        } else {
            headerSize = 10;
        }

        // ByteBuffer is big-endian by default, which is the order the length fields need.
        ByteBuffer frame = ByteBuffer.allocate(headerSize + length);
        frame.put((byte) 0x81);
        if (length <= 125) {
            frame.put((byte) length);
        } else if (length <= 0xFFFF) {
            frame.put((byte) 126);
            frame.putShort((short) length); // low 16 bits carry the unsigned value
        } else {
            frame.put((byte) 127);
            frame.putLong(length);
        }
        frame.put(payload);
        frame.flip();
        return frame;
    }

    /** Closes {@code socket} if it was opened, ignoring any error from the close itself. */
    private static void closeQuietly(Socket socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if closing an already-failed socket fails.
        }
    }
}