diff options
author | Ugnich Anton | 2013-08-04 02:50:44 +0700 |
---|---|---|
committer | Ugnich Anton | 2013-08-04 02:50:44 +0700 |
commit | db2d48c068715c2841fd60b955c98580236c400d (patch) | |
tree | 200df4736f27f978b770e6e23fac87a5ebbb5a70 | |
parent | 44c7c01059640147fbb9df129c2cff7b0a127329 (diff) |
4 threads
-rw-r--r-- | src/com/juick/jabber/ws/Main.java | 475 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/SocketSubscribed.java | 22 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSConnections.java | 44 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSData.java | 233 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/WSKeepAlive.java | 20 | ||||
-rw-r--r-- | src/com/juick/jabber/ws/XMPPConnection.java | 217 |
6 files changed, 547 insertions, 464 deletions
diff --git a/src/com/juick/jabber/ws/Main.java b/src/com/juick/jabber/ws/Main.java index dec38586..7036b0d4 100644 --- a/src/com/juick/jabber/ws/Main.java +++ b/src/com/juick/jabber/ws/Main.java @@ -17,45 +17,24 @@ */ package com.juick.jabber.ws; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Stream; -import com.juick.xmpp.StreamComponent; -import com.juick.xmpp.extensions.JuickMessage; import java.io.FileInputStream; -import java.io.IOException; -import java.math.BigInteger; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.security.MessageDigest; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; -import java.util.Iterator; import java.util.Properties; /** * * @author Ugnich Anton */ -public class Main implements Stream.StreamListener, Message.MessageListener { +public class Main { Connection sql; - Stream xmpp; - ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>(); - ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>(); - ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>(); - Selector sel; + XMPPConnection xmpp; + public final static ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>(); + public final static ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>(); + public final static ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>(); public static void main(String[] args) { new Main().start(); @@ -67,8 +46,11 @@ public class Main implements Stream.StreamListener, Message.MessageListener { conf.load(new FileInputStream("/etc/juick/ws.conf")); setupSql(conf.getProperty("mysql_username", ""), conf.getProperty("mysql_password", "")); - setupXmppComponent(conf.getProperty("xmpp_password", "")); - setupWsServer(); + xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", "")); + new Thread(xmpp).start(); + new Thread(new WSData(sql)).start(); + new Thread(new WSConnections()).start(); + new Thread(new WSKeepAlive(sql)).start(); } catch (Exception e) { System.err.println(e); } @@ -81,439 +63,4 @@ public class Main implements Stream.StreamListener, Message.MessageListener { System.err.println(e); } } - - public void setupXmppComponent(String password) { - try { - Socket socket = new Socket("localhost", 5347); - xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), password); - xmpp.addChildParser(new JuickMessage()); - xmpp.addListener((Stream.StreamListener) this); - xmpp.addListener((Message.MessageListener) this); - xmpp.startParsing(); - } catch (IOException e) { - System.err.println(e); - } - } - - @Override - public void onStreamFail(String msg) { - System.err.println("Component stream failed: " + msg); - } - - @Override - public void onStreamReady() { - System.err.println("Component stream ready"); - } - - public void setupWsServer() { - try { - sel = Selector.open(); - ServerSocketChannel listensock = ServerSocketChannel.open(); - listensock.configureBlocking(false); - listensock.socket().bind(new InetSocketAddress(8080)); - listensock.register(sel, SelectionKey.OP_ACCEPT); - - while (true) { - sel.select(); -// System.out.println("ONE"); - Iterator<SelectionKey> it = sel.selectedKeys().iterator(); - while (it.hasNext()) { -// System.out.println("TWO"); - SelectionKey selKey = it.next(); - it.remove(); - if (selKey.isAcceptable()) { - ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); - SocketChannel sChannel = ssChannel.accept(); - System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); - sChannel.configureBlocking(false); - sChannel.register(sel, SelectionKey.OP_READ); - } else if (selKey.isReadable()) { -// System.out.println("THREE"); - SocketChannel sChannel = (SocketChannel) selKey.channel(); - ByteBuffer buf = ByteBuffer.allocate(10240); - try { - if (sChannel.read(buf) > 0) { - buf.flip(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - if (charbuf.charAt(0) == 0 && charbuf.charAt(charbuf.length() - 1) == 0xFF) { - wsTextFrame(sChannel, charbuf.subSequence(1, charbuf.length() - 2)); - } else if (charbuf.charAt(0) == 'G' && charbuf.charAt(1) == 'E' && charbuf.charAt(2) == 'T' && charbuf.charAt(3) == ' ') { - wsHandshake(sChannel, buf); - } else { - System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " INVALID FRAME"); -// System.out.println("FOUR"); - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); - } - } else { - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); -// System.out.println("SIX"); - } - } catch (IOException e) { - System.err.println(e); - sChannel.socket().close(); - sChannel.close(); -// System.out.println("FIVE"); - selKey.cancel(); - } - } - } - } - } catch (Exception e) { - System.err.println(e); - } - } - - public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception { - String hOrigin = null; - String hHost = null; - String hLocation = null; - String hSecWebSocketKey1 = null; - String hSecWebSocketKey2 = null; - String hCookie = null; - - buf.rewind(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - String headers[] = charbuf.toString().split("\r\n"); - for (int i = 0; i < headers.length; i++) { - String h[] = headers[i].split(" ", 2); - if (h.length == 2) { - if (h[0].equals("GET")) { - hLocation = headers[i].split(" ", 3)[1]; - } else if (h[0].equals("Origin:")) { - hOrigin = h[1]; - } else if (h[0].equals("Host:")) { - hHost = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key1:")) { - hSecWebSocketKey1 = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key2:")) { - hSecWebSocketKey2 = h[1]; - } else if (h[0].equals("Cookie:")) { - hCookie = h[1]; - } - } - } - - if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey1 == null || hSecWebSocketKey2 == null) { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers"); - sock.close(); - return; - } - - // Cookies - int UID = 0; - String hash = null; - if (hCookie != null) { - String cookies[] = hCookie.split("; "); - for (int i = 0; i < cookies.length; i++) { - String cookie[] = cookies[i].split("=", 2); - if (cookie[0].equals("hash")) { - hash = cookie[1]; - break; - } - } - if (hash != null) { - UID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); - } - } - - // URL - String loc[] = hLocation.split("/"); - int MID = 0; - if (hLocation.equals("/my") && UID > 0) { - sockMessages.add(new SocketSubscribed(sock, UID, 0)); - } else if (hLocation.equals("/all")) { - sockAll.add(new SocketSubscribed(sock, UID, 0)); - } else if ((loc.length == 2 || loc.length == 3) && loc[1].equals("replies")) { - if (loc.length == 2) { - sockReplies.add(new SocketSubscribed(sock, UID, 0)); - } else { - try { - MID = Integer.parseInt(loc[2]); - } catch (Exception e) { - } - if (MID > 0) { - sockReplies.add(new SocketSubscribed(sock, UID, MID)); - } else { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid MID"); - sock.close(); - return; - } - } - } else { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid location"); - sock.close(); - return; - } - - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (Hash=" + hash + "; UID = " + UID + "; MID = " + MID + ")"); - - Long lSecNum1 = calcSecKeyNum(hSecWebSocketKey1); - Long lSecNum2 = calcSecKeyNum(hSecWebSocketKey2); - - BigInteger sec1 = new BigInteger(lSecNum1.toString()); - BigInteger sec2 = new BigInteger(lSecNum2.toString()); - - // concatenate 3 parts secNum1 + secNum2 + secKey (16 Bytes) - byte[] l128Bit = new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - byte[] lTmp; - - lTmp = sec1.toByteArray(); - int lIdx = lTmp.length; - int lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[4 - lCnt] = lTmp[lIdx]; - } - - lTmp = sec2.toByteArray(); - lIdx = lTmp.length; - lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[8 - lCnt] = lTmp[lIdx]; - } - - buf.rewind(); - for (int i = 0; i < 8; i++) { - l128Bit[8 + i] = buf.get(buf.limit() - 8 + i); - } - - String outstr = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - + "Upgrade: WebSocket\r\n" - + "Connection: Upgrade\r\n" - + "Sec-WebSocket-Origin: " + hOrigin + "\r\n" - + "Sec-WebSocket-Location: ws://" + hHost + hLocation + "\r\n" - + "Sec-WebSocket-Protocol: sample\r\n" - + "\r\n"; - ByteBuffer out = ByteBuffer.allocate(4096); - out.put(Charset.forName("ISO-8859-1").encode(outstr)); - out.put(MessageDigest.getInstance("MD5").digest(l128Bit)); - out.flip(); - - sock.write(out); - } - - private static long calcSecKeyNum(String aKey) { - StringBuilder lSB = new StringBuilder(); - // StringBuuffer lSB = new StringBuuffer(); - int lSpaces = 0; - for (int i = 0; i < aKey.length(); i++) { - char lC = aKey.charAt(i); - if (lC == ' ') { - lSpaces++; - } else if (lC >= '0' && lC <= '9') { - lSB.append(lC); - } - } - long lRes = -1; - if (lSpaces > 0) { - try { - lRes = Long.parseLong(lSB.toString()) / lSpaces; - // log.debug("Key: " + aKey + ", Numbers: " + lSB.toString() + - // ", Spaces: " + lSpaces + ", Result: " + lRes); - } catch (NumberFormatException ex) { - // use default result - } - } - return lRes; - } - - public void wsTextFrame(SocketChannel sock, CharSequence csbuf) { - String buf = csbuf.toString(); - if (buf.equals(" ")) { - ByteBuffer out = ByteBuffer.allocate(4); - out.put((byte) 0x00); - out.put((byte) 0x20); - out.put((byte) 0xFF); - out.flip(); - out.rewind(); - try { - sock.write(out); - } catch (IOException e) { - } - } else { - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " DATA '" + buf + "'"); - } - } - - @Override - public void onMessage(com.juick.xmpp.Message msg) { - JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - System.err.println("MID=" + jmsg.MID + "; RID=" + jmsg.RID); - if (jmsg.RID == 0) { - onJuickMessagePost(jmsg); - } else { - onJuickMessageReply(jmsg); - } - } - } - - private void onJuickMessagePost(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""; - if (jmsg.Tags.size() > 0) { - json += ",\"tags\":["; - for (int i = 0; i < jmsg.Tags.size(); i++) { - if (i > 0) { - json += ","; - } - json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\""; - } - json += "]"; - } - json += "}"; - - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); - - - String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")"; - if (jmsg.Privacy < 0) { - query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")"; - } - - Statement stmt = null; - ResultSet rs = null; - try { - stmt = sql.createStatement(); - rs = stmt.executeQuery(query); - rs.beforeFirst(); - while (rs.next()) { - int UID = rs.getInt(1); - - for (int i = sockMessages.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockMessages.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - sockMessages.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - - if (jmsg.Privacy <= 0) { - for (int i = sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockAll.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - sockAll.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - } - - } - } catch (SQLException e) { - System.err.println(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - } - } - } - - // Send to all - if (jmsg.Privacy > 0) { - for (int i = sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockAll.get(i); - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - sockAll.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - - } - - private void onJuickMessageReply(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"rid\":" + jmsg.RID + "," - + "\"replyto\":" + jmsg.ReplyTo + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"" - + "}"; - - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); - - for (int i = sockReplies.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockReplies.get(i); - if (ss.MID == 0 || ss.MID == jmsg.MID) { - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - sockReplies.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - } - - public static String encloseJSON(String str) { - return str.replace("\"", """).replace("\\", "\\\\").replace("\n", "\\n"); - } -} - -class SocketSubscribed { - - public SocketChannel sock; - public int UID; - public int MID; - - public SocketSubscribed(SocketChannel sock, int UID, int MID) { - this.sock = sock; - this.UID = UID; - this.MID = MID; - } -} +}
\ No newline at end of file diff --git a/src/com/juick/jabber/ws/SocketSubscribed.java b/src/com/juick/jabber/ws/SocketSubscribed.java new file mode 100644 index 00000000..29baf0b8 --- /dev/null +++ b/src/com/juick/jabber/ws/SocketSubscribed.java @@ -0,0 +1,22 @@ +package com.juick.jabber.ws; + +import java.nio.channels.SocketChannel; + +/** + * + * @author ugnich + */ +public class SocketSubscribed { + + public SocketChannel sock = null; + public int UID = 0; + public int MID = 0; + public long tsConnected = 0; + public long tsLastData = 0; + + public SocketSubscribed(SocketChannel sock, int UID, int MID) { + this.sock = sock; + this.UID = UID; + this.MID = MID; + } +} diff --git a/src/com/juick/jabber/ws/WSConnections.java b/src/com/juick/jabber/ws/WSConnections.java new file mode 100644 index 00000000..020442f7 --- /dev/null +++ b/src/com/juick/jabber/ws/WSConnections.java @@ -0,0 +1,44 @@ +package com.juick.jabber.ws; + +import java.net.InetSocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; + +/** + * + * @author ugnich + */ +public class WSConnections implements Runnable { + + Selector sel; + + @Override + public void run() { + try { + sel = Selector.open(); + ServerSocketChannel listensock = ServerSocketChannel.open(); + listensock.configureBlocking(false); + listensock.socket().bind(new InetSocketAddress(8081)); + listensock.register(sel, SelectionKey.OP_ACCEPT); + + while (true) { + sel.select(); + Iterator<SelectionKey> it = sel.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey selKey = it.next(); + it.remove(); + ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); + SocketChannel sChannel = ssChannel.accept(); + System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); + sChannel.configureBlocking(false); + sChannel.register(WSData.sel, SelectionKey.OP_READ); + } + } + } catch (Exception e) { + System.err.println("WSConnections: " + e); + } + } +} diff --git a/src/com/juick/jabber/ws/WSData.java b/src/com/juick/jabber/ws/WSData.java new file mode 100644 index 00000000..97193510 --- /dev/null +++ b/src/com/juick/jabber/ws/WSData.java @@ -0,0 +1,233 @@ +package com.juick.jabber.ws; + +import java.io.IOException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.charset.Charset; +import java.security.MessageDigest; +import java.sql.Connection; +import java.util.Iterator; + +/** + * + * @author ugnich + */ +public class WSData implements Runnable { + + Connection sql; + public static Selector sel; + + public WSData(Connection sql) { + this.sql = sql; + } + + @Override + public void run() { + try { + sel = Selector.open(); + while (true) { + sel.select(); + Iterator<SelectionKey> it = sel.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey selKey = it.next(); + it.remove(); + + SocketChannel sChannel = (SocketChannel) selKey.channel(); + ByteBuffer buf = ByteBuffer.allocate(10240); + try { + if (sChannel.read(buf) > 0) { + buf.flip(); + CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); + if (charbuf.charAt(0) == 0 && charbuf.charAt(charbuf.length() - 1) == 0xFF) { + wsTextFrame(sChannel, charbuf.subSequence(1, charbuf.length() - 2)); + } else if (charbuf.charAt(0) == 'G' && charbuf.charAt(1) == 'E' && charbuf.charAt(2) == 'T' && charbuf.charAt(3) == ' ') { + wsHandshake(sChannel, buf); + } else { + throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " INVALID FRAME"); + } + } else { + throw new IOException(sChannel.socket().getRemoteSocketAddress().toString()+ " NO DATA"); + } + } catch (IOException e) { + System.err.println("WSData: " + e); + sChannel.socket().close(); + sChannel.close(); + selKey.cancel(); + } + } + } + } catch (Exception e) { + System.err.println("WSData: " + e); + } + } + + public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception { + String hOrigin = null; + String hHost = null; + String hLocation = null; + String hSecWebSocketKey1 = null; + String hSecWebSocketKey2 = null; + String hCookie = null; + + buf.rewind(); + CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); + String headers[] = charbuf.toString().split("\r\n"); + for (int i = 0; i < headers.length; i++) { + String h[] = headers[i].split(" ", 2); + if (h.length == 2) { + if (h[0].equals("GET")) { + hLocation = headers[i].split(" ", 3)[1]; + } else if (h[0].equals("Origin:")) { + hOrigin = h[1]; + } else if (h[0].equals("Host:")) { + hHost = h[1]; + } else if (h[0].equals("Sec-WebSocket-Key1:")) { + hSecWebSocketKey1 = h[1]; + } else if (h[0].equals("Sec-WebSocket-Key2:")) { + hSecWebSocketKey2 = h[1]; + } else if (h[0].equals("Cookie:")) { + hCookie = h[1]; + } + } + } + + if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey1 == null || hSecWebSocketKey2 == null) { + throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers"); + } + + // Cookies + int UID = 0; + String hash = null; + if (hCookie != null) { + String cookies[] = hCookie.split("; "); + for (int i = 0; i < cookies.length; i++) { + String cookie[] = cookies[i].split("=", 2); + if (cookie[0].equals("hash")) { + hash = cookie[1]; + break; + } + } + if (hash != null) { + UID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); + } + } + + // URL + String loc[] = hLocation.split("/"); + int MID = 0; + if (hLocation.equals("/my") && UID > 0) { + Main.sockMessages.add(new SocketSubscribed(sock, UID, 0)); + } else if (hLocation.equals("/all")) { + Main.sockAll.add(new SocketSubscribed(sock, UID, 0)); + } else if ((loc.length == 2 || loc.length == 3) && loc[1].equals("replies")) { + if (loc.length == 2) { + Main.sockReplies.add(new SocketSubscribed(sock, UID, 0)); + } else { + try { + MID = Integer.parseInt(loc[2]); + } catch (Exception e) { + } + if (MID > 0) { + Main.sockReplies.add(new SocketSubscribed(sock, UID, MID)); + } else { + throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid MID"); + } + } + } else { + throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid location"); + } + + System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (Hash=" + hash + "; UID = " + UID + "; MID = " + MID + ")"); + + Long lSecNum1 = calcSecKeyNum(hSecWebSocketKey1); + Long lSecNum2 = calcSecKeyNum(hSecWebSocketKey2); + + BigInteger sec1 = new BigInteger(lSecNum1.toString()); + BigInteger sec2 = new BigInteger(lSecNum2.toString()); + + // concatenate 3 parts secNum1 + secNum2 + secKey (16 Bytes) + byte[] l128Bit = new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; + byte[] lTmp; + + lTmp = sec1.toByteArray(); + int lIdx = lTmp.length; + int lCnt = 0; + while (lIdx > 0 && lCnt < 4) { + lIdx--; + lCnt++; + l128Bit[4 - lCnt] = lTmp[lIdx]; + } + + lTmp = sec2.toByteArray(); + lIdx = lTmp.length; + lCnt = 0; + while (lIdx > 0 && lCnt < 4) { + lIdx--; + lCnt++; + l128Bit[8 - lCnt] = lTmp[lIdx]; + } + + buf.rewind(); + for (int i = 0; i < 8; i++) { + l128Bit[8 + i] = buf.get(buf.limit() - 8 + i); + } + + String outstr = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "Sec-WebSocket-Origin: " + hOrigin + "\r\n" + + "Sec-WebSocket-Location: ws://" + hHost + hLocation + "\r\n" + + "Sec-WebSocket-Protocol: sample\r\n" + + "\r\n"; + ByteBuffer out = ByteBuffer.allocate(4096); + out.put(Charset.forName("ISO-8859-1").encode(outstr)); + out.put(MessageDigest.getInstance("MD5").digest(l128Bit)); + out.flip(); + + sock.write(out); + } + + private static long calcSecKeyNum(String aKey) { + StringBuilder lSB = new StringBuilder(); + int lSpaces = 0; + for (int i = 0; i < aKey.length(); i++) { + char lC = aKey.charAt(i); + if (lC == ' ') { + lSpaces++; + } else if (lC >= '0' && lC <= '9') { + lSB.append(lC); + } + } + long lRes = -1; + if (lSpaces > 0) { + try { + lRes = Long.parseLong(lSB.toString()) / lSpaces; + } catch (NumberFormatException ex) { + // use default result + } + } + return lRes; + } + + public void wsTextFrame(SocketChannel sock, CharSequence csbuf) { + String buf = csbuf.toString(); + if (buf.equals(" ")) { + ByteBuffer out = ByteBuffer.allocate(4); + out.put((byte) 0x00); + out.put((byte) 0x20); + out.put((byte) 0xFF); + out.flip(); + out.rewind(); + try { + sock.write(out); + } catch (IOException e) { + } + } else { + System.out.println(sock.socket().getRemoteSocketAddress().toString() + " DATA '" + buf + "'"); + } + } +} diff --git a/src/com/juick/jabber/ws/WSKeepAlive.java b/src/com/juick/jabber/ws/WSKeepAlive.java new file mode 100644 index 00000000..ba6376a4 --- /dev/null +++ b/src/com/juick/jabber/ws/WSKeepAlive.java @@ -0,0 +1,20 @@ +package com.juick.jabber.ws; + +import java.sql.Connection; + +/** + * + * @author ugnich + */ +public class WSKeepAlive implements Runnable { + + Connection sql; + + public WSKeepAlive(Connection sql) { + this.sql = sql; + } + + @Override + public void run() { + } +} diff --git a/src/com/juick/jabber/ws/XMPPConnection.java b/src/com/juick/jabber/ws/XMPPConnection.java new file mode 100644 index 00000000..91a8387b --- /dev/null +++ b/src/com/juick/jabber/ws/XMPPConnection.java @@ -0,0 +1,217 @@ +package com.juick.jabber.ws; + +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.Stream; +import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.extensions.JuickMessage; +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * + * @author ugnich + */ +public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { + + Connection sql; + Stream xmpp; + String xmppPassword; + + public XMPPConnection(Connection sql, String password) { + this.sql = sql; + xmppPassword = password; + } + + @Override + public void run() { + try { + Socket socket = new Socket("localhost", 5347); + xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp.addChildParser(new JuickMessage()); + xmpp.addListener((Stream.StreamListener) this); + xmpp.addListener((Message.MessageListener) this); + xmpp.startParsing(); + } catch (IOException e) { + System.err.println("XMPPConnection: " + e); + } + } + + @Override + public void onStreamReady() { + System.err.println("Stream ready"); + } + + @Override + public void onStreamFail(String msg) { + System.err.println("Stream failed: " + msg); + } + + @Override + public void onMessage(com.juick.xmpp.Message msg) { + JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + System.err.println("MID=" + jmsg.MID + "; RID=" + jmsg.RID); + if (jmsg.RID == 0) { + onJuickMessagePost(jmsg); + } else { + onJuickMessageReply(jmsg); + } + } + } + + private void onJuickMessagePost(com.juick.Message jmsg) { + String json = "{" + + "\"mid\":" + jmsg.MID + "," + + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," + + "\"timestamp\":\"" + jmsg.TimestampString + "\"," + + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""; + if (jmsg.Tags.size() > 0) { + json += ",\"tags\":["; + for (int i = 0; i < jmsg.Tags.size(); i++) { + if (i > 0) { + json += ","; + } + json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\""; + } + json += "]"; + } + json += "}"; + + ByteBuffer out = ByteBuffer.allocate(10240); + out.put((byte) 0x00); + out.put(Charset.forName("UTF-8").encode(json)); + out.put((byte) 0xFF); + out.flip(); + + + String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")"; + if (jmsg.Privacy < 0) { + query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")"; + } + + Statement stmt = null; + ResultSet rs = null; + try { + stmt = sql.createStatement(); + rs = stmt.executeQuery(query); + rs.beforeFirst(); + while (rs.next()) { + int UID = rs.getInt(1); + + for (int i = Main.sockMessages.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockMessages.get(i); + if (ss.UID == UID) { + try { + out.rewind(); + ss.sock.write(out); + } catch (IOException e) { + Main.sockMessages.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + + if (jmsg.Privacy <= 0) { + for (int i = Main.sockAll.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockAll.get(i); + if (ss.UID == UID) { + try { + out.rewind(); + ss.sock.write(out); + } catch (IOException e) { + Main.sockAll.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + } + + } + } catch (SQLException e) { + System.err.println(e); + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + } + } + } + + // Send to all + if (jmsg.Privacy > 0) { + for (int i = Main.sockAll.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockAll.get(i); + try { + out.rewind(); + ss.sock.write(out); + System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); + } catch (IOException e) { + Main.sockAll.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + + } + + private void onJuickMessageReply(com.juick.Message jmsg) { + String json = "{" + + "\"mid\":" + jmsg.MID + "," + + "\"rid\":" + jmsg.RID + "," + + "\"replyto\":" + jmsg.ReplyTo + "," + + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," + + "\"timestamp\":\"" + jmsg.TimestampString + "\"," + + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"" + + "}"; + + ByteBuffer out = ByteBuffer.allocate(10240); + out.put((byte) 0x00); + out.put(Charset.forName("UTF-8").encode(json)); + out.put((byte) 0xFF); + out.flip(); + + for (int i = Main.sockReplies.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockReplies.get(i); + if (ss.MID == 0 || ss.MID == jmsg.MID) { + try { + out.rewind(); + ss.sock.write(out); + System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); + } catch (IOException e) { + Main.sockReplies.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + } + + public static String encloseJSON(String str) { + return str.replace("\"", """).replace("\\", "\\\\").replace("\n", "\\n"); + } +} |