From 91bd51c87c715954e21e96948db11aa980dcbeb2 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 26 Jul 2018 09:57:04 +0300 Subject: WebsocketManager refactoring --- .../main/java/com/juick/server/ServerManager.java | 14 ++- .../java/com/juick/server/WebsocketManager.java | 102 +++++++++------------ 2 files changed, 48 insertions(+), 68 deletions(-) (limited to 'juick-server') diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index f5ca82f1..a4fd68ff 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -28,11 +28,9 @@ import com.juick.util.MessageUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; -import javax.annotation.Nonnull; import javax.inject.Inject; import java.io.IOException; import java.util.List; @@ -69,7 +67,7 @@ public class ServerManager implements NotificationListener { .forEach(c -> { try { logger.debug("sending pm to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); + c.sendMessage(new TextMessage(json)); } catch (IOException e) { logger.warn("ws error", e); } @@ -93,7 +91,7 @@ public class ServerManager implements NotificationListener { .forEach(c -> { try { logger.debug("sending message to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); + c.sendMessage(new TextMessage(json)); } catch (IOException e) { logger.warn("ws error", e); } @@ -103,7 +101,7 @@ public class ServerManager implements NotificationListener { .forEach(c -> { try { logger.debug("sending message to legacy client {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); + c.sendMessage(new TextMessage(json)); } catch (IOException e) { logger.warn("ws error", e); } @@ -130,7 +128,7 @@ public class ServerManager implements NotificationListener { .forEach(c -> { try { logger.debug("sending reply to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); + c.sendMessage(new TextMessage(json)); } catch (IOException e) { logger.warn("ws error", e); } @@ -140,7 +138,7 @@ public class ServerManager implements NotificationListener { .forEach(c -> { try { logger.debug("sending reply to legacy client {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); + c.sendMessage(new TextMessage(json)); } catch (IOException e) { logger.warn("ws error", e); } @@ -203,7 +201,7 @@ public class ServerManager implements NotificationListener { serviceMessage.setMid(source.getMid()); serviceMessage.setUnread(false); try { - u.session.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage))); + u.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage))); } catch (IOException e) { logger.error("JSON error", e); } diff --git a/juick-server/src/main/java/com/juick/server/WebsocketManager.java b/juick-server/src/main/java/com/juick/server/WebsocketManager.java index 360f6c1c..8974a49f 100644 --- a/juick-server/src/main/java/com/juick/server/WebsocketManager.java +++ b/juick-server/src/main/java/com/juick/server/WebsocketManager.java @@ -17,11 +17,9 @@ package com.juick.server; -import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.User; import com.juick.server.helpers.AnonymousUser; import com.juick.server.helpers.CommandResult; -import com.juick.server.util.HttpBadRequestException; import com.juick.server.util.HttpForbiddenException; import com.juick.server.util.HttpNotFoundException; import com.juick.service.MessagesService; @@ -43,13 +41,14 @@ import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; +import javax.annotation.Nonnull; import javax.inject.Inject; import java.io.IOException; import java.net.URI; import java.time.Instant; import java.util.Collections; -import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; /** * Created by vitalyster on 28.06.2016. @@ -58,60 +57,48 @@ import java.util.List; public class WebsocketManager extends TextWebSocketHandler { private static final Logger logger = LoggerFactory.getLogger(WebsocketManager.class); - private final List clients = Collections.synchronizedList(new LinkedList<>()); + private final List clients = new CopyOnWriteArrayList<>(); @Inject private UserService userService; @Inject private MessagesService messagesService; - @Value("${service_user:juick}") - private String serviceUser; - @Inject - private ObjectMapper jsonMapper; @Inject private CommandsManager commandsManager; @Override public void afterConnectionEstablished(WebSocketSession session) { - URI hLocation; - String hXRealIP; - hLocation = session.getUri(); - HttpHeaders headers = session.getHandshakeHeaders(); - hXRealIP = headers.getOrDefault("X-Real-IP", - Collections.singletonList(session.getRemoteAddress().toString())).get(0); + UserSession userSession = new UserSession(session); + URI hLocation = session.getUri(); // Auth - User visitor = AnonymousUser.INSTANCE; UriComponents uriComponents = UriComponentsBuilder.fromUri(hLocation).build(); List hash = uriComponents.getQueryParams().get("hash"); if (hash != null && hash.get(0).length() == 16) { - visitor = userService.getUserByHash(hash.get(0)); + userSession.visitor = userService.getUserByHash(hash.get(0)); } else { - logger.debug("wrong hash for {} from {}", visitor.getUid(), hXRealIP); + logger.debug("wrong hash for {} from {}", userSession.visitor.getUid(), userSession); } - int MID = 0; - SocketSubscribed sockSubscr = null; if (hLocation.getPath().equals("/ws/")) { - logger.debug("user {} connected", visitor.getUid()); - sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false); + logger.debug("user {} connected", userSession.visitor.getUid()); } else if (hLocation.getPath().equals("/ws/_all")) { - logger.debug("user {} connected to legacy _all ({})", visitor.getUid(), hLocation.getPath()); - sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); - sockSubscr.allMessages = true; + logger.debug("user {} connected to legacy _all ({})", userSession.visitor.getUid(), hLocation.getPath()); + userSession.legacy = true; + userSession.allMessages = true; } else if (hLocation.getPath().equals("/ws/_replies")) { - logger.debug("user {} connected to legacy _replies ({})", visitor.getUid(), hLocation.getPath()); - sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); - sockSubscr.allReplies = true; + logger.debug("user {} connected to legacy _replies ({})", userSession.visitor.getUid(), hLocation.getPath()); + userSession.legacy = true; + userSession.allReplies = true; } else if (hLocation.getPath().matches("^/ws/(\\d)+$")) { - MID = NumberUtils.toInt(hLocation.getPath().substring(4), 0); + int MID = NumberUtils.toInt(hLocation.getPath().substring(4), 0); if (MID > 0) { - if (messagesService.canViewThread(MID, visitor.getUid())) { - logger.debug("user {} connected to legacy thread ({}) from {}", visitor.getUid(), MID, hXRealIP); - sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true); - sockSubscr.MID = MID; + if (messagesService.canViewThread(MID, userSession.visitor.getUid())) { + logger.debug("user {} connected to legacy thread ({}) from {}", userSession.visitor.getUid(), MID, userSession); + userSession.legacy = true; + userSession.MID = MID; } else { throw new HttpForbiddenException(); } @@ -119,32 +106,23 @@ public class WebsocketManager extends TextWebSocketHandler { } else { throw new HttpNotFoundException(); } - if (sockSubscr != null) { - synchronized (clients) { - clients.add(sockSubscr); - logger.debug("{} clients connected", clients.size()); - } - } else { - throw new HttpBadRequestException(); - } + clients.add(userSession); + logger.debug("{} clients connected", clients.size()); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { - synchronized (clients) { - logger.debug("session closed with status {}: {}", status.getCode(), status.getReason()); - clients.removeIf(c -> c.session.getDelegate().getId().equals(session.getId())); - logger.debug("{} clients connected", clients.size()); - } - + logger.debug("session closed with status {}: {}", status.getCode(), status.getReason()); + clients.removeIf(c -> c.getDelegate().getId().equals(session.getId())); + logger.debug("{} clients connected", clients.size()); } @Scheduled(fixedRate = 30000) public void ping() { clients.forEach(c -> { try { - if (c.session.isOpen()) { - c.session.sendMessage(new PingMessage()); + if (c.isOpen()) { + c.sendMessage(new PingMessage()); } } catch (IOException e) { logger.error("WebSocket PING exception", e); @@ -154,26 +132,24 @@ public class WebsocketManager extends TextWebSocketHandler { @Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { - SocketSubscribed ws = clients.stream().filter(c -> c.session.getDelegate().equals(session)) + UserSession ws = clients.stream().filter(c -> c.getDelegate().equals(session)) .findFirst().orElseThrow(IllegalStateException::new); if (!ws.visitor.isAnonymous()) { String command = message.getPayload().trim(); if (StringUtils.isNotEmpty(command)) { CommandResult result = commandsManager.processCommand(ws.visitor, command, URI.create("")); - ws.session.sendMessage(new TextMessage(result.getText())); + ws.sendMessage(new TextMessage(result.getText())); } } else { - ws.session.sendMessage(new TextMessage("Authorization required")); + ws.sendMessage(new TextMessage("Authorization required")); } } - public List getClients() { + public List getClients() { return clients; } - class SocketSubscribed { - ConcurrentWebSocketSessionDecorator session; - String clientName; + class UserSession extends ConcurrentWebSocketSessionDecorator { User visitor; int MID; boolean allMessages; @@ -182,12 +158,18 @@ public class WebsocketManager extends TextWebSocketHandler { Instant tsLastData; boolean legacy; - public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) { - this.session = new ConcurrentWebSocketSessionDecorator(session, 60000, 65536); - this.clientName = clientName; - this.visitor = visitor; + UserSession(WebSocketSession session) { + super(session, 60000, 65536); + this.visitor = AnonymousUser.INSTANCE; tsConnected = tsLastData = Instant.now(); - this.legacy = legacy; + } + + @Nonnull + @Override + public String toString() { + HttpHeaders headers = getHandshakeHeaders(); + return headers.getOrDefault("X-Real-IP", + Collections.singletonList(getRemoteAddress().toString())).get(0); } } } -- cgit v1.2.3