From 331645f0fd7fbe7d9679d39dcce453cc3b2cab6e Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 10:36:28 +0300 Subject: spring-websocket --- src/main/java/com/juick/ws/WebsocketComponent.java | 134 +++++++++++++++++++++ 1 file changed, 134 insertions(+) create mode 100644 src/main/java/com/juick/ws/WebsocketComponent.java (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java new file mode 100644 index 00000000..83e811a6 --- /dev/null +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -0,0 +1,134 @@ +package com.juick.ws; + +import com.juick.User; +import com.juick.server.MessagesQueries; +import com.juick.server.UserQueries; +import org.apache.http.NameValuePair; +import org.apache.http.client.utils.URLEncodedUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Created by vitalyster on 28.06.2016. + */ +@Component +public class WebsocketComponent extends TextWebSocketHandler { + + @Inject + JdbcTemplate jdbc; + + private static final Logger logger = Logger.getLogger(WebsocketComponent.class.getName()); + final List clients = Collections.synchronizedList(new ArrayList()); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + URI hLocation; + String hXRealIP = ""; + + hLocation = session.getUri(); + HttpHeaders headers = session.getHandshakeHeaders(); + hXRealIP = headers.getOrDefault("X-Real-IP", + Collections.singletonList(session.getRemoteAddress().toString())).get(0); + + // Auth + User visitor = new User(); + List params = URLEncodedUtils.parse(hLocation, "UTF-8"); + for (NameValuePair param : params) { + if (param.getName().equals("hash")) { + String hash = param.getValue(); + if (hash.length() == 16) { + visitor = UserQueries.getUserByHash(jdbc, hash); + } else { + try { + logger.info(String.format("wrong hash for %d from %s", visitor.getUID(), hXRealIP)); + session.close(new CloseStatus(403, "Forbidden")); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + } + break; + } + } + logger.info(String.format("user %d connected to %s from %s", visitor.getUID(), hLocation.getPath(), hXRealIP)); + + int MID = 0; + SocketSubscribed sockSubscr = null; + if (hLocation.getPath().equals("/") && visitor.getUID() > 0) { + logger.info(String.format("user %d connected", visitor.getUID())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false); + } else if (hLocation.getPath().equals("/_all")) { + logger.info(String.format("user %d connected to legacy _all (%s)", visitor.getUID(), hLocation.getPath())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.allMessages = true; + } else if (hLocation.getPath().equals("/_replies")) { + logger.info(String.format("user %d connected to legacy _replies (%s)", visitor.getUID(), hLocation.getPath())); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.allReplies = true; + } else if (hLocation.getPath().matches("/\\d+$")) { + try { + MID = Integer.parseInt(hLocation.getPath().substring(8)); + } catch (Exception e) { + } + if (MID > 0) { + if (MessagesQueries.canViewThread(jdbc, MID, visitor.getUID())) { + logger.info(String.format("user %d connected to legacy thread (%d) from %s", visitor.getUID(), MID, hXRealIP)); + sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); + sockSubscr.MID = MID; + } else { + try { + session.close(new CloseStatus(403, "Forbidden")); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + } + } + } + if (sockSubscr != null) { + synchronized (clients) { + clients.add(sockSubscr); + } + } + } + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + synchronized (clients) { + clients.stream().filter(c -> c.session.equals(session)).forEach(c -> { + logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode())); + clients.remove(c); + }); + } + } + class SocketSubscribed { + + WebSocketSession session; + String clientName; + User visitor; + int MID; + boolean allMessages; + boolean allReplies; + long tsConnected; + long tsLastData; + boolean legacy; + + public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) { + this.session = session; + this.clientName = clientName; + this.visitor = visitor; + tsConnected = tsLastData = System.currentTimeMillis(); + this.legacy = legacy; + } + } +} -- cgit v1.2.3 From f53538d230440f16948a12a34ffe587edcd0f245 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 14:53:37 +0300 Subject: fixes --- src/main/java/com/juick/ws/WebsocketComponent.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index 83e811a6..7399c298 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -97,19 +97,18 @@ public class WebsocketComponent extends TextWebSocketHandler { } } if (sockSubscr != null) { - synchronized (clients) { - clients.add(sockSubscr); - } + clients.add(sockSubscr); } } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { synchronized (clients) { - clients.stream().filter(c -> c.session.equals(session)).forEach(c -> { + clients.stream().filter(c -> c.session.getId().equals(session.getId())).forEach(c -> { logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode())); clients.remove(c); }); } + } class SocketSubscribed { -- cgit v1.2.3 From c4948c141a68433b4ba9eca167622816dcd10fa1 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 15:25:08 +0300 Subject: fix session closing --- src/main/java/com/juick/ws/WebsocketComponent.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index 7399c298..8ccccb3b 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -103,9 +103,12 @@ public class WebsocketComponent extends TextWebSocketHandler { @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { synchronized (clients) { - clients.stream().filter(c -> c.session.getId().equals(session.getId())).forEach(c -> { - logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode())); - clients.remove(c); + clients.removeIf(c -> { + if (c.session.equals(session)) { + logger.info(String.format("session from %s closed with status %d", c.clientName, status.getCode())); + return true; + } + return false; }); } -- cgit v1.2.3 From 1f82412b053dce31d01e0c384bd18ad3b9e101ec Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 12:19:14 +0300 Subject: allow anonymous ws --- src/main/java/com/juick/ws/WebsocketComponent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index 8ccccb3b..d38f7b69 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -66,7 +66,7 @@ public class WebsocketComponent extends TextWebSocketHandler { int MID = 0; SocketSubscribed sockSubscr = null; - if (hLocation.getPath().equals("/") && visitor.getUID() > 0) { + if (hLocation.getPath().equals("/")) { logger.info(String.format("user %d connected", visitor.getUID())); sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false); } else if (hLocation.getPath().equals("/_all")) { -- cgit v1.2.3 From e0d4d97777962c6f1752d7772b57392ec53dfc21 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 12:29:38 +0300 Subject: allow anonymous replies --- src/main/java/com/juick/ws/WebsocketComponent.java | 6 +++++- src/main/java/com/juick/ws/XMPPConnection.java | 25 ++++++++++++---------- 2 files changed, 19 insertions(+), 12 deletions(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index d38f7b69..daf17753 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -97,7 +97,10 @@ public class WebsocketComponent extends TextWebSocketHandler { } } if (sockSubscr != null) { - clients.add(sockSubscr); + synchronized (clients) { + clients.add(sockSubscr); + logger.info(clients.size() + " clients connected"); + } } } @Override @@ -110,6 +113,7 @@ public class WebsocketComponent extends TextWebSocketHandler { } return false; }); + logger.info(clients.size() + " clients connected"); } } diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index d0bdc42f..4a80eec5 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -108,11 +108,11 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. String json = messageSerializer.serialize(jmsg).toString(); List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); - logger.info(String.format("%d users subscribed to %d", uids.size(), jmsg.getUser().getUID())); synchronized (wsHandler.clients) { - Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).count(); - logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - wsHandler.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { try { logger.info("sending message to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -120,7 +120,9 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> { + wsHandler.clients.stream().filter(c -> + c.legacy && c.allMessages) // legacy all posts + .forEach(c -> { try { logger.info("sending message to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -133,15 +135,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. private void onJuickMessageReply(com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - logger.info("got reply: " + json); List threadUsers = SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) .stream().map(User::getUID).collect(Collectors.toList()); - logger.info(String.format("%d users subscribed to %d", threadUsers.size(), jmsg.getMID())); synchronized (wsHandler.clients) { - Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allReplies).count(); - logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - wsHandler.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { try { logger.info("sending reply to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -149,7 +150,9 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - wsHandler.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies + .forEach(c -> { try { logger.info("sending reply to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); -- cgit v1.2.3 From 4bc9b403ff3cac9ee115a788b18cffff520a9e3b Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 12:33:46 +0300 Subject: fix session close --- src/main/java/com/juick/ws/WebsocketComponent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index daf17753..5dfe8237 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -107,7 +107,7 @@ public class WebsocketComponent extends TextWebSocketHandler { public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { synchronized (clients) { clients.removeIf(c -> { - if (c.session.equals(session)) { + if (c.session.getId().equals(session.getId())) { logger.info(String.format("session from %s closed with status %d", c.clientName, status.getCode())); return true; } -- cgit v1.2.3 From d3a850e35880ae5493b3a197e0e8f58da99350f8 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 13:08:52 +0300 Subject: fix thread subscription --- src/main/java/com/juick/ws/WebsocketComponent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/main/java/com/juick/ws/WebsocketComponent.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index 5dfe8237..e87b96a5 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -79,7 +79,7 @@ public class WebsocketComponent extends TextWebSocketHandler { sockSubscr.allReplies = true; } else if (hLocation.getPath().matches("/\\d+$")) { try { - MID = Integer.parseInt(hLocation.getPath().substring(8)); + MID = Integer.parseInt(hLocation.getPath().substring(1)); } catch (Exception e) { } if (MID > 0) { -- cgit v1.2.3