From db2d48c068715c2841fd60b955c98580236c400d Mon Sep 17 00:00:00 2001 From: Ugnich Anton Date: Sun, 4 Aug 2013 02:50:44 +0700 Subject: 4 threads --- src/com/juick/jabber/ws/XMPPConnection.java | 217 ++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) create mode 100644 src/com/juick/jabber/ws/XMPPConnection.java (limited to 'src/com/juick/jabber/ws/XMPPConnection.java') diff --git a/src/com/juick/jabber/ws/XMPPConnection.java b/src/com/juick/jabber/ws/XMPPConnection.java new file mode 100644 index 00000000..91a8387b --- /dev/null +++ b/src/com/juick/jabber/ws/XMPPConnection.java @@ -0,0 +1,217 @@ +package com.juick.jabber.ws; + +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.Stream; +import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.extensions.JuickMessage; +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +/** + * + * @author ugnich + */ +public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { + + Connection sql; + Stream xmpp; + String xmppPassword; + + public XMPPConnection(Connection sql, String password) { + this.sql = sql; + xmppPassword = password; + } + + @Override + public void run() { + try { + Socket socket = new Socket("localhost", 5347); + xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp.addChildParser(new JuickMessage()); + xmpp.addListener((Stream.StreamListener) this); + xmpp.addListener((Message.MessageListener) this); + xmpp.startParsing(); + } catch (IOException e) { + System.err.println("XMPPConnection: " + e); + } + } + + @Override + public void onStreamReady() { + System.err.println("Stream ready"); + } + + @Override + public void onStreamFail(String msg) { + System.err.println("Stream failed: " + msg); + } + + @Override + public void onMessage(com.juick.xmpp.Message msg) { + JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + System.err.println("MID=" + jmsg.MID + "; RID=" + jmsg.RID); + if (jmsg.RID == 0) { + onJuickMessagePost(jmsg); + } else { + onJuickMessageReply(jmsg); + } + } + } + + private void onJuickMessagePost(com.juick.Message jmsg) { + String json = "{" + + "\"mid\":" + jmsg.MID + "," + + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," + + "\"timestamp\":\"" + jmsg.TimestampString + "\"," + + "\"body\":\"" + encloseJSON(jmsg.Text) + "\""; + if (jmsg.Tags.size() > 0) { + json += ",\"tags\":["; + for (int i = 0; i < jmsg.Tags.size(); i++) { + if (i > 0) { + json += ","; + } + json += "\"" + encloseJSON((String) jmsg.Tags.get(i)) + "\""; + } + json += "]"; + } + json += "}"; + + ByteBuffer out = ByteBuffer.allocate(10240); + out.put((byte) 0x00); + out.put(Charset.forName("UTF-8").encode(json)); + out.put((byte) 0xFF); + out.flip(); + + + String query = "SELECT suser_id FROM subscr_users WHERE user_id=" + jmsg.User.UID + " AND suser_id NOT IN (SELECT user_id FROM bl_tags INNER JOIN messages_tags USING(tag_id) WHERE message_id=" + jmsg.MID + ")"; + if (jmsg.Privacy < 0) { + query += " AND suser_id IN (SELECT wl_user_id FROM wl_users WHERE user_id=" + jmsg.User.UID + ")"; + } + + Statement stmt = null; + ResultSet rs = null; + try { + stmt = sql.createStatement(); + rs = stmt.executeQuery(query); + rs.beforeFirst(); + while (rs.next()) { + int UID = rs.getInt(1); + + for (int i = Main.sockMessages.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockMessages.get(i); + if (ss.UID == UID) { + try { + out.rewind(); + ss.sock.write(out); + } catch (IOException e) { + Main.sockMessages.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + + if (jmsg.Privacy <= 0) { + for (int i = Main.sockAll.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockAll.get(i); + if (ss.UID == UID) { + try { + out.rewind(); + ss.sock.write(out); + } catch (IOException e) { + Main.sockAll.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + } + + } + } catch (SQLException e) { + System.err.println(e); + } finally { + if (rs != null) { + try { + rs.close(); + } catch (SQLException e) { + } + } + if (stmt != null) { + try { + stmt.close(); + } catch (SQLException e) { + } + } + } + + // Send to all + if (jmsg.Privacy > 0) { + for (int i = Main.sockAll.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockAll.get(i); + try { + out.rewind(); + ss.sock.write(out); + System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); + } catch (IOException e) { + Main.sockAll.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + + } + + private void onJuickMessageReply(com.juick.Message jmsg) { + String json = "{" + + "\"mid\":" + jmsg.MID + "," + + "\"rid\":" + jmsg.RID + "," + + "\"replyto\":" + jmsg.ReplyTo + "," + + "\"user\":{" + "\"uid\":" + jmsg.User.UID + "," + "\"uname\":\"" + encloseJSON(jmsg.User.UName) + "\"}," + + "\"timestamp\":\"" + jmsg.TimestampString + "\"," + + "\"body\":\"" + encloseJSON(jmsg.Text) + "\"" + + "}"; + + ByteBuffer out = ByteBuffer.allocate(10240); + out.put((byte) 0x00); + out.put(Charset.forName("UTF-8").encode(json)); + out.put((byte) 0xFF); + out.flip(); + + for (int i = Main.sockReplies.size() - 1; i >= 0; i--) { + SocketSubscribed ss = Main.sockReplies.get(i); + if (ss.MID == 0 || ss.MID == jmsg.MID) { + try { + out.rewind(); + ss.sock.write(out); + System.err.println(" --->" + ss.sock.socket().getRemoteSocketAddress()); + } catch (IOException e) { + Main.sockReplies.remove(i); + try { + ss.sock.close(); + } catch (IOException ex) { + } + } + } + } + } + + public static String encloseJSON(String str) { + return str.replace("\"", """).replace("\\", "\\\\").replace("\n", "\\n"); + } +} -- cgit v1.2.3