aboutsummaryrefslogtreecommitdiff
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/com/juick/jabber/ws/Main.java72
-rw-r--r--src/main/java/com/juick/jabber/ws/SocketSubscribed.java51
-rw-r--r--src/main/java/com/juick/jabber/ws/WSData.java277
-rw-r--r--src/main/java/com/juick/jabber/ws/WSKeepAlive.java106
-rw-r--r--src/main/java/com/juick/ws/WebsocketComponent.java134
-rw-r--r--src/main/java/com/juick/ws/XMPPConnection.java (renamed from src/main/java/com/juick/jabber/ws/XMPPConnection.java)88
-rw-r--r--src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java49
7 files changed, 219 insertions, 558 deletions
diff --git a/src/main/java/com/juick/jabber/ws/Main.java b/src/main/java/com/juick/jabber/ws/Main.java
deleted file mode 100644
index 6a397fff..00000000
--- a/src/main/java/com/juick/jabber/ws/Main.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Juick
- * Copyright (C) 2008-2011, Ugnich Anton
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-package com.juick.jabber.ws;
-
-import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.jdbc.datasource.DriverManagerDataSource;
-
-import java.io.FileInputStream;
-import java.util.*;
-import java.util.logging.Level;
-import java.util.logging.LogManager;
-import java.util.logging.Logger;
-
-/**
- *
- * @author Ugnich Anton
- */
-public class Main {
-
- private static final Logger logger = Logger.getLogger("Websockets");
-
- JdbcTemplate sql;
- XMPPConnection xmpp;
- public static String STATSFILE = null;
- public final static List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
-
- public static void main(String[] args) {
- new Main().start();
- }
-
- public void start() {
- try {
- LogManager.getLogManager().readConfiguration(new FileInputStream("/etc/juick/ws_logging.properties"));
- Properties conf = new Properties();
- conf.load(new FileInputStream("/etc/juick/ws.conf"));
-
- STATSFILE = conf.getProperty("statsfile");
-
- setupSql(conf.getProperty("datasource_driver", "com.mysql.jdbc.Driver"), conf.getProperty("datasource_url"));
- xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", ""));
- new Thread(xmpp).start();
-
- //new Thread(new WSConnections()).start();
- new Thread(new WSData(sql)).start();
- new Thread(new WSKeepAlive(sql)).start();
- } catch (Exception e) {
- logger.log(Level.SEVERE, "websockets initialization error", e);
- }
- }
-
- public void setupSql(String driver, String url) {
- DriverManagerDataSource dataSource = new DriverManagerDataSource();
- dataSource.setDriverClassName(driver);
- dataSource.setUrl(url);
- sql = new JdbcTemplate(dataSource);
- }
-} \ No newline at end of file
diff --git a/src/main/java/com/juick/jabber/ws/SocketSubscribed.java b/src/main/java/com/juick/jabber/ws/SocketSubscribed.java
deleted file mode 100644
index 6144380c..00000000
--- a/src/main/java/com/juick/jabber/ws/SocketSubscribed.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package com.juick.jabber.ws;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-
-/**
- *
- * @author ugnich
- */
-public class SocketSubscribed {
-
- public SocketChannel sock = null;
- public String clientName = null;
- public int VUID = 0;
- public int UID = 0;
- public int MID = 0;
- public boolean allMessages = false;
- public boolean allReplies = false;
- public long tsConnected;
- public long tsLastData;
-
- public SocketSubscribed(SocketChannel sock, String clientName, int VUID) {
- this.sock = sock;
- this.clientName = clientName;
- this.VUID = VUID;
- tsConnected = tsLastData = System.currentTimeMillis();
- }
-
- public boolean sendByteBuffer(ByteBuffer bb) {
- boolean ret = false;
- bb.rewind();
- try {
- sock.write(bb);
- ret = true;
- } catch (Exception e) {
- close();
- }
- return ret;
- }
-
- public void close() {
- try {
- sock.socket().close();
- } catch (Exception e) {
- }
- try {
- sock.close();
- } catch (Exception e) {
- }
- }
-}
diff --git a/src/main/java/com/juick/jabber/ws/WSData.java b/src/main/java/com/juick/jabber/ws/WSData.java
deleted file mode 100644
index 9ae98bcf..00000000
--- a/src/main/java/com/juick/jabber/ws/WSData.java
+++ /dev/null
@@ -1,277 +0,0 @@
-package com.juick.jabber.ws;
-
-import com.juick.server.MessagesQueries;
-import com.juick.server.UserQueries;
-import com.juick.xmpp.utils.Base64;
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- *
- * @author ugnich
- */
-public class WSData implements Runnable {
-
- private static final Logger logger = Logger.getLogger("Websockets");
- static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- JdbcTemplate sql;
- public Selector sel;
-
- public WSData(JdbcTemplate sql) {
- this.sql = sql;
- }
-
- @Override
- public void run() {
- try {
- sel = Selector.open();
- ServerSocketChannel listensock = ServerSocketChannel.open();
- listensock.configureBlocking(false);
- listensock.socket().bind(new InetSocketAddress(8081));
- listensock.register(sel, SelectionKey.OP_ACCEPT);
-
- while (true) {
- sel.select();
- Iterator<SelectionKey> it = sel.selectedKeys().iterator();
- while (it.hasNext()) {
- SelectionKey selKey = it.next();
- it.remove();
-
- if (selKey.isAcceptable()) {
- ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel();
- SocketChannel sChannel = ssChannel.accept();
- sChannel.configureBlocking(false);
- sChannel.register(sel, SelectionKey.OP_READ);
- logger.info(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED");
- } else if (selKey.isReadable()) {
- SocketChannel sChannel = (SocketChannel) selKey.channel();
- ByteBuffer buf = ByteBuffer.allocate(10240);
- try {
- int readbytes = sChannel.read(buf);
- if (readbytes > 0) {
- buf.flip();
-
- CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
- buf.rewind();
-
- switch (buf.get(0)) {
- case (byte) 0x89: // PING
- updateSocketTS(sChannel);
- wsPing(sChannel);
- break;
- case (byte) 0x8A: // PONG
- updateSocketTS(sChannel);
- break;
- case (byte) 0x81: // TEXT FRAME
- updateSocketTS(sChannel);
- wsTextFrame(sChannel, buf);
- break;
- case (byte) 'G': // HTTP
- updateSocketTS(sChannel);
- wsHandshake(sChannel, buf);
- break;
- case (byte) 0x88: // CONNECTION CLOSE
- throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " CONNECTION CLOSE");
- }
- } else if (readbytes < 0) {
- throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " END OF STREAM");
- }
- } catch (IOException e) {
- logger.log(Level.SEVERE, "websocket exception", e);
- sChannel.socket().close();
- sChannel.close();
- selKey.cancel();
- }
- }
- }
- }
- } catch (Exception e) {
- logger.log(Level.SEVERE, "websocket exception", e);
- }
- }
-
- public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception {
- String hOrigin = null;
- String hHost = null;
- String hLocation = null;
- String hSecWebSocketKey = null;
- String hSecWebSocketVersion = null;
- String hXRealIP = null;
-
- buf.rewind();
- CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf);
- String headers[] = charbuf.toString().split("\r\n");
- for (int i = 0; i < headers.length; i++) {
- String h[] = headers[i].split(" ", 2);
- if (h.length == 2) {
- if (h[0].equals("GET")) {
- hLocation = headers[i].split(" ", 3)[1];
- } else if (h[0].equals("Origin:")) {
- hOrigin = h[1];
- } else if (h[0].equals("Host:")) {
- hHost = h[1];
- } else if (h[0].equals("Sec-WebSocket-Key:")) {
- hSecWebSocketKey = h[1];
- } else if (h[0].equals("Sec-WebSocket-Version:")) {
- hSecWebSocketVersion = h[1];
- } else if (h[0].equals("X-Real-IP:")) {
- hXRealIP = h[1];
- }
- }
- }
-
- if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey == null || hSecWebSocketVersion == null || !hSecWebSocketVersion.equals("13")) {
- throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers");
- }
-
- // Auth
- int VUID = 0;
- int hashloc = hLocation.indexOf("hash=");
- if (hashloc > 0) {
- String hash = hLocation.substring(hashloc + 5);
- if (hash.indexOf('&') > 0) {
- hash = hash.substring(0, hash.indexOf('&'));
- }
- if (hash.length() == 16) {
- VUID = com.juick.server.UserQueries.getUIDbyHash(sql, hash);
- }
- }
-
- // URL
- int hLocationQM = hLocation.indexOf('?');
- if (hLocationQM > 0) {
- hLocation = hLocation.substring(0, hLocationQM);
- }
-
- int MID = 0;
- int responseCode = 404;
- SocketSubscribed sockSubscr = null;
- if (hLocation.equals("/") && VUID > 0) {
- sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
- responseCode = 101;
- } else if (hLocation.equals("/_all")) {
- sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
- sockSubscr.allMessages = true;
- responseCode = 101;
- } else if (hLocation.equals("/_replies")) {
- sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
- sockSubscr.allReplies = true;
- responseCode = 101;
- } else if (hLocation.matches("^/\\d+$")) {
- try {
- MID = Integer.parseInt(hLocation.substring(1));
- } catch (Exception e) {
- }
- if (MID > 0) {
- if (MessagesQueries.canViewThread(sql, MID, VUID)) {
- sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
- sockSubscr.MID = MID;
- responseCode = 101;
- } else {
- responseCode = 403;
- }
- }
- } else if (hLocation.matches("^/[a-zA-Z0-9\\-]{2,16}/?$")) {
- String uname;
- if (hLocation.endsWith("/")) {
- uname = hLocation.substring(1, hLocation.length() - 2);
- } else {
- uname = hLocation.substring(1);
- }
-
- int UID = UserQueries.getUIDbyName(sql, uname);
- if (UID > 0) {
- // check access
- sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID);
- sockSubscr.UID = UID;
- responseCode = 101;
- }
- }
- if (sockSubscr != null) {
- synchronized (Main.clients) {
- Main.clients.add(sockSubscr);
- }
- }
-
- // Response
- String outstr;
- if (responseCode == 101) {
- outstr = "HTTP/1.1 101 Switching Protocols\r\n"
- + "Upgrade: websocket\r\n"
- + "Connection: Upgrade\r\n"
- + "Sec-WebSocket-Accept: " + calcHeaderAccept(hSecWebSocketKey) + "\r\n"
- + "\r\n";
- } else if (responseCode == 403) {
- outstr = "HTTP/1.1 403 Forbidden\r\n\r\n";
- } else {
- outstr = "HTTP/1.1 404 Not Found\r\n\r\n";
- }
- ByteBuffer out = ByteBuffer.allocate(1024);
- out.put(Charset.forName("ISO-8859-1").encode(outstr));
- out.flip();
- sock.write(out);
-
- if (responseCode == 101) {
- logger.info(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (VUID = " + VUID + "; MID = " + MID + ")");
- } else {
- throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " " + responseCode);
- }
- }
-
- private String calcHeaderAccept(String key) {
- String base = key + WEBSOCKET_GUID;
- try {
- MessageDigest md = MessageDigest.getInstance("SHA-1");
- return Base64.encode(md.digest(base.getBytes()));
- } catch (NoSuchAlgorithmException e) {
- logger.severe("calcHeaderAccept: " + e);
- }
- return "";
- }
-
- public void wsPing(SocketChannel sock) throws Exception {
- ByteBuffer out = ByteBuffer.allocate(2);
- out.put((byte) 0x8A); // PONG FRAME
- out.put((byte) 0x00); // 1 byte long
- out.flip();
- out.rewind();
- sock.write(out);
- }
-
- public void wsTextFrame(SocketChannel sock, ByteBuffer buf) throws Exception {
- /*
- ByteBuffer out = ByteBuffer.allocate(3);
- out.put((byte) 0x81); // TEXT FRAME
- out.put((byte) 0x01); // 1 byte long
- out.put((byte) 0x20); // ' '
- out.flip();
- out.rewind();
- sock.write(out);
- */
- }
-
- public void updateSocketTS(SocketChannel sock) {
- synchronized (Main.clients) {
- for (SocketSubscribed s : Main.clients) {
- if (s.sock == sock) {
- s.tsLastData = System.currentTimeMillis();
- break;
- }
- }
- }
- }
-}
diff --git a/src/main/java/com/juick/jabber/ws/WSKeepAlive.java b/src/main/java/com/juick/jabber/ws/WSKeepAlive.java
deleted file mode 100644
index 6244013d..00000000
--- a/src/main/java/com/juick/jabber/ws/WSKeepAlive.java
+++ /dev/null
@@ -1,106 +0,0 @@
-package com.juick.jabber.ws;
-
-import org.springframework.jdbc.core.JdbcTemplate;
-
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- *
- * @author ugnich
- */
-public class WSKeepAlive implements Runnable {
- private static final Logger logger = Logger.getLogger(WSKeepAlive.class.getName());
-
- JdbcTemplate sql;
- ByteBuffer pingBytes;
- ByteBuffer closeBytes;
-
- public WSKeepAlive(JdbcTemplate sql) {
- this.sql = sql;
-
- //pingBytes = ByteBuffer.allocate(2);
- //pingBytes.put((byte) 0x8A); // PONG FRAME
- //pingBytes.put((byte) 0x00); // 0 byte long
- pingBytes = ByteBuffer.allocate(3);
- pingBytes.put((byte) 0x81); // TEXT FRAME
- pingBytes.put((byte) 0x01); // 1 byte long
- pingBytes.put((byte) 0x20); // ' '
- pingBytes.flip();
-
- closeBytes = ByteBuffer.allocate(2);
- closeBytes.put((byte) 0x88); // CLOSE FRAME
- closeBytes.put((byte) 0x00); // 0 byte long
- closeBytes.flip();
- }
-
- @Override
- public void run() {
- while (true) {
- PrintWriter statsFile = null;
-
- if (Main.STATSFILE != null) {
- try {
- statsFile = new PrintWriter(Main.STATSFILE, "UTF-8");
- } catch (Exception e) {
- statsFile = null;
- logger.severe("WSKeepAlive statsFile: " + e);
- }
- }
-
- long now = System.currentTimeMillis();
-
- synchronized (Main.clients) {
- if (statsFile != null) {
- statsFile.write("<html><body><h1>Connections (" + Main.clients.size() + ")</h2><table border=1><tr><th>IP</th><th>inactive</th><th>VUID</th><th>UID</th><th>MID</th><th>allM</th><th>allR</th></tr>");
- }
-
- for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
- SocketSubscribed s = i.next();
- int inactive = (int) ((double) (now - s.tsLastData) / 1000.0);
-
- if (statsFile != null) {
- try {
- statsFile.print("<tr><td>" + (s.clientName != null ? s.clientName : "?") + "</td>");
- statsFile.print("<td>" + inactive + "</td>");
- statsFile.print("<td>" + (s.VUID > 0 ? s.VUID : "") + "</td>");
- statsFile.print("<td>" + (s.UID > 0 ? s.UID : "") + "</td>");
- statsFile.print("<td>" + (s.MID > 0 ? s.MID : "") + "</td>");
- statsFile.print("<td>" + (s.allMessages ? "+" : "") + "</td>");
- statsFile.print("<td>" + (s.allReplies ? "+" : "") + "</td></tr>");
- } catch (Exception e) {
- logger.log(Level.SEVERE, "WSKeepAlive statsFile print", e);
- }
- }
-
- if (inactive > 180) {
- s.sendByteBuffer(closeBytes);
- s.close();
- i.remove();
- } else if (inactive > 60) {
- if (!s.sendByteBuffer(pingBytes)) {
- i.remove();
- }
- }
- }
- }
-
- if (Main.STATSFILE != null) {
- try {
- statsFile.write("</table></body></html>");
- statsFile.close();
- } catch (Exception e) {
- logger.log(Level.SEVERE, "WSKeepAlive statsFile close", e);
- }
- }
-
- try {
- Thread.sleep(10000);
- } catch (InterruptedException e) {
- }
- }
- }
-}
diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java
new file mode 100644
index 00000000..83e811a6
--- /dev/null
+++ b/src/main/java/com/juick/ws/WebsocketComponent.java
@@ -0,0 +1,134 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.server.MessagesQueries;
+import com.juick.server.UserQueries;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Component
+public class WebsocketComponent extends TextWebSocketHandler {
+
+ @Inject
+ JdbcTemplate jdbc;
+
+ private static final Logger logger = Logger.getLogger(WebsocketComponent.class.getName());
+ final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+ URI hLocation;
+ String hXRealIP = "";
+
+ hLocation = session.getUri();
+ HttpHeaders headers = session.getHandshakeHeaders();
+ hXRealIP = headers.getOrDefault("X-Real-IP",
+ Collections.singletonList(session.getRemoteAddress().toString())).get(0);
+
+ // Auth
+ User visitor = new User();
+ List<NameValuePair> params = URLEncodedUtils.parse(hLocation, "UTF-8");
+ for (NameValuePair param : params) {
+ if (param.getName().equals("hash")) {
+ String hash = param.getValue();
+ if (hash.length() == 16) {
+ visitor = UserQueries.getUserByHash(jdbc, hash);
+ } else {
+ try {
+ logger.info(String.format("wrong hash for %d from %s", visitor.getUID(), hXRealIP));
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ break;
+ }
+ }
+ logger.info(String.format("user %d connected to %s from %s", visitor.getUID(), hLocation.getPath(), hXRealIP));
+
+ int MID = 0;
+ SocketSubscribed sockSubscr = null;
+ if (hLocation.getPath().equals("/") && visitor.getUID() > 0) {
+ logger.info(String.format("user %d connected", visitor.getUID()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false);
+ } else if (hLocation.getPath().equals("/_all")) {
+ logger.info(String.format("user %d connected to legacy _all (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allMessages = true;
+ } else if (hLocation.getPath().equals("/_replies")) {
+ logger.info(String.format("user %d connected to legacy _replies (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allReplies = true;
+ } else if (hLocation.getPath().matches("/\\d+$")) {
+ try {
+ MID = Integer.parseInt(hLocation.getPath().substring(8));
+ } catch (Exception e) {
+ }
+ if (MID > 0) {
+ if (MessagesQueries.canViewThread(jdbc, MID, visitor.getUID())) {
+ logger.info(String.format("user %d connected to legacy thread (%d) from %s", visitor.getUID(), MID, hXRealIP));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.MID = MID;
+ } else {
+ try {
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ }
+ }
+ if (sockSubscr != null) {
+ synchronized (clients) {
+ clients.add(sockSubscr);
+ }
+ }
+ }
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+ synchronized (clients) {
+ clients.stream().filter(c -> c.session.equals(session)).forEach(c -> {
+ logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode()));
+ clients.remove(c);
+ });
+ }
+ }
+ class SocketSubscribed {
+
+ WebSocketSession session;
+ String clientName;
+ User visitor;
+ int MID;
+ boolean allMessages;
+ boolean allReplies;
+ long tsConnected;
+ long tsLastData;
+ boolean legacy;
+
+ public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) {
+ this.session = session;
+ this.clientName = clientName;
+ this.visitor = visitor;
+ tsConnected = tsLastData = System.currentTimeMillis();
+ this.legacy = legacy;
+ }
+ }
+}
diff --git a/src/main/java/com/juick/jabber/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java
index 8c187a36..9ed3d0cd 100644
--- a/src/main/java/com/juick/jabber/ws/XMPPConnection.java
+++ b/src/main/java/com/juick/ws/XMPPConnection.java
@@ -1,8 +1,7 @@
-package com.juick.jabber.ws;
+package com.juick.ws;
import com.juick.User;
import com.juick.json.MessageSerializer;
-import com.juick.server.MessagesQueries;
import com.juick.server.SubscriptionsQueries;
import com.juick.xmpp.JID;
import com.juick.xmpp.Message;
@@ -10,12 +9,12 @@ import com.juick.xmpp.Stream;
import com.juick.xmpp.StreamComponent;
import com.juick.xmpp.extensions.JuickMessage;
import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+import javax.inject.Inject;
import java.io.IOException;
import java.net.Socket;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -25,6 +24,7 @@ import java.util.stream.Collectors;
*
* @author ugnich
*/
+@Component
public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
@@ -32,11 +32,15 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
Stream xmpp;
String xmppPassword;
MessageSerializer ms;
+ @Inject
+ WebsocketComponent ws;
+ @Inject
public XMPPConnection(JdbcTemplate sql, String password) {
this.sql = sql;
xmppPassword = password;
ms = new MessageSerializer();
+ new Thread(this).start();
}
@Override
@@ -89,70 +93,50 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message.
private void onJuickPM(int uid_to, com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- ByteBuffer bbMsg = buildTextFrame(json);
-
- synchronized (Main.clients) {
- for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
- SocketSubscribed s = i.next();
- if (s.VUID == uid_to && s.MID == 0 && s.allMessages == false && s.allReplies == false) {
- if (!s.sendByteBuffer(bbMsg)) {
- i.remove();
- }
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ try {
+ logger.info("sending pm to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
}
- }
+ });
}
}
private void onJuickMessagePost(com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- ByteBuffer bbMsg = buildTextFrame(json);
-
List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
.stream().map(User::getUID).collect(Collectors.toList());
- synchronized (Main.clients) {
- for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
- SocketSubscribed s = i.next();
- if (s.MID == 0 && s.allReplies == false && ((jmsg.Privacy >= 0
- && (s.allMessages || s.UID == jmsg.getUser().getUID())) || uids.contains(s.VUID))) {
- if (!s.sendByteBuffer(bbMsg)) {
- i.remove();
- }
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
}
- }
+ });
}
}
private void onJuickMessageReply(com.juick.Message jmsg) {
String json = messageSerializer.serialize(jmsg).toString();
- ByteBuffer bbMsg = buildTextFrame(json);
- int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.getMID());
-
- synchronized (Main.clients) {
- for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) {
- SocketSubscribed s = i.next();
- if ((privacy >= 0 && s.allReplies) || s.MID == jmsg.getMID()) {
- if (!s.sendByteBuffer(bbMsg)) {
- i.remove();
- }
+ List<Integer> threadUsers =
+ SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
}
- }
- }
- }
-
- private ByteBuffer buildTextFrame(String json) {
- ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json);
- ByteBuffer bbMsg = ByteBuffer.allocate(jsonbytes.limit() + 8);
- bbMsg.put((byte) 0x81);
- if (jsonbytes.limit() <= 125) {
- bbMsg.put((byte) jsonbytes.limit());
- } else {
- bbMsg.put((byte) 126);
- bbMsg.putShort((short) jsonbytes.limit());
+ });
}
- bbMsg.put(jsonbytes);
- bbMsg.flip();
- return bbMsg;
}
}
diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
new file mode 100644
index 00000000..a56e56f5
--- /dev/null
+++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
@@ -0,0 +1,49 @@
+package com.juick.ws.configuration;
+
+import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import javax.inject.Inject;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Configuration
+@EnableWebSocket
+@ComponentScan(basePackages = {"com.juick"})
+@PropertySource("classpath:juick.conf")
+public class WebsocketConfiguration implements WebSocketConfigurer {
+ @Inject
+ Environment env;
+
+ @Bean
+ WebSocketHandler wsHandler() {
+ return new WebsocketComponent();
+ }
+ @Bean
+ JdbcTemplate jdbc() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver"));
+ dataSource.setUrl(env.getProperty("datasource_url"));
+ return new JdbcTemplate(dataSource);
+ }
+ @Bean
+ XMPPConnection xmpp() {
+ return new XMPPConnection(jdbc(), env.getProperty("xmpp_password"));
+ }
+ @Override
+ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ registry.addHandler(wsHandler(), "/");
+ }
+}