diff options
author | Vitaly Takmazov | 2016-06-28 10:36:28 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2016-06-28 10:36:28 +0300 |
commit | 331645f0fd7fbe7d9679d39dcce453cc3b2cab6e (patch) | |
tree | 53813518b0a831fc88162191525edc8003284bb1 /src/main/java | |
parent | 95e150755d1b11bd78fc604aa7283f2765b2ee46 (diff) |
spring-websocket
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/com/juick/jabber/ws/Main.java | 72 | ||||
-rw-r--r-- | src/main/java/com/juick/jabber/ws/SocketSubscribed.java | 51 | ||||
-rw-r--r-- | src/main/java/com/juick/jabber/ws/WSData.java | 277 | ||||
-rw-r--r-- | src/main/java/com/juick/jabber/ws/WSKeepAlive.java | 106 | ||||
-rw-r--r-- | src/main/java/com/juick/ws/WebsocketComponent.java | 134 | ||||
-rw-r--r-- | src/main/java/com/juick/ws/XMPPConnection.java (renamed from src/main/java/com/juick/jabber/ws/XMPPConnection.java) | 88 | ||||
-rw-r--r-- | src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java | 49 |
7 files changed, 219 insertions, 558 deletions
diff --git a/src/main/java/com/juick/jabber/ws/Main.java b/src/main/java/com/juick/jabber/ws/Main.java deleted file mode 100644 index 6a397fff..00000000 --- a/src/main/java/com/juick/jabber/ws/Main.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Juick - * Copyright (C) 2008-2011, Ugnich Anton - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - */ -package com.juick.jabber.ws; - -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.jdbc.datasource.DriverManagerDataSource; - -import java.io.FileInputStream; -import java.util.*; -import java.util.logging.Level; -import java.util.logging.LogManager; -import java.util.logging.Logger; - -/** - * - * @author Ugnich Anton - */ -public class Main { - - private static final Logger logger = Logger.getLogger("Websockets"); - - JdbcTemplate sql; - XMPPConnection xmpp; - public static String STATSFILE = null; - public final static List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>()); - - public static void main(String[] args) { - new Main().start(); - } - - public void start() { - try { - LogManager.getLogManager().readConfiguration(new FileInputStream("/etc/juick/ws_logging.properties")); - Properties conf = new Properties(); - conf.load(new FileInputStream("/etc/juick/ws.conf")); - - STATSFILE = conf.getProperty("statsfile"); - - setupSql(conf.getProperty("datasource_driver", "com.mysql.jdbc.Driver"), conf.getProperty("datasource_url")); - xmpp = new XMPPConnection(sql, conf.getProperty("xmpp_password", "")); - new Thread(xmpp).start(); - - //new Thread(new WSConnections()).start(); - new Thread(new WSData(sql)).start(); - new Thread(new WSKeepAlive(sql)).start(); - } catch (Exception e) { - logger.log(Level.SEVERE, "websockets initialization error", e); - } - } - - public void setupSql(String driver, String url) { - DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setDriverClassName(driver); - dataSource.setUrl(url); - sql = new JdbcTemplate(dataSource); - } -}
\ No newline at end of file diff --git a/src/main/java/com/juick/jabber/ws/SocketSubscribed.java b/src/main/java/com/juick/jabber/ws/SocketSubscribed.java deleted file mode 100644 index 6144380c..00000000 --- a/src/main/java/com/juick/jabber/ws/SocketSubscribed.java +++ /dev/null @@ -1,51 +0,0 @@ -package com.juick.jabber.ws; - -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; - -/** - * - * @author ugnich - */ -public class SocketSubscribed { - - public SocketChannel sock = null; - public String clientName = null; - public int VUID = 0; - public int UID = 0; - public int MID = 0; - public boolean allMessages = false; - public boolean allReplies = false; - public long tsConnected; - public long tsLastData; - - public SocketSubscribed(SocketChannel sock, String clientName, int VUID) { - this.sock = sock; - this.clientName = clientName; - this.VUID = VUID; - tsConnected = tsLastData = System.currentTimeMillis(); - } - - public boolean sendByteBuffer(ByteBuffer bb) { - boolean ret = false; - bb.rewind(); - try { - sock.write(bb); - ret = true; - } catch (Exception e) { - close(); - } - return ret; - } - - public void close() { - try { - sock.socket().close(); - } catch (Exception e) { - } - try { - sock.close(); - } catch (Exception e) { - } - } -} diff --git a/src/main/java/com/juick/jabber/ws/WSData.java b/src/main/java/com/juick/jabber/ws/WSData.java deleted file mode 100644 index 9ae98bcf..00000000 --- a/src/main/java/com/juick/jabber/ws/WSData.java +++ /dev/null @@ -1,277 +0,0 @@ -package com.juick.jabber.ws; - -import com.juick.server.MessagesQueries; -import com.juick.server.UserQueries; -import com.juick.xmpp.utils.Base64; -import org.springframework.jdbc.core.JdbcTemplate; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.ServerSocketChannel; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.Iterator; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * - * @author ugnich - */ -public class WSData implements Runnable { - - private static final Logger logger = Logger.getLogger("Websockets"); - static final String WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; - JdbcTemplate sql; - public Selector sel; - - public WSData(JdbcTemplate sql) { - this.sql = sql; - } - - @Override - public void run() { - try { - sel = Selector.open(); - ServerSocketChannel listensock = ServerSocketChannel.open(); - listensock.configureBlocking(false); - listensock.socket().bind(new InetSocketAddress(8081)); - listensock.register(sel, SelectionKey.OP_ACCEPT); - - while (true) { - sel.select(); - Iterator<SelectionKey> it = sel.selectedKeys().iterator(); - while (it.hasNext()) { - SelectionKey selKey = it.next(); - it.remove(); - - if (selKey.isAcceptable()) { - ServerSocketChannel ssChannel = (ServerSocketChannel) selKey.channel(); - SocketChannel sChannel = ssChannel.accept(); - sChannel.configureBlocking(false); - sChannel.register(sel, SelectionKey.OP_READ); - logger.info(sChannel.socket().getRemoteSocketAddress().toString() + " ACCEPTED"); - } else if (selKey.isReadable()) { - SocketChannel sChannel = (SocketChannel) selKey.channel(); - ByteBuffer buf = ByteBuffer.allocate(10240); - try { - int readbytes = sChannel.read(buf); - if (readbytes > 0) { - buf.flip(); - - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - buf.rewind(); - - switch (buf.get(0)) { - case (byte) 0x89: // PING - updateSocketTS(sChannel); - wsPing(sChannel); - break; - case (byte) 0x8A: // PONG - updateSocketTS(sChannel); - break; - case (byte) 0x81: // TEXT FRAME - updateSocketTS(sChannel); - wsTextFrame(sChannel, buf); - break; - case (byte) 'G': // HTTP - updateSocketTS(sChannel); - wsHandshake(sChannel, buf); - break; - case (byte) 0x88: // CONNECTION CLOSE - throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " CONNECTION CLOSE"); - } - } else if (readbytes < 0) { - throw new IOException(sChannel.socket().getRemoteSocketAddress().toString() + " END OF STREAM"); - } - } catch (IOException e) { - logger.log(Level.SEVERE, "websocket exception", e); - sChannel.socket().close(); - sChannel.close(); - selKey.cancel(); - } - } - } - } - } catch (Exception e) { - logger.log(Level.SEVERE, "websocket exception", e); - } - } - - public void wsHandshake(SocketChannel sock, ByteBuffer buf) throws Exception { - String hOrigin = null; - String hHost = null; - String hLocation = null; - String hSecWebSocketKey = null; - String hSecWebSocketVersion = null; - String hXRealIP = null; - - buf.rewind(); - CharBuffer charbuf = Charset.forName("ISO-8859-1").decode(buf); - String headers[] = charbuf.toString().split("\r\n"); - for (int i = 0; i < headers.length; i++) { - String h[] = headers[i].split(" ", 2); - if (h.length == 2) { - if (h[0].equals("GET")) { - hLocation = headers[i].split(" ", 3)[1]; - } else if (h[0].equals("Origin:")) { - hOrigin = h[1]; - } else if (h[0].equals("Host:")) { - hHost = h[1]; - } else if (h[0].equals("Sec-WebSocket-Key:")) { - hSecWebSocketKey = h[1]; - } else if (h[0].equals("Sec-WebSocket-Version:")) { - hSecWebSocketVersion = h[1]; - } else if (h[0].equals("X-Real-IP:")) { - hXRealIP = h[1]; - } - } - } - - if (hOrigin == null || hHost == null || hLocation == null || hSecWebSocketKey == null || hSecWebSocketVersion == null || !hSecWebSocketVersion.equals("13")) { - throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " Invalid headers"); - } - - // Auth - int VUID = 0; - int hashloc = hLocation.indexOf("hash="); - if (hashloc > 0) { - String hash = hLocation.substring(hashloc + 5); - if (hash.indexOf('&') > 0) { - hash = hash.substring(0, hash.indexOf('&')); - } - if (hash.length() == 16) { - VUID = com.juick.server.UserQueries.getUIDbyHash(sql, hash); - } - } - - // URL - int hLocationQM = hLocation.indexOf('?'); - if (hLocationQM > 0) { - hLocation = hLocation.substring(0, hLocationQM); - } - - int MID = 0; - int responseCode = 404; - SocketSubscribed sockSubscr = null; - if (hLocation.equals("/") && VUID > 0) { - sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); - responseCode = 101; - } else if (hLocation.equals("/_all")) { - sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); - sockSubscr.allMessages = true; - responseCode = 101; - } else if (hLocation.equals("/_replies")) { - sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); - sockSubscr.allReplies = true; - responseCode = 101; - } else if (hLocation.matches("^/\\d+$")) { - try { - MID = Integer.parseInt(hLocation.substring(1)); - } catch (Exception e) { - } - if (MID > 0) { - if (MessagesQueries.canViewThread(sql, MID, VUID)) { - sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); - sockSubscr.MID = MID; - responseCode = 101; - } else { - responseCode = 403; - } - } - } else if (hLocation.matches("^/[a-zA-Z0-9\\-]{2,16}/?$")) { - String uname; - if (hLocation.endsWith("/")) { - uname = hLocation.substring(1, hLocation.length() - 2); - } else { - uname = hLocation.substring(1); - } - - int UID = UserQueries.getUIDbyName(sql, uname); - if (UID > 0) { - // check access - sockSubscr = new SocketSubscribed(sock, hXRealIP, VUID); - sockSubscr.UID = UID; - responseCode = 101; - } - } - if (sockSubscr != null) { - synchronized (Main.clients) { - Main.clients.add(sockSubscr); - } - } - - // Response - String outstr; - if (responseCode == 101) { - outstr = "HTTP/1.1 101 Switching Protocols\r\n" - + "Upgrade: websocket\r\n" - + "Connection: Upgrade\r\n" - + "Sec-WebSocket-Accept: " + calcHeaderAccept(hSecWebSocketKey) + "\r\n" - + "\r\n"; - } else if (responseCode == 403) { - outstr = "HTTP/1.1 403 Forbidden\r\n\r\n"; - } else { - outstr = "HTTP/1.1 404 Not Found\r\n\r\n"; - } - ByteBuffer out = ByteBuffer.allocate(1024); - out.put(Charset.forName("ISO-8859-1").encode(outstr)); - out.flip(); - sock.write(out); - - if (responseCode == 101) { - logger.info(sock.socket().getRemoteSocketAddress().toString() + " HANDSHAKE (VUID = " + VUID + "; MID = " + MID + ")"); - } else { - throw new IOException(sock.socket().getRemoteSocketAddress().toString() + " " + responseCode); - } - } - - private String calcHeaderAccept(String key) { - String base = key + WEBSOCKET_GUID; - try { - MessageDigest md = MessageDigest.getInstance("SHA-1"); - return Base64.encode(md.digest(base.getBytes())); - } catch (NoSuchAlgorithmException e) { - logger.severe("calcHeaderAccept: " + e); - } - return ""; - } - - public void wsPing(SocketChannel sock) throws Exception { - ByteBuffer out = ByteBuffer.allocate(2); - out.put((byte) 0x8A); // PONG FRAME - out.put((byte) 0x00); // 1 byte long - out.flip(); - out.rewind(); - sock.write(out); - } - - public void wsTextFrame(SocketChannel sock, ByteBuffer buf) throws Exception { - /* - ByteBuffer out = ByteBuffer.allocate(3); - out.put((byte) 0x81); // TEXT FRAME - out.put((byte) 0x01); // 1 byte long - out.put((byte) 0x20); // ' ' - out.flip(); - out.rewind(); - sock.write(out); - */ - } - - public void updateSocketTS(SocketChannel sock) { - synchronized (Main.clients) { - for (SocketSubscribed s : Main.clients) { - if (s.sock == sock) { - s.tsLastData = System.currentTimeMillis(); - break; - } - } - } - } -} diff --git a/src/main/java/com/juick/jabber/ws/WSKeepAlive.java b/src/main/java/com/juick/jabber/ws/WSKeepAlive.java deleted file mode 100644 index 6244013d..00000000 --- a/src/main/java/com/juick/jabber/ws/WSKeepAlive.java +++ /dev/null @@ -1,106 +0,0 @@ -package com.juick.jabber.ws; - -import org.springframework.jdbc.core.JdbcTemplate; - -import java.io.PrintWriter; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * - * @author ugnich - */ -public class WSKeepAlive implements Runnable { - private static final Logger logger = Logger.getLogger(WSKeepAlive.class.getName()); - - JdbcTemplate sql; - ByteBuffer pingBytes; - ByteBuffer closeBytes; - - public WSKeepAlive(JdbcTemplate sql) { - this.sql = sql; - - //pingBytes = ByteBuffer.allocate(2); - //pingBytes.put((byte) 0x8A); // PONG FRAME - //pingBytes.put((byte) 0x00); // 0 byte long - pingBytes = ByteBuffer.allocate(3); - pingBytes.put((byte) 0x81); // TEXT FRAME - pingBytes.put((byte) 0x01); // 1 byte long - pingBytes.put((byte) 0x20); // ' ' - pingBytes.flip(); - - closeBytes = ByteBuffer.allocate(2); - closeBytes.put((byte) 0x88); // CLOSE FRAME - closeBytes.put((byte) 0x00); // 0 byte long - closeBytes.flip(); - } - - @Override - public void run() { - while (true) { - PrintWriter statsFile = null; - - if (Main.STATSFILE != null) { - try { - statsFile = new PrintWriter(Main.STATSFILE, "UTF-8"); - } catch (Exception e) { - statsFile = null; - logger.severe("WSKeepAlive statsFile: " + e); - } - } - - long now = System.currentTimeMillis(); - - synchronized (Main.clients) { - if (statsFile != null) { - statsFile.write("<html><body><h1>Connections (" + Main.clients.size() + ")</h2><table border=1><tr><th>IP</th><th>inactive</th><th>VUID</th><th>UID</th><th>MID</th><th>allM</th><th>allR</th></tr>"); - } - - for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { - SocketSubscribed s = i.next(); - int inactive = (int) ((double) (now - s.tsLastData) / 1000.0); - - if (statsFile != null) { - try { - statsFile.print("<tr><td>" + (s.clientName != null ? s.clientName : "?") + "</td>"); - statsFile.print("<td>" + inactive + "</td>"); - statsFile.print("<td>" + (s.VUID > 0 ? s.VUID : "") + "</td>"); - statsFile.print("<td>" + (s.UID > 0 ? s.UID : "") + "</td>"); - statsFile.print("<td>" + (s.MID > 0 ? s.MID : "") + "</td>"); - statsFile.print("<td>" + (s.allMessages ? "+" : "") + "</td>"); - statsFile.print("<td>" + (s.allReplies ? "+" : "") + "</td></tr>"); - } catch (Exception e) { - logger.log(Level.SEVERE, "WSKeepAlive statsFile print", e); - } - } - - if (inactive > 180) { - s.sendByteBuffer(closeBytes); - s.close(); - i.remove(); - } else if (inactive > 60) { - if (!s.sendByteBuffer(pingBytes)) { - i.remove(); - } - } - } - } - - if (Main.STATSFILE != null) { - try { - statsFile.write("</table></body></html>"); - statsFile.close(); - } catch (Exception e) { - logger.log(Level.SEVERE, "WSKeepAlive statsFile close", e); - } - } - - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - } - } - } -} diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java new file mode 100644 index 00000000..83e811a6 --- /dev/null +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -0,0 +1,134 @@ +package com.juick.ws; + +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.UserQueries; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Created by vitalyster on 28.06.2016. + */ +@Component +public class WebsocketComponent extends TextWebSocketHandler { + + @Inject + JdbcTemplate jdbc; + + private static final Logger logger = Logger.getLogger(WebsocketComponent.class.getName()); + final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>()); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + URI hLocation; + String hXRealIP = ""; + + hLocation = session.getUri(); + HttpHeaders headers = session.getHandshakeHeaders(); + hXRealIP = headers.getOrDefault("X-Real-IP", + Collections.singletonList(session.getRemoteAddress().toString())).get(0); + + // Auth + User visitor = new User(); + List<NameValuePair> params = URLEncodedUtils.parse(hLocation, "UTF-8"); + for (NameValuePair param : params) { + if (param.getName().equals("hash")) { + String hash = param.getValue(); + if (hash.length() == 16) { + visitor = UserQueries.getUserByHash(jdbc, hash); + } else { + try { + logger.info(String.format("wrong hash for %d from %s", visitor.getUID(), hXRealIP)); + session.close(new CloseStatus(403, "Forbidden")); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + } + break; + } + } + logger.info(String.format("user %d connected to %s from %s", visitor.getUID(), hLocation.getPath(), hXRealIP)); + + int MID = 0; + SocketSubscribed sockSubscr = null; + if (hLocation.getPath().equals("/") && visitor.getUID() > 0) { + logger.info(String.format("user %d connected", visitor.getUID())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false); + } else if (hLocation.getPath().equals("/_all")) { + logger.info(String.format("user %d connected to legacy _all (%s)", visitor.getUID(), hLocation.getPath())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.allMessages = true; + } else if (hLocation.getPath().equals("/_replies")) { + logger.info(String.format("user %d connected to legacy _replies (%s)", visitor.getUID(), hLocation.getPath())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.allReplies = true; + } else if (hLocation.getPath().matches("/\\d+$")) { + try { + MID = Integer.parseInt(hLocation.getPath().substring(8)); + } catch (Exception e) { + } + if (MID > 0) { + if (MessagesQueries.canViewThread(jdbc, MID, visitor.getUID())) { + logger.info(String.format("user %d connected to legacy thread (%d) from %s", visitor.getUID(), MID, hXRealIP)); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.MID = MID; + } else { + try { + session.close(new CloseStatus(403, "Forbidden")); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + } + } + } + if (sockSubscr != null) { + synchronized (clients) { + clients.add(sockSubscr); + } + } + } + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + synchronized (clients) { + clients.stream().filter(c -> c.session.equals(session)).forEach(c -> { + logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode())); + clients.remove(c); + }); + } + } + class SocketSubscribed { + + WebSocketSession session; + String clientName; + User visitor; + int MID; + boolean allMessages; + boolean allReplies; + long tsConnected; + long tsLastData; + boolean legacy; + + public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) { + this.session = session; + this.clientName = clientName; + this.visitor = visitor; + tsConnected = tsLastData = System.currentTimeMillis(); + this.legacy = legacy; + } + } +} diff --git a/src/main/java/com/juick/jabber/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 8c187a36..9ed3d0cd 100644 --- a/src/main/java/com/juick/jabber/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -1,8 +1,7 @@ -package com.juick.jabber.ws; +package com.juick.ws; import com.juick.User; import com.juick.json.MessageSerializer; -import com.juick.server.MessagesQueries; import com.juick.server.SubscriptionsQueries; import com.juick.xmpp.JID; import com.juick.xmpp.Message; @@ -10,12 +9,12 @@ import com.juick.xmpp.Stream; import com.juick.xmpp.StreamComponent; import com.juick.xmpp.extensions.JuickMessage; import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import javax.inject.Inject; import java.io.IOException; import java.net.Socket; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Iterator; import java.util.List; import java.util.logging.Level; import java.util.logging.Logger; @@ -25,6 +24,7 @@ import java.util.stream.Collectors; * * @author ugnich */ +@Component public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); @@ -32,11 +32,15 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. Stream xmpp; String xmppPassword; MessageSerializer ms; + @Inject + WebsocketComponent ws; + @Inject public XMPPConnection(JdbcTemplate sql, String password) { this.sql = sql; xmppPassword = password; ms = new MessageSerializer(); + new Thread(this).start(); } @Override @@ -89,70 +93,50 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. private void onJuickPM(int uid_to, com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - ByteBuffer bbMsg = buildTextFrame(json); - - synchronized (Main.clients) { - for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { - SocketSubscribed s = i.next(); - if (s.VUID == uid_to && s.MID == 0 && s.allMessages == false && s.allReplies == false) { - if (!s.sendByteBuffer(bbMsg)) { - i.remove(); - } + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + try { + logger.info("sending pm to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); } - } + }); } } private void onJuickMessagePost(com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - ByteBuffer bbMsg = buildTextFrame(json); - List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); - synchronized (Main.clients) { - for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { - SocketSubscribed s = i.next(); - if (s.MID == 0 && s.allReplies == false && ((jmsg.Privacy >= 0 - && (s.allMessages || s.UID == jmsg.getUser().getUID())) || uids.contains(s.VUID))) { - if (!s.sendByteBuffer(bbMsg)) { - i.remove(); - } + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending message to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); } - } + }); } } private void onJuickMessageReply(com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - ByteBuffer bbMsg = buildTextFrame(json); - int privacy = MessagesQueries.getMessagePrivacy(sql, jmsg.getMID()); - - synchronized (Main.clients) { - for (Iterator<SocketSubscribed> i = Main.clients.iterator(); i.hasNext();) { - SocketSubscribed s = i.next(); - if ((privacy >= 0 && s.allReplies) || s.MID == jmsg.getMID()) { - if (!s.sendByteBuffer(bbMsg)) { - i.remove(); - } + List<Integer> threadUsers = + SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) + .stream().map(User::getUID).collect(Collectors.toList()); + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending reply to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); } - } - } - } - - private ByteBuffer buildTextFrame(String json) { - ByteBuffer jsonbytes = Charset.forName("UTF-8").encode(json); - ByteBuffer bbMsg = ByteBuffer.allocate(jsonbytes.limit() + 8); - bbMsg.put((byte) 0x81); - if (jsonbytes.limit() <= 125) { - bbMsg.put((byte) jsonbytes.limit()); - } else { - bbMsg.put((byte) 126); - bbMsg.putShort((short) jsonbytes.limit()); + }); } - bbMsg.put(jsonbytes); - bbMsg.flip(); - return bbMsg; } } diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java new file mode 100644 index 00000000..a56e56f5 --- /dev/null +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -0,0 +1,49 @@ +package com.juick.ws.configuration; + +import com.juick.ws.WebsocketComponent; +import com.juick.ws.XMPPConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.PropertySource; +import org.springframework.core.env.Environment; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +import javax.inject.Inject; + +/** + * Created by vitalyster on 28.06.2016. + */ +@Configuration +@EnableWebSocket +@ComponentScan(basePackages = {"com.juick"}) +@PropertySource("classpath:juick.conf") +public class WebsocketConfiguration implements WebSocketConfigurer { + @Inject + Environment env; + + @Bean + WebSocketHandler wsHandler() { + return new WebsocketComponent(); + } + @Bean + JdbcTemplate jdbc() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver")); + dataSource.setUrl(env.getProperty("datasource_url")); + return new JdbcTemplate(dataSource); + } + @Bean + XMPPConnection xmpp() { + return new XMPPConnection(jdbc(), env.getProperty("xmpp_password")); + } + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(wsHandler(), "/"); + } +} |