diff options
Diffstat (limited to 'src/com/juick/jabber/ws')
-rw-r--r-- | src/com/juick/jabber/ws/Main.java | 10 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/SocketSubscribed.java | 15 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSConnections.java | 4 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSData.java | 337 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSKeepAlive.java | 106 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/XMPPConnection.java | 179 |
6 files changed, 379 insertions, 272 deletions
diff --git a/src/com/juick/jabber/ws/Main.java b/src/com/juick/jabber/ws/Main.java index 7036b0d4..a7aea543 100644 --- a/src/com/juick/jabber/ws/Main.java +++ b/src/com/juick/jabber/ws/Main.java @@ -32,9 +32,8 @@ public class Main { Connection sql; XMPPConnection xmpp; - public final static ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>(); - public final static ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>(); - public final static ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>(); + public static String STATSFILE = null; + public final static ArrayList<SocketSubscribed> clients = new ArrayList<SocketSubscribed>(); public static void main(String[] args) { new Main().start(); @@ -45,11 +44,14 @@ public class Main { Properties conf = new Properties(); conf.load(new FileInputStream("/etc/juick/ws.conf")); + STATSFILE = conf.getProperty("statsfile"); + setupSql(conf.getProperty("mysql_username", ""), conf.getProperty("mysql_password", "")); xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", "")); new Thread(xmpp).start(); + + //new Thread(new WSConnections()).start(); new Thread(new WSData(sql)).start(); - new Thread(new WSConnections()).start(); new Thread(new WSKeepAlive(sql)).start(); } catch (Exception e) { System.err.println(e); diff --git a/src/com/juick/jabber/ws/SocketSubscribed.java b/src/com/juick/jabber/ws/SocketSubscribed.java index 29baf0b8..f865ce6a 100644 --- a/src/com/juick/jabber/ws/SocketSubscribed.java +++ b/src/com/juick/jabber/ws/SocketSubscribed.java @@ -9,14 +9,19 @@ import java.nio.channels.SocketChannel; public class SocketSubscribed { public SocketChannel sock = null; + public String clientName = null; + public int VUID = 0; public int UID = 0; public int MID = 0; - public long tsConnected = 0; - public long tsLastData = 0; + public boolean allMessages = false; + public boolean allReplies = false; + public long tsConnected; + public long tsLastData; - public SocketSubscribed(SocketChannel sock, int UID, int MID) { + public SocketSubscribed(SocketChannel sock, String clientName, int VUID) { this.sock = sock; - this.UID = UID; - this.MID = MID; + this.clientName = clientName; + this.VUID = VUID; + tsConnected = tsLastData = System.currentTimeMillis(); } } diff --git a/src/com/juick/jabber/ws/WSConnections.java b/src/com/juick/jabber/ws/WSConnections.java index 020442f7..15fbe4e8 100644 --- a/src/com/juick/jabber/ws/WSConnections.java +++ b/src/com/juick/jabber/ws/WSConnections.java @@ -32,9 +32,9 @@ public class WSConnections implements Runnable { it.remove(); ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); SocketChannel sChannel = ssChannel.accept(); - System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); sChannel.configureBlocking(false); - sChannel.register(WSData.sel, SelectionKey.OP_READ); + sChannel.register(sel, SelectionKey.OP_READ); + System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); } } } catch (Exception e) { diff --git a/src/com/juick/jabber/ws/WSData.java b/src/com/juick/jabber/ws/WSData.java index 97193510..d77257e6 100644 --- a/src/com/juick/jabber/ws/WSData.java +++ b/src/com/juick/jabber/ws/WSData.java @@ -1,14 +1,19 @@ package com.juick.jabber.ws; +import com.juick.server.MessagesQueries; +import com.juick.server.UserQueries; +import com.juick.xmpp.utils.Base64; import java.io.IOException; -import java.math.BigInteger; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.sql.Connection; import java.util.Iterator; @@ -17,46 +22,77 @@ import java.util.Iterator; * @author ugnich */ public class WSData implements Runnable { - + + static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; Connection sql; - public static Selector sel; - + public Selector sel; + public WSData(Connection sql) { this.sql = sql; } - + @Override public void run() { try { sel = Selector.open(); + ServerSocketChannel listensock = ServerSocketChannel.open(); + listensock.configureBlocking(false); + listensock.socket().bind(new InetSocketAddress(8081)); + listensock.register(sel, SelectionKey.OP_ACCEPT); + while (true) { sel.select(); Iterator<SelectionKey> it = sel.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey selKey = it.next(); it.remove(); - - SocketChannel sChannel = (SocketChannel) selKey.channel(); - ByteBuffer buf = ByteBuffer.allocate(10240); - try { - if (sChannel.read(buf) > 0) { - buf.flip(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - if (charbuf.charAt(0) == 0 && charbuf.charAt(charbuf.length() - 1) == 0xFF) { - wsTextFrame(sChannel, charbuf.subSequence(1, charbuf.length() - 2)); - } else if (charbuf.charAt(0) == 'G' && charbuf.charAt(1) == 'E' && charbuf.charAt(2) == 'T' && charbuf.charAt(3) == ' ') { - wsHandshake(sChannel, buf); - } else { - throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " INVALID FRAME"); + + if (selKey.isAcceptable()) { + ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); + SocketChannel sChannel = ssChannel.accept(); + sChannel.configureBlocking(false); + sChannel.register(sel, SelectionKey.OP_READ); + System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); + } else if (selKey.isReadable()) { + SocketChannel sChannel = (SocketChannel) selKey.channel(); + ByteBuffer buf = ByteBuffer.allocate(10240); + try { + int readbytes = sChannel.read(buf); + if (readbytes > 0) { + buf.flip(); + + CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); + System.out.println("DATA: " + charbuf.toString()); + buf.rewind(); + + switch (buf.get(0)) { + case (byte) 0x89: // PING + updateSocketTS(sChannel); + wsPing(sChannel); + break; + case (byte) 0x8A: // PONG + updateSocketTS(sChannel); + break; + case (byte) 0x81: // TEXT FRAME + updateSocketTS(sChannel); + wsTextFrame(sChannel, buf); + break; + case (byte) 'G': // HTTP + updateSocketTS(sChannel); + wsHandshake(sChannel, buf); + break; + case (byte) 0x88: // CONNECTION CLOSE + throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " CONNECTION CLOSE"); + } + } else if (readbytes < 0) { + throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " END OF STREAM"); } - } else { - throw new IOException(sChannel.socket().getRemoteSocketAddress().toString()+ " NO DATA"); + } catch (IOException e) { + System.err.println("WSData: " + e); + sChannel.socket().close(); + sChannel.close(); + selKey.cancel(); } - } catch (IOException e) { - System.err.println("WSData: " + e); - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); } } } @@ -64,15 +100,15 @@ public class WSData implements Runnable { System.err.println("WSData: " + e); } } - + public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception { String hOrigin = null; String hHost = null; String hLocation = null; - String hSecWebSocketKey1 = null; - String hSecWebSocketKey2 = null; - String hCookie = null; - + String hSecWebSocketKey = null; + String hSecWebSocketVersion = null; + String hXRealIP = null; + buf.rewind(); CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); String headers[] = charbuf.toString().split("\r\n"); @@ -85,149 +121,156 @@ public class WSData implements Runnable { hOrigin = h[1]; } else if (h[0].equals("Host:")) { hHost = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key1:")) { - hSecWebSocketKey1 = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key2:")) { - hSecWebSocketKey2 = h[1]; - } else if (h[0].equals("Cookie:")) { - hCookie = h[1]; + } else if (h[0].equals("Sec-WebSocket-Key:")) { + hSecWebSocketKey = h[1]; + } else if (h[0].equals("Sec-WebSocket-Version:")) { + hSecWebSocketVersion = h[1]; + } else if (h[0].equals("X-Real-IP:")) { + hXRealIP = h[1]; } } } - - if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey1 == null || hSecWebSocketKey2 == null) { + + if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey == null || hSecWebSocketVersion == null || !hSecWebSocketVersion.equals("13")) { throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers"); } - // Cookies - int UID = 0; - String hash = null; - if (hCookie != null) { - String cookies[] = hCookie.split("; "); - for (int i = 0; i < cookies.length; i++) { - String cookie[] = cookies[i].split("=", 2); - if (cookie[0].equals("hash")) { - hash = cookie[1]; - break; - } + // Auth + int VUID = 0; + int hashloc = hLocation.indexOf("hash="); + if (hashloc > 0) { + String hash = hLocation.substring(hashloc + 5); + if (hash.indexOf('&') > 0) { + hash = hash.substring(0, hash.indexOf('&')); } - if (hash != null) { - UID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); + if (hash.length() == 16) { + VUID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); } } // URL - String loc[] = hLocation.split("/"); + int hLocationQM = hLocation.indexOf('?'); + if (hLocationQM > 0) { + hLocation = hLocation.substring(0, hLocationQM); + } + int MID = 0; - if (hLocation.equals("/my") && UID > 0) { - Main.sockMessages.add(new SocketSubscribed(sock, UID, 0)); - } else if (hLocation.equals("/all")) { - Main.sockAll.add(new SocketSubscribed(sock, UID, 0)); - } else if ((loc.length == 2 || loc.length == 3) && loc[1].equals("replies")) { - if (loc.length == 2) { - Main.sockReplies.add(new SocketSubscribed(sock, UID, 0)); - } else { - try { - MID = Integer.parseInt(loc[2]); - } catch (Exception e) { - } - if (MID > 0) { - Main.sockReplies.add(new SocketSubscribed(sock, UID, MID)); + int responseCode = 404; + SocketSubscribed sockSubscr = null; + if (hLocation.equals("/") && VUID > 0) { + sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); + responseCode = 101; + } else if (hLocation.equals("/_all")) { + sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); + sockSubscr.allMessages = true; + responseCode = 101; + } else if (hLocation.equals("/_replies")) { + sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); + sockSubscr.allReplies = true; + responseCode = 101; + } else if (hLocation.matches("^/\\d+$")) { + try { + MID = Integer.parseInt(hLocation.substring(1)); + } catch (Exception e) { + } + if (MID > 0) { + if (MessagesQueries.canViewThread(sql, MID, VUID)) { + sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); + sockSubscr.MID = MID; + responseCode = 101; } else { - throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid MID"); + responseCode = 403; } } - } else { - throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid location"); - } - - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (Hash=" + hash + "; UID = " + UID + "; MID = " + MID + ")"); - - Long lSecNum1 = calcSecKeyNum(hSecWebSocketKey1); - Long lSecNum2 = calcSecKeyNum(hSecWebSocketKey2); - - BigInteger sec1 = new BigInteger(lSecNum1.toString()); - BigInteger sec2 = new BigInteger(lSecNum2.toString()); - - // concatenate 3 parts secNum1 + secNum2 + secKey (16 Bytes) - byte[] l128Bit = new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - byte[] lTmp; - - lTmp = sec1.toByteArray(); - int lIdx = lTmp.length; - int lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[4 - lCnt] = lTmp[lIdx]; + } else if (hLocation.matches("^/[a-zA-Z0-9\\-]{2,16}/?$")) { + String uname; + if (hLocation.endsWith("/")) { + uname = hLocation.substring(1, hLocation.length() - 2); + } else { + uname = hLocation.substring(1); + } + + int UID = UserQueries.getUIDbyName(sql, uname); + if (UID > 0) { + // check access + sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); + sockSubscr.UID = UID; + responseCode = 101; + } } - - lTmp = sec2.toByteArray(); - lIdx = lTmp.length; - lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[8 - lCnt] = lTmp[lIdx]; + if (sockSubscr != null) { + synchronized (Main.clients) { + Main.clients.add(sockSubscr); + } } - buf.rewind(); - for (int i = 0; i < 8; i++) { - l128Bit[8 + i] = buf.get(buf.limit() - 8 + i); + // Response + String outstr; + if (responseCode == 101) { + outstr = "HTTP/1.1 101 Switching Protocols\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Accept: " + calcHeaderAccept(hSecWebSocketKey) + "\r\n" + + "\r\n"; + } else if (responseCode == 403) { + outstr = "HTTP/1.1 403 Forbidden\r\n\r\n"; + } else { + outstr = "HTTP/1.1 404 Not Found\r\n\r\n"; } - - String outstr = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - + "Upgrade: WebSocket\r\n" - + "Connection: Upgrade\r\n" - + "Sec-WebSocket-Origin: " + hOrigin + "\r\n" - + "Sec-WebSocket-Location: ws://" + hHost + hLocation + "\r\n" - + "Sec-WebSocket-Protocol: sample\r\n" - + "\r\n"; - ByteBuffer out = ByteBuffer.allocate(4096); + ByteBuffer out = ByteBuffer.allocate(1024); out.put(Charset.forName("ISO-8859-1").encode(outstr)); - out.put(MessageDigest.getInstance("MD5").digest(l128Bit)); out.flip(); - sock.write(out); - } - - private static long calcSecKeyNum(String aKey) { - StringBuilder lSB = new StringBuilder(); - int lSpaces = 0; - for (int i = 0; i < aKey.length(); i++) { - char lC = aKey.charAt(i); - if (lC == ' ') { - lSpaces++; - } else if (lC >= '0' && lC <= '9') { - lSB.append(lC); - } + + if (responseCode == 101) { + System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (VUID = " + VUID + "; MID = " + MID + ")"); + } else { + throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " " + responseCode); } - long lRes = -1; - if (lSpaces > 0) { - try { - lRes = Long.parseLong(lSB.toString()) / lSpaces; - } catch (NumberFormatException ex) { - // use default result - } + } + + private String calcHeaderAccept(String key) { + String base = key + WEBSOCKET_GUID; + try { + MessageDigest md = MessageDigest.getInstance("SHA-1"); + return Base64.encode(md.digest(base.getBytes())); + } catch (NoSuchAlgorithmException e) { + System.err.println("calcHeaderAccept: " + e); } - return lRes; + return ""; } - - public void wsTextFrame(SocketChannel sock, CharSequence csbuf) { - String buf = csbuf.toString(); - if (buf.equals(" ")) { - ByteBuffer out = ByteBuffer.allocate(4); - out.put((byte) 0x00); - out.put((byte) 0x20); - out.put((byte) 0xFF); - out.flip(); - out.rewind(); - try { - sock.write(out); - } catch (IOException e) { + + public void wsPing(SocketChannel sock) throws Exception { + ByteBuffer out = ByteBuffer.allocate(2); + out.put((byte) 0x8A); // PONG FRAME + out.put((byte) 0x00); // 1 byte long + out.flip(); + out.rewind(); + sock.write(out); + } + + public void wsTextFrame(SocketChannel sock, ByteBuffer buf) throws Exception { + /* + ByteBuffer out = ByteBuffer.allocate(3); + out.put((byte) 0x81); // TEXT FRAME + out.put((byte) 0x01); // 1 byte long + out.put((byte) 0x20); // ' ' + out.flip(); + out.rewind(); + sock.write(out); + */ + } + + public void updateSocketTS(SocketChannel sock) { + synchronized (Main.clients) { + Iterator<SocketSubscribed> i = Main.clients.iterator(); + while (i.hasNext()) { + SocketSubscribed s = i.next(); + if (s.sock == sock) { + s.tsLastData = System.currentTimeMillis(); + break; + } } - } else { - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " DATA '" + buf + "'"); } } } diff --git a/src/com/juick/jabber/ws/WSKeepAlive.java b/src/com/juick/jabber/ws/WSKeepAlive.java index ba6376a4..2e36b100 100644 --- a/src/com/juick/jabber/ws/WSKeepAlive.java +++ b/src/com/juick/jabber/ws/WSKeepAlive.java @@ -1,6 +1,9 @@ package com.juick.jabber.ws; +import java.io.PrintWriter; +import java.nio.ByteBuffer; import java.sql.Connection; +import java.util.Iterator; /** * @@ -9,12 +12,115 @@ import java.sql.Connection; public class WSKeepAlive implements Runnable { Connection sql; + ByteBuffer pingBytes; + ByteBuffer closeBytes; public WSKeepAlive(Connection sql) { this.sql = sql; + + //pingBytes = ByteBuffer.allocate(2); + //pingBytes.put((byte) 0x8A); // PONG FRAME + //pingBytes.put((byte) 0x00); // 0 byte long + pingBytes = ByteBuffer.allocate(3); + pingBytes.put((byte) 0x81); // TEXT FRAME + pingBytes.put((byte) 0x01); // 1 byte long + pingBytes.put((byte) 0x20); // ' ' + pingBytes.flip(); + + closeBytes = ByteBuffer.allocate(2); + closeBytes.put((byte) 0x88); // CLOSE FRAME + closeBytes.put((byte) 0x00); // 0 byte long + closeBytes.flip(); } @Override public void run() { + while (true) { + PrintWriter statsFile = null; + + if (Main.STATSFILE != null) { + try { + statsFile = new PrintWriter(Main.STATSFILE, "UTF-8"); + } catch (Exception e) { + statsFile = null; + System.err.println("WSKeepAlive statsFile: " + e); + } + } + + long now = System.currentTimeMillis(); + + synchronized (Main.clients) { + if (statsFile != null) { + statsFile.write("<html><body><h1>Connections (" + Main.clients.size() + ")</h2><table border=1><tr><th>IP</th><th>inactive</th><th>VUID</th><th>UID</th><th>MID</th><th>allM</th><th>allR</th></tr>"); + } + + for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { + SocketSubscribed s = i.next(); + int inactive = (int) ((double) (now - s.tsLastData) / 1000.0); + + if (statsFile != null) { + try { + statsFile.print("<tr><td>" + (s.clientName != null ? s.clientName : "?") + "</td>"); + statsFile.print("<td>" + inactive + "</td>"); + statsFile.print("<td>" + (s.VUID > 0 ? s.VUID : "") + "</td>"); + statsFile.print("<td>" + (s.UID > 0 ? s.UID : "") + "</td>"); + statsFile.print("<td>" + (s.MID > 0 ? s.MID : "") + "</td>"); + statsFile.print("<td>" + (s.allMessages ? "+" : "") + "</td>"); + statsFile.print("<td>" + (s.allReplies ? "+" : "") + "</td></tr>"); + } catch (Exception e) { + System.err.println("WSKeepAlive statsFile print: " + e); + } + } + + if (inactive > 180) { + closeBytes.rewind(); + try { + s.sock.write(closeBytes); + } catch (Exception e) { + } finally { + try { + s.sock.socket().close(); + } catch (Exception ex) { + } + try { + s.sock.close(); + } catch (Exception ex) { + } + i.remove(); + } + } else if (inactive > 60) { + pingBytes.rewind(); + try { + s.sock.write(pingBytes); + } catch (Exception e) { + System.err.println("WSKeepAlive ping: " + e); + try { + s.sock.socket().close(); + } catch (Exception ex) { + } + try { + s.sock.close(); + } catch (Exception ex) { + } + i.remove(); + } + } + } + } + + if (Main.STATSFILE != null) { + try { + statsFile.write("</table></body></html>"); + statsFile.close(); + } catch (Exception e) { + System.err.println("WSKeepAlive statsFile close: " + e); + } + } + + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + } + } } } diff --git a/src/com/juick/jabber/ws/XMPPConnection.java b/src/com/juick/jabber/ws/XMPPConnection.java index 91a8387b..24329992 100644 --- a/src/com/juick/jabber/ws/XMPPConnection.java +++ b/src/com/juick/jabber/ws/XMPPConnection.java @@ -1,5 +1,7 @@ package com.juick.jabber.ws; +import com.juick.server.MessagesQueries; +import com.juick.server.Utils; import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.Stream; @@ -13,6 +15,8 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; +import java.util.Iterator; /** * @@ -33,7 +37,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. public void run() { try { Socket socket = new Socket("localhost", 5347); - xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp = new StreamComponent(new JID("", "ws2.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); xmpp.addChildParser(new JuickMessage()); xmpp.addListener((Stream.StreamListener) this); xmpp.addListener((Message.MessageListener) this); @@ -67,35 +71,24 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. } private void onJuickMessagePost(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""; - if (jmsg.Tags.size() > 0) { - json += ",\"tags\":["; - for (int i = 0; i < jmsg.Tags.size(); i++) { - if (i > 0) { - json += ","; - } - json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\""; - } - json += "]"; + String json = com.juick.json.Message.toJSON(jmsg).toString(); + ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json); + ByteBuffer bbMsg = ByteBuffer.allocate(10240); + bbMsg.put((byte) 0x81); + if (jsonbytes.limit() <= 125) { + bbMsg.put((byte) jsonbytes.limit()); + } else { + bbMsg.put((byte) 126); + bbMsg.putShort((short) jsonbytes.limit()); } - json += "}"; - - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); - + bbMsg.put(jsonbytes); + bbMsg.flip(); + ArrayList<Integer> uids = new ArrayList<Integer>(); String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")"; if (jmsg.Privacy < 0) { query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")"; } - Statement stmt = null; ResultSet rs = null; try { @@ -103,115 +96,73 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. rs = stmt.executeQuery(query); rs.beforeFirst(); while (rs.next()) { - int UID = rs.getInt(1); - - for (int i = Main.sockMessages.size() - 1; i >= 0; i--) { - SocketSubscribed ss = Main.sockMessages.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - Main.sockMessages.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - - if (jmsg.Privacy <= 0) { - for (int i = Main.sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = Main.sockAll.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - Main.sockAll.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - } - + uids.add(rs.getInt(1)); } } catch (SQLException e) { - System.err.println(e); + System.err.println("onJuickMessagePost: " + e); } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - } - } + Utils.finishSQL(rs, stmt); } - // Send to all - if (jmsg.Privacy > 0) { - for (int i = Main.sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = Main.sockAll.get(i); - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - Main.sockAll.remove(i); + synchronized (Main.clients) { + for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { + SocketSubscribed s = i.next(); + if ((jmsg.Privacy >= 0 && (s.allMessages || s.UID == jmsg.User.UID)) || uids.contains(s.VUID)) { + bbMsg.rewind(); try { - ss.sock.close(); - } catch (IOException ex) { + s.sock.write(bbMsg); + } catch (Exception e) { + try { + s.sock.socket().close(); + } catch (Exception ex) { + } + try { + s.sock.close(); + } catch (Exception ex) { + } + i.remove(); } } } } - } private void onJuickMessageReply(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"rid\":" + jmsg.RID + "," - + "\"replyto\":" + jmsg.ReplyTo + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"" - + "}"; + String json = com.juick.json.Message.toJSON(jmsg).toString(); + ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json); + ByteBuffer bbMsg = ByteBuffer.allocate(10240); + bbMsg.put((byte) 0x81); + if (jsonbytes.limit() <= 125) { + bbMsg.put((byte) jsonbytes.limit()); + } else { + bbMsg.put((byte) 126); + bbMsg.putShort((short) jsonbytes.limit()); + } + bbMsg.put(jsonbytes); + bbMsg.flip(); - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); + int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.MID); - for (int i = Main.sockReplies.size() - 1; i >= 0; i--) { - SocketSubscribed ss = Main.sockReplies.get(i); - if (ss.MID == 0 || ss.MID == jmsg.MID) { - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - Main.sockReplies.remove(i); + synchronized (Main.clients) { + for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { + SocketSubscribed s = i.next(); + if ((privacy >= 0 && s.allReplies) || s.MID == jmsg.MID) { + bbMsg.rewind(); try { - ss.sock.close(); - } catch (IOException ex) { + s.sock.write(bbMsg); + } catch (Exception e) { + try { + s.sock.socket().close(); + } catch (Exception ex) { + } + try { + s.sock.close(); + } catch (Exception ex) { + } + i.remove(); } } } } } - - public static String encloseJSON(String str) { - return str.replace("\"", """).replace("\\", "\\\\").replace("\n", "\\n"); - } } |