aboutsummaryrefslogtreecommitdiff
path: root/src/com/juick/jabber/ws
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/juick/jabber/ws')
-rw-r--r--src/com/juick/jabber/ws/Main.java10
-rw-r--r--src/com/juick/jabber/ws/SocketSubscribed.java15
-rw-r--r--src/com/juick/jabber/ws/WSConnections.java4
-rw-r--r--src/com/juick/jabber/ws/WSData.java337
-rw-r--r--src/com/juick/jabber/ws/WSKeepAlive.java106
-rw-r--r--src/com/juick/jabber/ws/XMPPConnection.java179
6 files changed, 379 insertions, 272 deletions
diff --git a/src/com/juick/jabber/ws/Main.java b/src/com/juick/jabber/ws/Main.java
index 7036b0d4..a7aea543 100644
--- a/src/com/juick/jabber/ws/Main.java
+++ b/src/com/juick/jabber/ws/Main.java
@@ -32,9 +32,8 @@ public class Main {
Connection sql;
XMPPConnection xmpp;
- public final static ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>();
- public final static ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>();
- public final static ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>();
+ public static String STATSFILE = null;
+ public final static ArrayList<SocketSubscribed> clients = new ArrayList<SocketSubscribed>();
public static void main(String[] args) {
new Main().start();
@@ -45,11 +44,14 @@ public class Main {
Properties conf = new Properties();
conf.load(new FileInputStream("/etc/juick/ws.conf"));
+ STATSFILE = conf.getProperty("statsfile");
+
setupSql(conf.getProperty("mysql_username", ""), conf.getProperty("mysql_password", ""));
xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", ""));
new Thread(xmpp).start();
+
+ //new Thread(new WSConnections()).start();
new Thread(new WSData(sql)).start();
- new Thread(new WSConnections()).start();
new Thread(new WSKeepAlive(sql)).start();
} catch (Exception e) {
System.err.println(e);
diff --git a/src/com/juick/jabber/ws/SocketSubscribed.java b/src/com/juick/jabber/ws/SocketSubscribed.java
index 29baf0b8..f865ce6a 100644
--- a/src/com/juick/jabber/ws/SocketSubscribed.java
+++ b/src/com/juick/jabber/ws/SocketSubscribed.java
@@ -9,14 +9,19 @@ import java.nio.channels.SocketChannel;
public class SocketSubscribed {
public SocketChannel sock = null;
+ public String clientName = null;
+ public int VUID = 0;
public int UID = 0;
public int MID = 0;
- public long tsConnected = 0;
- public long tsLastData = 0;
+ public boolean allMessages = false;
+ public boolean allReplies = false;
+ public long tsConnected;
+ public long tsLastData;
- public SocketSubscribed(SocketChannel sock, int UID, int MID) {
+ public SocketSubscribed(SocketChannel sock, String clientName, int VUID) {
this.sock = sock;
- this.UID = UID;
- this.MID = MID;
+ this.clientName = clientName;
+ this.VUID = VUID;
+ tsConnected = tsLastData = System.currentTimeMillis();
}
}
diff --git a/src/com/juick/jabber/ws/WSConnections.java b/src/com/juick/jabber/ws/WSConnections.java
index 020442f7..15fbe4e8 100644
--- a/src/com/juick/jabber/ws/WSConnections.java
+++ b/src/com/juick/jabber/ws/WSConnections.java
@@ -32,9 +32,9 @@ public class WSConnections implements Runnable {
it.remove();
ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();
SocketChannel sChannel = ssChannel.accept();
- System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
sChannel.configureBlocking(false);
- sChannel.register(WSData.sel, SelectionKey.OP_READ);
+ sChannel.register(sel, SelectionKey.OP_READ);
+ System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
}
}
} catch (Exception e) {
diff --git a/src/com/juick/jabber/ws/WSData.java b/src/com/juick/jabber/ws/WSData.java
index 97193510..d77257e6 100644
--- a/src/com/juick/jabber/ws/WSData.java
+++ b/src/com/juick/jabber/ws/WSData.java
@@ -1,14 +1,19 @@
package com.juick.jabber.ws;
+import com.juick.server.MessagesQueries;
+import com.juick.server.UserQueries;
+import com.juick.xmpp.utils.Base64;
import java.io.IOException;
-import java.math.BigInteger;
+import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.sql.Connection;
import java.util.Iterator;
@@ -17,46 +22,77 @@ import java.util.Iterator;
* @author ugnich
*/
public class WSData implements Runnable {
-
+
+ static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
Connection sql;
- public static Selector sel;
-
+ public Selector sel;
+
public WSData(Connection sql) {
this.sql = sql;
}
-
+
@Override
public void run() {
try {
sel = Selector.open();
+ ServerSocketChannel listensock = ServerSocketChannel.open();
+ listensock.configureBlocking(false);
+ listensock.socket().bind(new InetSocketAddress(8081));
+ listensock.register(sel, SelectionKey.OP_ACCEPT);
+
while (true) {
sel.select();
Iterator<SelectionKey> it = sel.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey selKey = it.next();
it.remove();
-
- SocketChannel sChannel = (SocketChannel) selKey.channel();
- ByteBuffer buf = ByteBuffer.allocate(10240);
- try {
- if (sChannel.read(buf) > 0) {
- buf.flip();
- CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
- if (charbuf.charAt(0) == 0 && charbuf.charAt(charbuf.length() - 1) == 0xFF) {
- wsTextFrame(sChannel, charbuf.subSequence(1, charbuf.length() - 2));
- } else if (charbuf.charAt(0) == 'G' && charbuf.charAt(1) == 'E' && charbuf.charAt(2) == 'T' && charbuf.charAt(3) == ' ') {
- wsHandshake(sChannel, buf);
- } else {
- throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " INVALID FRAME");
+
+ if (selKey.isAcceptable()) {
+ ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();
+ SocketChannel sChannel = ssChannel.accept();
+ sChannel.configureBlocking(false);
+ sChannel.register(sel, SelectionKey.OP_READ);
+ System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
+ } else if (selKey.isReadable()) {
+ SocketChannel sChannel = (SocketChannel) selKey.channel();
+ ByteBuffer buf = ByteBuffer.allocate(10240);
+ try {
+ int readbytes = sChannel.read(buf);
+ if (readbytes > 0) {
+ buf.flip();
+
+ CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
+ System.out.println("DATA: " + charbuf.toString());
+ buf.rewind();
+
+ switch (buf.get(0)) {
+ case (byte) 0x89: // PING
+ updateSocketTS(sChannel);
+ wsPing(sChannel);
+ break;
+ case (byte) 0x8A: // PONG
+ updateSocketTS(sChannel);
+ break;
+ case (byte) 0x81: // TEXT FRAME
+ updateSocketTS(sChannel);
+ wsTextFrame(sChannel, buf);
+ break;
+ case (byte) 'G': // HTTP
+ updateSocketTS(sChannel);
+ wsHandshake(sChannel, buf);
+ break;
+ case (byte) 0x88: // CONNECTION CLOSE
+ throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " CONNECTION CLOSE");
+ }
+ } else if (readbytes < 0) {
+ throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " END OF STREAM");
}
- } else {
- throw new IOException(sChannel.socket().getRemoteSocketAddress().toString()+ " NO DATA");
+ } catch (IOException e) {
+ System.err.println("WSData: " + e);
+ sChannel.socket().close();
+ sChannel.close();
+ selKey.cancel();
}
- } catch (IOException e) {
- System.err.println("WSData: " + e);
- sChannel.socket().close();
- sChannel.close();
- selKey.cancel();
}
}
}
@@ -64,15 +100,15 @@ public class WSData implements Runnable {
System.err.println("WSData: " + e);
}
}
-
+
public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception {
String hOrigin = null;
String hHost = null;
String hLocation = null;
- String hSecWebSocketKey1 = null;
- String hSecWebSocketKey2 = null;
- String hCookie = null;
-
+ String hSecWebSocketKey = null;
+ String hSecWebSocketVersion = null;
+ String hXRealIP = null;
+
buf.rewind();
CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
String headers[] = charbuf.toString().split("\r\n");
@@ -85,149 +121,156 @@ public class WSData implements Runnable {
hOrigin = h[1];
} else if (h[0].equals("Host:")) {
hHost = h[1];
- } else if (h[0].equals("Sec-WebSocket-Key1:")) {
- hSecWebSocketKey1 = h[1];
- } else if (h[0].equals("Sec-WebSocket-Key2:")) {
- hSecWebSocketKey2 = h[1];
- } else if (h[0].equals("Cookie:")) {
- hCookie = h[1];
+ } else if (h[0].equals("Sec-WebSocket-Key:")) {
+ hSecWebSocketKey = h[1];
+ } else if (h[0].equals("Sec-WebSocket-Version:")) {
+ hSecWebSocketVersion = h[1];
+ } else if (h[0].equals("X-Real-IP:")) {
+ hXRealIP = h[1];
}
}
}
-
- if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey1 == null || hSecWebSocketKey2 == null) {
+
+ if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey == null || hSecWebSocketVersion == null || !hSecWebSocketVersion.equals("13")) {
throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers");
}
- // Cookies
- int UID = 0;
- String hash = null;
- if (hCookie != null) {
- String cookies[] = hCookie.split("; ");
- for (int i = 0; i < cookies.length; i++) {
- String cookie[] = cookies[i].split("=", 2);
- if (cookie[0].equals("hash")) {
- hash = cookie[1];
- break;
- }
+ // Auth
+ int VUID = 0;
+ int hashloc = hLocation.indexOf("hash=");
+ if (hashloc > 0) {
+ String hash = hLocation.substring(hashloc + 5);
+ if (hash.indexOf('&') > 0) {
+ hash = hash.substring(0, hash.indexOf('&'));
}
- if (hash != null) {
- UID = com.juick.server.UserQueries.getUIDbyHash(sql, hash);
+ if (hash.length() == 16) {
+ VUID = com.juick.server.UserQueries.getUIDbyHash(sql, hash);
}
}
// URL
- String loc[] = hLocation.split("/");
+ int hLocationQM = hLocation.indexOf('?');
+ if (hLocationQM > 0) {
+ hLocation = hLocation.substring(0, hLocationQM);
+ }
+
int MID = 0;
- if (hLocation.equals("/my") && UID > 0) {
- Main.sockMessages.add(new SocketSubscribed(sock, UID, 0));
- } else if (hLocation.equals("/all")) {
- Main.sockAll.add(new SocketSubscribed(sock, UID, 0));
- } else if ((loc.length == 2 || loc.length == 3) && loc[1].equals("replies")) {
- if (loc.length == 2) {
- Main.sockReplies.add(new SocketSubscribed(sock, UID, 0));
- } else {
- try {
- MID = Integer.parseInt(loc[2]);
- } catch (Exception e) {
- }
- if (MID > 0) {
- Main.sockReplies.add(new SocketSubscribed(sock, UID, MID));
+ int responseCode = 404;
+ SocketSubscribed sockSubscr = null;
+ if (hLocation.equals("/") && VUID > 0) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ responseCode = 101;
+ } else if (hLocation.equals("/_all")) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.allMessages = true;
+ responseCode = 101;
+ } else if (hLocation.equals("/_replies")) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.allReplies = true;
+ responseCode = 101;
+ } else if (hLocation.matches("^/\\d+$")) {
+ try {
+ MID = Integer.parseInt(hLocation.substring(1));
+ } catch (Exception e) {
+ }
+ if (MID > 0) {
+ if (MessagesQueries.canViewThread(sql, MID, VUID)) {
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.MID = MID;
+ responseCode = 101;
} else {
- throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid MID");
+ responseCode = 403;
}
}
- } else {
- throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid location");
- }
-
- System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (Hash=" + hash + "; UID = " + UID + "; MID = " + MID + ")");
-
- Long lSecNum1 = calcSecKeyNum(hSecWebSocketKey1);
- Long lSecNum2 = calcSecKeyNum(hSecWebSocketKey2);
-
- BigInteger sec1 = new BigInteger(lSecNum1.toString());
- BigInteger sec2 = new BigInteger(lSecNum2.toString());
-
- // concatenate 3 parts secNum1 + secNum2 + secKey (16 Bytes)
- byte[] l128Bit = new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
- byte[] lTmp;
-
- lTmp = sec1.toByteArray();
- int lIdx = lTmp.length;
- int lCnt = 0;
- while (lIdx > 0 && lCnt < 4) {
- lIdx--;
- lCnt++;
- l128Bit[4 - lCnt] = lTmp[lIdx];
+ } else if (hLocation.matches("^/[a-zA-Z0-9\\-]{2,16}/?$")) {
+ String uname;
+ if (hLocation.endsWith("/")) {
+ uname = hLocation.substring(1, hLocation.length() - 2);
+ } else {
+ uname = hLocation.substring(1);
+ }
+
+ int UID = UserQueries.getUIDbyName(sql, uname);
+ if (UID > 0) {
+ // check access
+ sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
+ sockSubscr.UID = UID;
+ responseCode = 101;
+ }
}
-
- lTmp = sec2.toByteArray();
- lIdx = lTmp.length;
- lCnt = 0;
- while (lIdx > 0 && lCnt < 4) {
- lIdx--;
- lCnt++;
- l128Bit[8 - lCnt] = lTmp[lIdx];
+ if (sockSubscr != null) {
+ synchronized (Main.clients) {
+ Main.clients.add(sockSubscr);
+ }
}
- buf.rewind();
- for (int i = 0; i < 8; i++) {
- l128Bit[8 + i] = buf.get(buf.limit() - 8 + i);
+ // Response
+ String outstr;
+ if (responseCode == 101) {
+ outstr = "HTTP/1.1 101 Switching Protocols\r\n"
+ + "Upgrade: websocket\r\n"
+ + "Connection: Upgrade\r\n"
+ + "Sec-WebSocket-Accept: " + calcHeaderAccept(hSecWebSocketKey) + "\r\n"
+ + "\r\n";
+ } else if (responseCode == 403) {
+ outstr = "HTTP/1.1 403 Forbidden\r\n\r\n";
+ } else {
+ outstr = "HTTP/1.1 404 Not Found\r\n\r\n";
}
-
- String outstr = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n"
- + "Upgrade: WebSocket\r\n"
- + "Connection: Upgrade\r\n"
- + "Sec-WebSocket-Origin: " + hOrigin + "\r\n"
- + "Sec-WebSocket-Location: ws://" + hHost + hLocation + "\r\n"
- + "Sec-WebSocket-Protocol: sample\r\n"
- + "\r\n";
- ByteBuffer out = ByteBuffer.allocate(4096);
+ ByteBuffer out = ByteBuffer.allocate(1024);
out.put(Charset.forName("ISO-8859-1").encode(outstr));
- out.put(MessageDigest.getInstance("MD5").digest(l128Bit));
out.flip();
-
sock.write(out);
- }
-
- private static long calcSecKeyNum(String aKey) {
- StringBuilder lSB = new StringBuilder();
- int lSpaces = 0;
- for (int i = 0; i < aKey.length(); i++) {
- char lC = aKey.charAt(i);
- if (lC == ' ') {
- lSpaces++;
- } else if (lC >= '0' && lC <= '9') {
- lSB.append(lC);
- }
+
+ if (responseCode == 101) {
+ System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (VUID = " + VUID + "; MID = " + MID + ")");
+ } else {
+ throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " " + responseCode);
}
- long lRes = -1;
- if (lSpaces > 0) {
- try {
- lRes = Long.parseLong(lSB.toString()) / lSpaces;
- } catch (NumberFormatException ex) {
- // use default result
- }
+ }
+
+ private String calcHeaderAccept(String key) {
+ String base = key + WEBSOCKET_GUID;
+ try {
+ MessageDigest md = MessageDigest.getInstance("SHA-1");
+ return Base64.encode(md.digest(base.getBytes()));
+ } catch (NoSuchAlgorithmException e) {
+ System.err.println("calcHeaderAccept: " + e);
}
- return lRes;
+ return "";
}
-
- public void wsTextFrame(SocketChannel sock, CharSequence csbuf) {
- String buf = csbuf.toString();
- if (buf.equals(" ")) {
- ByteBuffer out = ByteBuffer.allocate(4);
- out.put((byte) 0x00);
- out.put((byte) 0x20);
- out.put((byte) 0xFF);
- out.flip();
- out.rewind();
- try {
- sock.write(out);
- } catch (IOException e) {
+
+ public void wsPing(SocketChannel sock) throws Exception {
+ ByteBuffer out = ByteBuffer.allocate(2);
+ out.put((byte) 0x8A); // PONG FRAME
+ out.put((byte) 0x00); // 1 byte long
+ out.flip();
+ out.rewind();
+ sock.write(out);
+ }
+
+ public void wsTextFrame(SocketChannel sock, ByteBuffer buf) throws Exception {
+ /*
+ ByteBuffer out = ByteBuffer.allocate(3);
+ out.put((byte) 0x81); // TEXT FRAME
+ out.put((byte) 0x01); // 1 byte long
+ out.put((byte) 0x20); // ' '
+ out.flip();
+ out.rewind();
+ sock.write(out);
+ */
+ }
+
+ public void updateSocketTS(SocketChannel sock) {
+ synchronized (Main.clients) {
+ Iterator<SocketSubscribed> i = Main.clients.iterator();
+ while (i.hasNext()) {
+ SocketSubscribed s = i.next();
+ if (s.sock == sock) {
+ s.tsLastData = System.currentTimeMillis();
+ break;
+ }
}
- } else {
- System.out.println(sock.socket().getRemoteSocketAddress().toString() + " DATA '" + buf + "'");
}
}
}
diff --git a/src/com/juick/jabber/ws/WSKeepAlive.java b/src/com/juick/jabber/ws/WSKeepAlive.java
index ba6376a4..2e36b100 100644
--- a/src/com/juick/jabber/ws/WSKeepAlive.java
+++ b/src/com/juick/jabber/ws/WSKeepAlive.java
@@ -1,6 +1,9 @@
package com.juick.jabber.ws;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
import java.sql.Connection;
+import java.util.Iterator;
/**
*
@@ -9,12 +12,115 @@ import java.sql.Connection;
public class WSKeepAlive implements Runnable {
Connection sql;
+ ByteBuffer pingBytes;
+ ByteBuffer closeBytes;
public WSKeepAlive(Connection sql) {
this.sql = sql;
+
+ //pingBytes = ByteBuffer.allocate(2);
+ //pingBytes.put((byte) 0x8A); // PONG FRAME
+ //pingBytes.put((byte) 0x00); // 0 byte long
+ pingBytes = ByteBuffer.allocate(3);
+ pingBytes.put((byte) 0x81); // TEXT FRAME
+ pingBytes.put((byte) 0x01); // 1 byte long
+ pingBytes.put((byte) 0x20); // ' '
+ pingBytes.flip();
+
+ closeBytes = ByteBuffer.allocate(2);
+ closeBytes.put((byte) 0x88); // CLOSE FRAME
+ closeBytes.put((byte) 0x00); // 0 byte long
+ closeBytes.flip();
}
@Override
public void run() {
+ while (true) {
+ PrintWriter statsFile = null;
+
+ if (Main.STATSFILE != null) {
+ try {
+ statsFile = new PrintWriter(Main.STATSFILE, "UTF-8");
+ } catch (Exception e) {
+ statsFile = null;
+ System.err.println("WSKeepAlive statsFile: " + e);
+ }
+ }
+
+ long now = System.currentTimeMillis();
+
+ synchronized (Main.clients) {
+ if (statsFile != null) {
+ statsFile.write("<html><body><h1>Connections (" + Main.clients.size() + ")</h2><table border=1><tr><th>IP</th><th>inactive</th><th>VUID</th><th>UID</th><th>MID</th><th>allM</th><th>allR</th></tr>");
+ }
+
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ int inactive = (int) ((double) (now - s.tsLastData) / 1000.0);
+
+ if (statsFile != null) {
+ try {
+ statsFile.print("<tr><td>" + (s.clientName != null ? s.clientName : "?") + "</td>");
+ statsFile.print("<td>" + inactive + "</td>");
+ statsFile.print("<td>" + (s.VUID > 0 ? s.VUID : "") + "</td>");
+ statsFile.print("<td>" + (s.UID > 0 ? s.UID : "") + "</td>");
+ statsFile.print("<td>" + (s.MID > 0 ? s.MID : "") + "</td>");
+ statsFile.print("<td>" + (s.allMessages ? "+" : "") + "</td>");
+ statsFile.print("<td>" + (s.allReplies ? "+" : "") + "</td></tr>");
+ } catch (Exception e) {
+ System.err.println("WSKeepAlive statsFile print: " + e);
+ }
+ }
+
+ if (inactive > 180) {
+ closeBytes.rewind();
+ try {
+ s.sock.write(closeBytes);
+ } catch (Exception e) {
+ } finally {
+ try {
+ s.sock.socket().close();
+ } catch (Exception ex) {
+ }
+ try {
+ s.sock.close();
+ } catch (Exception ex) {
+ }
+ i.remove();
+ }
+ } else if (inactive > 60) {
+ pingBytes.rewind();
+ try {
+ s.sock.write(pingBytes);
+ } catch (Exception e) {
+ System.err.println("WSKeepAlive ping: " + e);
+ try {
+ s.sock.socket().close();
+ } catch (Exception ex) {
+ }
+ try {
+ s.sock.close();
+ } catch (Exception ex) {
+ }
+ i.remove();
+ }
+ }
+ }
+ }
+
+ if (Main.STATSFILE != null) {
+ try {
+ statsFile.write("</table></body></html>");
+ statsFile.close();
+ } catch (Exception e) {
+ System.err.println("WSKeepAlive statsFile close: " + e);
+ }
+ }
+
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+ }
}
}
diff --git a/src/com/juick/jabber/ws/XMPPConnection.java b/src/com/juick/jabber/ws/XMPPConnection.java
index 91a8387b..24329992 100644
--- a/src/com/juick/jabber/ws/XMPPConnection.java
+++ b/src/com/juick/jabber/ws/XMPPConnection.java
@@ -1,5 +1,7 @@
package com.juick.jabber.ws;
+import com.juick.server.MessagesQueries;
+import com.juick.server.Utils;
import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
import com.juick.xmpp.Stream;
@@ -13,6 +15,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Iterator;
/**
*
@@ -33,7 +37,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
public void run() {
try {
Socket socket = new Socket("localhost", 5347);
- xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp = new StreamComponent(new JID("", "ws2.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
xmpp.addChildParser(new JuickMessage());
xmpp.addListener((Stream.StreamListener) this);
xmpp.addListener((Message.MessageListener) this);
@@ -67,35 +71,24 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
}
private void onJuickMessagePost(com.juick.Message jmsg) {
- String json = "{"
- + "\"mid\":" + jmsg.MID + ","
- + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"},"
- + "\"timestamp\":\"" + jmsg.TimestampString + "\","
- + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"";
- if (jmsg.Tags.size() > 0) {
- json += ",\"tags\":[";
- for (int i = 0; i < jmsg.Tags.size(); i++) {
- if (i > 0) {
- json += ",";
- }
- json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\"";
- }
- json += "]";
+ String json = com.juick.json.Message.toJSON(jmsg).toString();
+ ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json);
+ ByteBuffer bbMsg = ByteBuffer.allocate(10240);
+ bbMsg.put((byte) 0x81);
+ if (jsonbytes.limit() <= 125) {
+ bbMsg.put((byte) jsonbytes.limit());
+ } else {
+ bbMsg.put((byte) 126);
+ bbMsg.putShort((short) jsonbytes.limit());
}
- json += "}";
-
- ByteBuffer out = ByteBuffer.allocate(10240);
- out.put((byte) 0x00);
- out.put(Charset.forName("UTF-8").encode(json));
- out.put((byte) 0xFF);
- out.flip();
-
+ bbMsg.put(jsonbytes);
+ bbMsg.flip();
+ ArrayList<Integer> uids = new ArrayList<Integer>();
String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")";
if (jmsg.Privacy < 0) {
query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")";
}
-
Statement stmt = null;
ResultSet rs = null;
try {
@@ -103,115 +96,73 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
rs = stmt.executeQuery(query);
rs.beforeFirst();
while (rs.next()) {
- int UID = rs.getInt(1);
-
- for (int i = Main.sockMessages.size() - 1; i >= 0; i--) {
- SocketSubscribed ss = Main.sockMessages.get(i);
- if (ss.UID == UID) {
- try {
- out.rewind();
- ss.sock.write(out);
- } catch (IOException e) {
- Main.sockMessages.remove(i);
- try {
- ss.sock.close();
- } catch (IOException ex) {
- }
- }
- }
- }
-
- if (jmsg.Privacy <= 0) {
- for (int i = Main.sockAll.size() - 1; i >= 0; i--) {
- SocketSubscribed ss = Main.sockAll.get(i);
- if (ss.UID == UID) {
- try {
- out.rewind();
- ss.sock.write(out);
- } catch (IOException e) {
- Main.sockAll.remove(i);
- try {
- ss.sock.close();
- } catch (IOException ex) {
- }
- }
- }
- }
- }
-
+ uids.add(rs.getInt(1));
}
} catch (SQLException e) {
- System.err.println(e);
+ System.err.println("onJuickMessagePost: " + e);
} finally {
- if (rs != null) {
- try {
- rs.close();
- } catch (SQLException e) {
- }
- }
- if (stmt != null) {
- try {
- stmt.close();
- } catch (SQLException e) {
- }
- }
+ Utils.finishSQL(rs, stmt);
}
- // Send to all
- if (jmsg.Privacy > 0) {
- for (int i = Main.sockAll.size() - 1; i >= 0; i--) {
- SocketSubscribed ss = Main.sockAll.get(i);
- try {
- out.rewind();
- ss.sock.write(out);
- System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress());
- } catch (IOException e) {
- Main.sockAll.remove(i);
+ synchronized (Main.clients) {
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ if ((jmsg.Privacy >= 0 && (s.allMessages || s.UID == jmsg.User.UID)) || uids.contains(s.VUID)) {
+ bbMsg.rewind();
try {
- ss.sock.close();
- } catch (IOException ex) {
+ s.sock.write(bbMsg);
+ } catch (Exception e) {
+ try {
+ s.sock.socket().close();
+ } catch (Exception ex) {
+ }
+ try {
+ s.sock.close();
+ } catch (Exception ex) {
+ }
+ i.remove();
}
}
}
}
-
}
private void onJuickMessageReply(com.juick.Message jmsg) {
- String json = "{"
- + "\"mid\":" + jmsg.MID + ","
- + "\"rid\":" + jmsg.RID + ","
- + "\"replyto\":" + jmsg.ReplyTo + ","
- + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"},"
- + "\"timestamp\":\"" + jmsg.TimestampString + "\","
- + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""
- + "}";
+ String json = com.juick.json.Message.toJSON(jmsg).toString();
+ ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json);
+ ByteBuffer bbMsg = ByteBuffer.allocate(10240);
+ bbMsg.put((byte) 0x81);
+ if (jsonbytes.limit() <= 125) {
+ bbMsg.put((byte) jsonbytes.limit());
+ } else {
+ bbMsg.put((byte) 126);
+ bbMsg.putShort((short) jsonbytes.limit());
+ }
+ bbMsg.put(jsonbytes);
+ bbMsg.flip();
- ByteBuffer out = ByteBuffer.allocate(10240);
- out.put((byte) 0x00);
- out.put(Charset.forName("UTF-8").encode(json));
- out.put((byte) 0xFF);
- out.flip();
+ int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.MID);
- for (int i = Main.sockReplies.size() - 1; i >= 0; i--) {
- SocketSubscribed ss = Main.sockReplies.get(i);
- if (ss.MID == 0 || ss.MID == jmsg.MID) {
- try {
- out.rewind();
- ss.sock.write(out);
- System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress());
- } catch (IOException e) {
- Main.sockReplies.remove(i);
+ synchronized (Main.clients) {
+ for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
+ SocketSubscribed s = i.next();
+ if ((privacy >= 0 && s.allReplies) || s.MID == jmsg.MID) {
+ bbMsg.rewind();
try {
- ss.sock.close();
- } catch (IOException ex) {
+ s.sock.write(bbMsg);
+ } catch (Exception e) {
+ try {
+ s.sock.socket().close();
+ } catch (Exception ex) {
+ }
+ try {
+ s.sock.close();
+ } catch (Exception ex) {
+ }
+ i.remove();
}
}
}
}
}
-
- public static String encloseJSON(String str) {
- return str.replace("\"", "&quot;").replace("\\", "\\\\").replace("\n", "\\n");
- }
}