aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/juick/jabber/ws/Main.java68
-rw-r--r--src/main/java/com/juick/jabber/ws/SocketSubscribed.java51
-rw-r--r--src/main/java/com/juick/jabber/ws/WSConnections.java44
-rw-r--r--src/main/java/com/juick/jabber/ws/WSData.java276
-rw-r--r--src/main/java/com/juick/jabber/ws/WSKeepAlive.java102
-rw-r--r--src/main/java/com/juick/jabber/ws/XMPPConnection.java166
6 files changed, 707 insertions, 0 deletions
diff --git a/src/main/java/com/juick/jabber/ws/Main.java b/src/main/java/com/juick/jabber/ws/Main.java
new file mode 100644
index 00000000..a7aea543
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/Main.java
@@ -0,0 +1,68 @@
+/*
+ * Juick
+ * Copyright (C) 2008-2011, Ugnich Anton
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+package com.juick.jabber.ws;
+
+import java.io.FileInputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Properties;
+
+/**
+ *
+ * @author Ugnich Anton
+ */
+public class Main {
+
+ Connection sql;
+ XMPPConnection xmpp;
+ public static String STATSFILE = null;
+ public final static ArrayList<SocketSubscribed> clients = new ArrayList<SocketSubscribed>();
+
+ public static void main(String[] args) {
+ new Main().start();
+ }
+
+ public void start() {
+ try {
+ Properties conf = new Properties();
+ conf.load(new FileInputStream("/etc/juick/ws.conf"));
+
+ STATSFILE = conf.getProperty("statsfile");
+
+ setupSql(conf.getProperty("mysql_username", ""), conf.getProperty("mysql_password", ""));
+ xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", ""));
+ new Thread(xmpp).start();
+
+ //new Thread(new WSConnections()).start();
+ new Thread(new WSData(sql)).start();
+ new Thread(new WSKeepAlive(sql)).start();
+ } catch (Exception e) {
+ System.err.println(e);
+ }
+ }
+
+ public void setupSql(String username, String password) {
+ try {
+ sql = DriverManager.getConnection("jdbc:mysql://localhost/juick?autoReconnect=true&user=" + username + "&password=" + password);
+ } catch (SQLException e) {
+ System.err.println(e);
+ }
+ }
+} \ No newline at end of file
diff --git a/src/main/java/com/juick/jabber/ws/SocketSubscribed.java b/src/main/java/com/juick/jabber/ws/SocketSubscribed.java
new file mode 100644
index 00000000..6144380c
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/SocketSubscribed.java
@@ -0,0 +1,51 @@
+package com.juick.jabber.ws;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+/**
+ *
+ * @author ugnich
+ */
+public class SocketSubscribed {
+
+ public SocketChannel sock = null;
+ public String clientName = null;
+ public int VUID = 0;
+ public int UID = 0;
+ public int MID = 0;
+ public boolean allMessages = false;
+ public boolean allReplies = false;
+ public long tsConnected;
+ public long tsLastData;
+
+ public SocketSubscribed(SocketChannel sock, String clientName, int VUID) {
+ this.sock = sock;
+ this.clientName = clientName;
+ this.VUID = VUID;
+ tsConnected = tsLastData = System.currentTimeMillis();
+ }
+
+ public boolean sendByteBuffer(ByteBuffer bb) {
+ boolean ret = false;
+ bb.rewind();
+ try {
+ sock.write(bb);
+ ret = true;
+ } catch (Exception e) {
+ close();
+ }
+ return ret;
+ }
+
+ public void close() {
+ try {
+ sock.socket().close();
+ } catch (Exception e) {
+ }
+ try {
+ sock.close();
+ } catch (Exception e) {
+ }
+ }
+}
diff --git a/src/main/java/com/juick/jabber/ws/WSConnections.java b/src/main/java/com/juick/jabber/ws/WSConnections.java
new file mode 100644
index 00000000..15fbe4e8
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/WSConnections.java
@@ -0,0 +1,44 @@
+package com.juick.jabber.ws;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+
+/**
+ *
+ * @author ugnich
+ */
+public class WSConnections implements Runnable {
+
+ Selector sel;
+
+ @Override
+ public void run() {
+ try {
+ sel = Selector.open();
+ ServerSocketChannel listensock = ServerSocketChannel.open();
+ listensock.configureBlocking(false);
+ listensock.socket().bind(new InetSocketAddress(8081));
+ listensock.register(sel, SelectionKey.OP_ACCEPT);
+
+ while (true) {
+ sel.select();
+ Iterator<SelectionKey> it = sel.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey selKey = it.next();
+ it.remove();
+ ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();
+ SocketChannel sChannel = ssChannel.accept();
+ sChannel.configureBlocking(false);
+ sChannel.register(sel, SelectionKey.OP_READ);
+ System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("WSConnections: " + e);
+ }
+ }
+}
diff --git a/src/main/java/com/juick/jabber/ws/WSData.java b/src/main/java/com/juick/jabber/ws/WSData.java
new file mode 100644
index 00000000..d77257e6
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/WSData.java
@@ -0,0 +1,276 @@
+package com.juick.jabber.ws;
+
+import com.juick.server.MessagesQueries;
+import com.juick.server.UserQueries;
+import com.juick.xmpp.utils.Base64;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.charset.Charset;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Connection;
+import java.util.Iterator;
+
+/**
+ *
+ * @author ugnich
+ */
+public class WSData implements Runnable {
+
+ static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+ Connection sql;
+ public Selector sel;
+
+ public WSData(Connection sql) {
+ this.sql = sql;
+ }
+
+ @Override
+ public void run() {
+ try {
+ sel = Selector.open();
+ ServerSocketChannel listensock = ServerSocketChannel.open();
+ listensock.configureBlocking(false);
+ listensock.socket().bind(new InetSocketAddress(8081));
+ listensock.register(sel, SelectionKey.OP_ACCEPT);
+
+ while (true) {
+ sel.select();
+ Iterator<SelectionKey> it = sel.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey selKey = it.next();
+ it.remove();
+
+ if (selKey.isAcceptable()) {
+ ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();
+ SocketChannel sChannel = ssChannel.accept();
+ sChannel.configureBlocking(false);
+ sChannel.register(sel, SelectionKey.OP_READ);
+ System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
+ } else if (selKey.isReadable()) {
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
+ ByteBuffer buf = ByteBuffer.allocate(10240);
+ try {
+ int readbytes = sChannel.read(buf);
+ if (readbytes > 0) {
+ buf.flip();
+
+ CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
+ System.out.println("DATA: " + charbuf.toString());
+ buf.rewind();
+
+ switch (buf.get(0)) {
+ case (byte) 0x89: // PING
+ updateSocketTS(sChannel);
+ wsPing(sChannel);
+ break;
+ case (byte) 0x8A: // PONG
+ updateSocketTS(sChannel);
+ break;
+ case (byte) 0x81: // TEXT FRAME
+ updateSocketTS(sChannel);
+ wsTextFrame(sChannel, buf);
+ break;
+ case (byte) 'G': // HTTP
+ updateSocketTS(sChannel);
+ wsHandshake(sChannel, buf);
+ break;
+ case (byte) 0x88: // CONNECTION CLOSE
+ throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " CONNECTION CLOSE");
+ }
+ } else if (readbytes < 0) {
+ throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " END OF STREAM");
+ }
+ } catch (IOException e) {
+ System.err.println("WSData: " + e);
+ sChannel.socket().close();
+ sChannel.close();
+ selKey.cancel();
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ System.err.println("WSData: " + e);
+ }
+ }
+
+ public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception {
+ String hOrigin = null;
+ String hHost = null;
+ String hLocation = null;
+ String hSecWebSocketKey = null;
+ String hSecWebSocketVersion = null;
+ String hXRealIP = null;
+
+ buf.rewind();
+ CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
+ String headers[] = charbuf.toString().split("\r\n");
+ for (int i = 0; i < headers.length; i++) {
+ String h[] = headers[i].split(" ", 2);
+ if (h.length == 2) {
+ if (h[0].equals("GET")) {
+ hLocation = headers[i].split(" ", 3)[1];
+ } else if (h[0].equals("Origin:")) {
+ hOrigin = h[1];
+ } else if (h[0].equals("Host:")) {
+ hHost = h[1];
+ } else if (h[0].equals("Sec-WebSocket-Key:")) {
+ hSecWebSocketKey = h[1];
+ } else if (h[0].equals("Sec-WebSocket-Version:")) {
+ hSecWebSocketVersion = h[1];
+ } else if (h[0].equals("X-Real-IP:")) {
+ hXRealIP = h[1];
+ }
+ }
+ }
+
+ if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey == null || hSecWebSocketVersion == null || !hSecWebSocketVersion.equals("13")) {
+ throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers");
+ }
+
+ // Auth
+ int VUID = 0;
+ int hashloc = hLocation.indexOf("hash=");
+ if (hashloc > 0) {
+ String hash = hLocation.substring(hashloc + 5);
+ if (hash.indexOf('&') > 0) {
+ hash = hash.substring(0, hash.indexOf('&'));
+ }
+ if (hash.length() == 16) {
+ VUID = com.juick.server.UserQueries.getUIDbyHash(sql, hash);
+ }
+ }
+
+ // URL
+ int hLocationQM = hLocation.indexOf('?');
+ if (hLocationQM > 0) {
+ hLocation = hLocation.substring(0, hLocationQM);
+ }
+
+ int MID = 0;
+ int responseCode = 404;
+ SocketSubscribed sockSubscr = null;
+ if (hLocation.equals("/") && VUID > 0) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ responseCode = 101;
+ } else if (hLocation.equals("/_all")) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.allMessages = true;
+ responseCode = 101;
+ } else if (hLocation.equals("/_replies")) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.allReplies = true;
+ responseCode = 101;
+ } else if (hLocation.matches("^/\\d+$")) {
+ try {
+ MID = Integer.parseInt(hLocation.substring(1));
+ } catch (Exception e) {
+ }
+ if (MID > 0) {
+ if (MessagesQueries.canViewThread(sql, MID, VUID)) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.MID = MID;
+ responseCode = 101;
+ } else {
+ responseCode = 403;
+ }
+ }
+ } else if (hLocation.matches("^/[a-zA-Z0-9\\-]{2,16}/?$")) {
+ String uname;
+ if (hLocation.endsWith("/")) {
+ uname = hLocation.substring(1, hLocation.length() - 2);
+ } else {
+ uname = hLocation.substring(1);
+ }
+
+ int UID = UserQueries.getUIDbyName(sql, uname);
+ if (UID > 0) {
+ // check access
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.UID = UID;
+ responseCode = 101;
+ }
+ }
+ if (sockSubscr != null) {
+ synchronized (Main.clients) {
+ Main.clients.add(sockSubscr);
+ }
+ }
+
+ // Response
+ String outstr;
+ if (responseCode == 101) {
+ outstr = "HTTP/1.1 101 Switching Protocols\r\n"
+ + "Upgrade: websocket\r\n"
+ + "Connection: Upgrade\r\n"
+ + "Sec-WebSocket-Accept: " + calcHeaderAccept(hSecWebSocketKey) + "\r\n"
+ + "\r\n";
+ } else if (responseCode == 403) {
+ outstr = "HTTP/1.1 403 Forbidden\r\n\r\n";
+ } else {
+ outstr = "HTTP/1.1 404 Not Found\r\n\r\n";
+ }
+ ByteBuffer out = ByteBuffer.allocate(1024);
+ out.put(Charset.forName("ISO-8859-1").encode(outstr));
+ out.flip();
+ sock.write(out);
+
+ if (responseCode == 101) {
+ System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (VUID = " + VUID + "; MID = " + MID + ")");
+ } else {
+ throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " " + responseCode);
+ }
+ }
+
+ private String calcHeaderAccept(String key) {
+ String base = key + WEBSOCKET_GUID;
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-1");
+ return Base64.encode(md.digest(base.getBytes()));
+ } catch (NoSuchAlgorithmException e) {
+ System.err.println("calcHeaderAccept: " + e);
+ }
+ return "";
+ }
+
+ public void wsPing(SocketChannel sock) throws Exception {
+ ByteBuffer out = ByteBuffer.allocate(2);
+ out.put((byte) 0x8A); // PONG FRAME
+ out.put((byte) 0x00); // 1 byte long
+ out.flip();
+ out.rewind();
+ sock.write(out);
+ }
+
+ public void wsTextFrame(SocketChannel sock, ByteBuffer buf) throws Exception {
+ /*
+ ByteBuffer out = ByteBuffer.allocate(3);
+ out.put((byte) 0x81); // TEXT FRAME
+ out.put((byte) 0x01); // 1 byte long
+ out.put((byte) 0x20); // ' '
+ out.flip();
+ out.rewind();
+ sock.write(out);
+ */
+ }
+
+ public void updateSocketTS(SocketChannel sock) {
+ synchronized (Main.clients) {
+ Iterator<SocketSubscribed> i = Main.clients.iterator();
+ while (i.hasNext()) {
+ SocketSubscribed s = i.next();
+ if (s.sock == sock) {
+ s.tsLastData = System.currentTimeMillis();
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/juick/jabber/ws/WSKeepAlive.java b/src/main/java/com/juick/jabber/ws/WSKeepAlive.java
new file mode 100644
index 00000000..2deef594
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/WSKeepAlive.java
@@ -0,0 +1,102 @@
+package com.juick.jabber.ws;
+
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.util.Iterator;
+
+/**
+ *
+ * @author ugnich
+ */
+public class WSKeepAlive implements Runnable {
+
+ Connection sql;
+ ByteBuffer pingBytes;
+ ByteBuffer closeBytes;
+
+ public WSKeepAlive(Connection sql) {
+ this.sql = sql;
+
+ //pingBytes = ByteBuffer.allocate(2);
+ //pingBytes.put((byte) 0x8A); // PONG FRAME
+ //pingBytes.put((byte) 0x00); // 0 byte long
+ pingBytes = ByteBuffer.allocate(3);
+ pingBytes.put((byte) 0x81); // TEXT FRAME
+ pingBytes.put((byte) 0x01); // 1 byte long
+ pingBytes.put((byte) 0x20); // ' '
+ pingBytes.flip();
+
+ closeBytes = ByteBuffer.allocate(2);
+ closeBytes.put((byte) 0x88); // CLOSE FRAME
+ closeBytes.put((byte) 0x00); // 0 byte long
+ closeBytes.flip();
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+ PrintWriter statsFile = null;
+
+ if (Main.STATSFILE != null) {
+ try {
+ statsFile = new PrintWriter(Main.STATSFILE, "UTF-8");
+ } catch (Exception e) {
+ statsFile = null;
+ System.err.println("WSKeepAlive statsFile: " + e);
+ }
+ }
+
+ long now = System.currentTimeMillis();
+
+ synchronized (Main.clients) {
+ if (statsFile != null) {
+ statsFile.write("<html><body><h1>Connections (" + Main.clients.size() + ")</h2><table border=1><tr><th>IP</th><th>inactive</th><th>VUID</th><th>UID</th><th>MID</th><th>allM</th><th>allR</th></tr>");
+ }
+
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ int inactive = (int) ((double) (now - s.tsLastData) / 1000.0);
+
+ if (statsFile != null) {
+ try {
+ statsFile.print("<tr><td>" + (s.clientName != null ? s.clientName : "?") + "</td>");
+ statsFile.print("<td>" + inactive + "</td>");
+ statsFile.print("<td>" + (s.VUID > 0 ? s.VUID : "") + "</td>");
+ statsFile.print("<td>" + (s.UID > 0 ? s.UID : "") + "</td>");
+ statsFile.print("<td>" + (s.MID > 0 ? s.MID : "") + "</td>");
+ statsFile.print("<td>" + (s.allMessages ? "+" : "") + "</td>");
+ statsFile.print("<td>" + (s.allReplies ? "+" : "") + "</td></tr>");
+ } catch (Exception e) {
+ System.err.println("WSKeepAlive statsFile print: " + e);
+ }
+ }
+
+ if (inactive > 180) {
+ s.sendByteBuffer(closeBytes);
+ s.close();
+ i.remove();
+ } else if (inactive > 60) {
+ if (!s.sendByteBuffer(pingBytes)) {
+ i.remove();
+ }
+ }
+ }
+ }
+
+ if (Main.STATSFILE != null) {
+ try {
+ statsFile.write("</table></body></html>");
+ statsFile.close();
+ } catch (Exception e) {
+ System.err.println("WSKeepAlive statsFile close: " + e);
+ }
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+}
diff --git a/src/main/java/com/juick/jabber/ws/XMPPConnection.java b/src/main/java/com/juick/jabber/ws/XMPPConnection.java
new file mode 100644
index 00000000..39815da5
--- /dev/null
+++ b/src/main/java/com/juick/jabber/ws/XMPPConnection.java
@@ -0,0 +1,166 @@
+package com.juick.jabber.ws;
+
+import com.juick.server.MessagesQueries;
+import com.juick.server.Utils;
+import com.juick.xmpp.JID;
+import com.juick.xmpp.Message;
+import com.juick.xmpp.Stream;
+import com.juick.xmpp.StreamComponent;
+import com.juick.xmpp.extensions.JuickMessage;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+/**
+ *
+ * @author ugnich
+ */
+public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
+
+ Connection sql;
+ Stream xmpp;
+ String xmppPassword;
+
+ public XMPPConnection(Connection sql, String password) {
+ this.sql = sql;
+ xmppPassword = password;
+ }
+
+ @Override
+ public void run() {
+ try {
+ Socket socket = new Socket("localhost", 5347);
+ xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp.addChildParser(new JuickMessage());
+ xmpp.addListener((Stream.StreamListener) this);
+ xmpp.addListener((Message.MessageListener) this);
+ xmpp.startParsing();
+ } catch (IOException e) {
+ System.err.println("XMPPConnection: " + e);
+ }
+ }
+
+ @Override
+ public void onStreamReady() {
+ System.err.println("Stream ready");
+ }
+
+ @Override
+ public void onStreamFail(String msg) {
+ System.err.println("Stream failed: " + msg);
+ }
+
+ @Override
+ public void onMessage(com.juick.xmpp.Message msg) {
+ JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ System.err.println("MID=" + jmsg.MID + "; RID=" + jmsg.RID);
+ if (jmsg.MID == 0) {
+ int uid_to = 0;
+ try {
+ uid_to = Integer.parseInt(msg.to.Username);
+ } catch (Exception e) {
+ }
+ if (uid_to > 0) {
+ onJuickPM(uid_to, jmsg);
+ }
+ } else if (jmsg.RID == 0) {
+ onJuickMessagePost(jmsg);
+ } else {
+ onJuickMessageReply(jmsg);
+ }
+ }
+ }
+
+ private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ String json = com.juick.json.Message.toJSON(jmsg).toString();
+ ByteBuffer bbMsg = buildTextFrame(json);
+
+ synchronized (Main.clients) {
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ if (s.VUID == uid_to && s.MID == 0 && s.allMessages == false && s.allReplies == false) {
+ if (!s.sendByteBuffer(bbMsg)) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private void onJuickMessagePost(com.juick.Message jmsg) {
+ String json = com.juick.json.Message.toJSON(jmsg).toString();
+ ByteBuffer bbMsg = buildTextFrame(json);
+
+ ArrayList<Integer> uids = new ArrayList<Integer>();
+ String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")";
+ if (jmsg.Privacy < 0) {
+ query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")";
+ }
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ stmt = sql.createStatement();
+ rs = stmt.executeQuery(query);
+ rs.beforeFirst();
+ while (rs.next()) {
+ uids.add(rs.getInt(1));
+ }
+ } catch (SQLException e) {
+ System.err.println("onJuickMessagePost: " + e);
+ } finally {
+ Utils.finishSQL(rs, stmt);
+ }
+
+ synchronized (Main.clients) {
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ if (s.MID == 0 && s.allReplies == false && ((jmsg.Privacy >= 0 && (s.allMessages || s.UID == jmsg.User.UID)) || uids.contains(s.VUID))) {
+ if (!s.sendByteBuffer(bbMsg)) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private void onJuickMessageReply(com.juick.Message jmsg) {
+ String json = com.juick.json.Message.toJSON(jmsg).toString();
+ ByteBuffer bbMsg = buildTextFrame(json);
+
+ int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.MID);
+
+ synchronized (Main.clients) {
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ if ((privacy >= 0 && s.allReplies) || s.MID == jmsg.MID) {
+ if (!s.sendByteBuffer(bbMsg)) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+
+ private ByteBuffer buildTextFrame(String json) {
+ ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json);
+ ByteBuffer bbMsg = ByteBuffer.allocate(jsonbytes.limit() + 8);
+ bbMsg.put((byte) 0x81);
+ if (jsonbytes.limit() <= 125) {
+ bbMsg.put((byte) jsonbytes.limit());
+ } else {
+ bbMsg.put((byte) 126);
+ bbMsg.putShort((short) jsonbytes.limit());
+ }
+ bbMsg.put(jsonbytes);
+ bbMsg.flip();
+ return bbMsg;
+ }
+}