diff options
author | Ugnich Anton | 2013-08-04 02:50:44 +0700 |
---|---|---|
committer | Ugnich Anton | 2013-08-04 02:50:44 +0700 |
commit | db2d48c068715c2841fd60b955c98580236c400d (patch) | |
tree | 200df4736f27f978b770e6e23fac87a5ebbb5a70 /src/com/juick/jabber/ws/Main.java | |
parent | 44c7c01059640147fbb9df129c2cff7b0a127329 (diff) |
4 threads
Diffstat (limited to 'src/com/juick/jabber/ws/Main.java')
-rw-r--r-- | src/com/juick/jabber/ws/Main.java | 475 |
1 files changed, 11 insertions, 464 deletions
diff --git a/src/com/juick/jabber/ws/Main.java b/src/com/juick/jabber/ws/Main.java index dec38586..7036b0d4 100644 --- a/src/com/juick/jabber/ws/Main.java +++ b/src/com/juick/jabber/ws/Main.java @@ -17,45 +17,24 @@ */ package com.juick.jabber.ws; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; -import com.juick.xmpp.Stream; -import com.juick.xmpp.StreamComponent; -import com.juick.xmpp.extensions.JuickMessage; import java.io.FileInputStream; -import java.io.IOException; -import java.math.BigInteger; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.security.MessageDigest; import java.sql.Connection; import java.sql.DriverManager; -import java.sql.ResultSet; import java.sql.SQLException; -import java.sql.Statement; import java.util.ArrayList; -import java.util.Iterator; import java.util.Properties; /** * * @author Ugnich Anton */ -public class Main implements Stream.StreamListener, Message.MessageListener { +public class Main { Connection sql; - Stream xmpp; - ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>(); - ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>(); - ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>(); - Selector sel; + XMPPConnection xmpp; + public final static ArrayList<SocketSubscribed> sockReplies = new ArrayList<SocketSubscribed>(); + public final static ArrayList<SocketSubscribed> sockMessages = new ArrayList<SocketSubscribed>(); + public final static ArrayList<SocketSubscribed> sockAll = new ArrayList<SocketSubscribed>(); public static void main(String[] args) { new Main().start(); @@ -67,8 +46,11 @@ public class Main implements Stream.StreamListener, Message.MessageListener { conf.load(new FileInputStream("/etc/juick/ws.conf")); setupSql(conf.getProperty("mysql_username", ""), conf.getProperty("mysql_password", "")); - setupXmppComponent(conf.getProperty("xmpp_password", "")); - setupWsServer(); + xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", "")); + new Thread(xmpp).start(); + new Thread(new WSData(sql)).start(); + new Thread(new WSConnections()).start(); + new Thread(new WSKeepAlive(sql)).start(); } catch (Exception e) { System.err.println(e); } @@ -81,439 +63,4 @@ public class Main implements Stream.StreamListener, Message.MessageListener { System.err.println(e); } } - - public void setupXmppComponent(String password) { - try { - Socket socket = new Socket("localhost", 5347); - xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), password); - xmpp.addChildParser(new JuickMessage()); - xmpp.addListener((Stream.StreamListener) this); - xmpp.addListener((Message.MessageListener) this); - xmpp.startParsing(); - } catch (IOException e) { - System.err.println(e); - } - } - - @Override - public void onStreamFail(String msg) { - System.err.println("Component stream failed: " + msg); - } - - @Override - public void onStreamReady() { - System.err.println("Component stream ready"); - } - - public void setupWsServer() { - try { - sel = Selector.open(); - ServerSocketChannel listensock = ServerSocketChannel.open(); - listensock.configureBlocking(false); - listensock.socket().bind(new InetSocketAddress(8080)); - listensock.register(sel, SelectionKey.OP_ACCEPT); - - while (true) { - sel.select(); -// System.out.println("ONE"); - Iterator<SelectionKey> it = sel.selectedKeys().iterator(); - while (it.hasNext()) { -// System.out.println("TWO"); - SelectionKey selKey = it.next(); - it.remove(); - if (selKey.isAcceptable()) { - ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); - SocketChannel sChannel = ssChannel.accept(); - System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); - sChannel.configureBlocking(false); - sChannel.register(sel, SelectionKey.OP_READ); - } else if (selKey.isReadable()) { -// System.out.println("THREE"); - SocketChannel sChannel = (SocketChannel) selKey.channel(); - ByteBuffer buf = ByteBuffer.allocate(10240); - try { - if (sChannel.read(buf) > 0) { - buf.flip(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - if (charbuf.charAt(0) == 0 && charbuf.charAt(charbuf.length() - 1) == 0xFF) { - wsTextFrame(sChannel, charbuf.subSequence(1, charbuf.length() - 2)); - } else if (charbuf.charAt(0) == 'G' && charbuf.charAt(1) == 'E' && charbuf.charAt(2) == 'T' && charbuf.charAt(3) == ' ') { - wsHandshake(sChannel, buf); - } else { - System.out.println(sChannel.socket().getRemoteSocketAddress().toString() + " INVALID FRAME"); -// System.out.println("FOUR"); - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); - } - } else { - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); -// System.out.println("SIX"); - } - } catch (IOException e) { - System.err.println(e); - sChannel.socket().close(); - sChannel.close(); -// System.out.println("FIVE"); - selKey.cancel(); - } - } - } - } - } catch (Exception e) { - System.err.println(e); - } - } - - public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception { - String hOrigin = null; - String hHost = null; - String hLocation = null; - String hSecWebSocketKey1 = null; - String hSecWebSocketKey2 = null; - String hCookie = null; - - buf.rewind(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - String headers[] = charbuf.toString().split("\r\n"); - for (int i = 0; i < headers.length; i++) { - String h[] = headers[i].split(" ", 2); - if (h.length == 2) { - if (h[0].equals("GET")) { - hLocation = headers[i].split(" ", 3)[1]; - } else if (h[0].equals("Origin:")) { - hOrigin = h[1]; - } else if (h[0].equals("Host:")) { - hHost = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key1:")) { - hSecWebSocketKey1 = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key2:")) { - hSecWebSocketKey2 = h[1]; - } else if (h[0].equals("Cookie:")) { - hCookie = h[1]; - } - } - } - - if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey1 == null || hSecWebSocketKey2 == null) { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers"); - sock.close(); - return; - } - - // Cookies - int UID = 0; - String hash = null; - if (hCookie != null) { - String cookies[] = hCookie.split("; "); - for (int i = 0; i < cookies.length; i++) { - String cookie[] = cookies[i].split("=", 2); - if (cookie[0].equals("hash")) { - hash = cookie[1]; - break; - } - } - if (hash != null) { - UID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); - } - } - - // URL - String loc[] = hLocation.split("/"); - int MID = 0; - if (hLocation.equals("/my") && UID > 0) { - sockMessages.add(new SocketSubscribed(sock, UID, 0)); - } else if (hLocation.equals("/all")) { - sockAll.add(new SocketSubscribed(sock, UID, 0)); - } else if ((loc.length == 2 || loc.length == 3) && loc[1].equals("replies")) { - if (loc.length == 2) { - sockReplies.add(new SocketSubscribed(sock, UID, 0)); - } else { - try { - MID = Integer.parseInt(loc[2]); - } catch (Exception e) { - } - if (MID > 0) { - sockReplies.add(new SocketSubscribed(sock, UID, MID)); - } else { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid MID"); - sock.close(); - return; - } - } - } else { - System.err.println(sock.socket().getRemoteSocketAddress().toString() + " Invalid location"); - sock.close(); - return; - } - - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (Hash=" + hash + "; UID = " + UID + "; MID = " + MID + ")"); - - Long lSecNum1 = calcSecKeyNum(hSecWebSocketKey1); - Long lSecNum2 = calcSecKeyNum(hSecWebSocketKey2); - - BigInteger sec1 = new BigInteger(lSecNum1.toString()); - BigInteger sec2 = new BigInteger(lSecNum2.toString()); - - // concatenate 3 parts secNum1 + secNum2 + secKey (16 Bytes) - byte[] l128Bit = new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}; - byte[] lTmp; - - lTmp = sec1.toByteArray(); - int lIdx = lTmp.length; - int lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[4 - lCnt] = lTmp[lIdx]; - } - - lTmp = sec2.toByteArray(); - lIdx = lTmp.length; - lCnt = 0; - while (lIdx > 0 && lCnt < 4) { - lIdx--; - lCnt++; - l128Bit[8 - lCnt] = lTmp[lIdx]; - } - - buf.rewind(); - for (int i = 0; i < 8; i++) { - l128Bit[8 + i] = buf.get(buf.limit() - 8 + i); - } - - String outstr = "HTTP/1.1 101 Web Socket Protocol Handshake\r\n" - + "Upgrade: WebSocket\r\n" - + "Connection: Upgrade\r\n" - + "Sec-WebSocket-Origin: " + hOrigin + "\r\n" - + "Sec-WebSocket-Location: ws://" + hHost + hLocation + "\r\n" - + "Sec-WebSocket-Protocol: sample\r\n" - + "\r\n"; - ByteBuffer out = ByteBuffer.allocate(4096); - out.put(Charset.forName("ISO-8859-1").encode(outstr)); - out.put(MessageDigest.getInstance("MD5").digest(l128Bit)); - out.flip(); - - sock.write(out); - } - - private static long calcSecKeyNum(String aKey) { - StringBuilder lSB = new StringBuilder(); - // StringBuuffer lSB = new StringBuuffer(); - int lSpaces = 0; - for (int i = 0; i < aKey.length(); i++) { - char lC = aKey.charAt(i); - if (lC == ' ') { - lSpaces++; - } else if (lC >= '0' && lC <= '9') { - lSB.append(lC); - } - } - long lRes = -1; - if (lSpaces > 0) { - try { - lRes = Long.parseLong(lSB.toString()) / lSpaces; - // log.debug("Key: " + aKey + ", Numbers: " + lSB.toString() + - // ", Spaces: " + lSpaces + ", Result: " + lRes); - } catch (NumberFormatException ex) { - // use default result - } - } - return lRes; - } - - public void wsTextFrame(SocketChannel sock, CharSequence csbuf) { - String buf = csbuf.toString(); - if (buf.equals(" ")) { - ByteBuffer out = ByteBuffer.allocate(4); - out.put((byte) 0x00); - out.put((byte) 0x20); - out.put((byte) 0xFF); - out.flip(); - out.rewind(); - try { - sock.write(out); - } catch (IOException e) { - } - } else { - System.out.println(sock.socket().getRemoteSocketAddress().toString() + " DATA '" + buf + "'"); - } - } - - @Override - public void onMessage(com.juick.xmpp.Message msg) { - JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - System.err.println("MID=" + jmsg.MID + "; RID=" + jmsg.RID); - if (jmsg.RID == 0) { - onJuickMessagePost(jmsg); - } else { - onJuickMessageReply(jmsg); - } - } - } - - private void onJuickMessagePost(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""; - if (jmsg.Tags.size() > 0) { - json += ",\"tags\":["; - for (int i = 0; i < jmsg.Tags.size(); i++) { - if (i > 0) { - json += ","; - } - json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\""; - } - json += "]"; - } - json += "}"; - - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); - - - String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")"; - if (jmsg.Privacy < 0) { - query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")"; - } - - Statement stmt = null; - ResultSet rs = null; - try { - stmt = sql.createStatement(); - rs = stmt.executeQuery(query); - rs.beforeFirst(); - while (rs.next()) { - int UID = rs.getInt(1); - - for (int i = sockMessages.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockMessages.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - sockMessages.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - - if (jmsg.Privacy <= 0) { - for (int i = sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockAll.get(i); - if (ss.UID == UID) { - try { - out.rewind(); - ss.sock.write(out); - } catch (IOException e) { - sockAll.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - } - - } - } catch (SQLException e) { - System.err.println(e); - } finally { - if (rs != null) { - try { - rs.close(); - } catch (SQLException e) { - } - } - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException e) { - } - } - } - - // Send to all - if (jmsg.Privacy > 0) { - for (int i = sockAll.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockAll.get(i); - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - sockAll.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - - } - - private void onJuickMessageReply(com.juick.Message jmsg) { - String json = "{" - + "\"mid\":" + jmsg.MID + "," - + "\"rid\":" + jmsg.RID + "," - + "\"replyto\":" + jmsg.ReplyTo + "," - + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," - + "\"timestamp\":\"" + jmsg.TimestampString + "\"," - + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"" - + "}"; - - ByteBuffer out = ByteBuffer.allocate(10240); - out.put((byte) 0x00); - out.put(Charset.forName("UTF-8").encode(json)); - out.put((byte) 0xFF); - out.flip(); - - for (int i = sockReplies.size() - 1; i >= 0; i--) { - SocketSubscribed ss = sockReplies.get(i); - if (ss.MID == 0 || ss.MID == jmsg.MID) { - try { - out.rewind(); - ss.sock.write(out); - System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); - } catch (IOException e) { - sockReplies.remove(i); - try { - ss.sock.close(); - } catch (IOException ex) { - } - } - } - } - } - - public static String encloseJSON(String str) { - return str.replace("\"", """).replace("\\", "\\\\").replace("\n", "\\n"); - } -} - -class SocketSubscribed { - - public SocketChannel sock; - public int UID; - public int MID; - - public SocketSubscribed(SocketChannel sock, int UID, int MID) { - this.sock = sock; - this.UID = UID; - this.MID = MID; - } -} +}
\ No newline at end of file |