aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2018-07-26 09:57:04 +0300
committerGravatar Vitaly Takmazov2018-07-26 10:48:43 +0300
commit91bd51c87c715954e21e96948db11aa980dcbeb2 (patch)
tree1b5af9446825ad0e8cb902c8311655645c865de1
parent09552f180282bd845bac585408f27e1504e7897d (diff)
WebsocketManager refactoring
-rw-r--r--juick-common/src/main/java/com/juick/service/UserService.java3
-rw-r--r--juick-server/src/main/java/com/juick/server/ServerManager.java14
-rw-r--r--juick-server/src/main/java/com/juick/server/WebsocketManager.java102
3 files changed, 50 insertions, 69 deletions
diff --git a/juick-common/src/main/java/com/juick/service/UserService.java b/juick-common/src/main/java/com/juick/service/UserService.java
index df62f0ef..08a7a6ed 100644
--- a/juick-common/src/main/java/com/juick/service/UserService.java
+++ b/juick-common/src/main/java/com/juick/service/UserService.java
@@ -22,6 +22,7 @@ import com.juick.User;
import com.juick.server.helpers.Auth;
import com.juick.server.helpers.UserInfo;
+import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
@@ -65,7 +66,7 @@ public interface UserService {
int getUIDbyHash(String hash);
- com.juick.User getUserByHash(String hash);
+ @Nonnull com.juick.User getUserByHash(String hash);
String getHashByUID(int uid);
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java
index f5ca82f1..a4fd68ff 100644
--- a/juick-server/src/main/java/com/juick/server/ServerManager.java
+++ b/juick-server/src/main/java/com/juick/server/ServerManager.java
@@ -28,11 +28,9 @@ import com.juick.util.MessageUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
-import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.io.IOException;
import java.util.List;
@@ -69,7 +67,7 @@ public class ServerManager implements NotificationListener {
.forEach(c -> {
try {
logger.debug("sending pm to {}", c.visitor.getUid());
- c.session.sendMessage(new TextMessage(json));
+ c.sendMessage(new TextMessage(json));
} catch (IOException e) {
logger.warn("ws error", e);
}
@@ -93,7 +91,7 @@ public class ServerManager implements NotificationListener {
.forEach(c -> {
try {
logger.debug("sending message to {}", c.visitor.getUid());
- c.session.sendMessage(new TextMessage(json));
+ c.sendMessage(new TextMessage(json));
} catch (IOException e) {
logger.warn("ws error", e);
}
@@ -103,7 +101,7 @@ public class ServerManager implements NotificationListener {
.forEach(c -> {
try {
logger.debug("sending message to legacy client {}", c.visitor.getUid());
- c.session.sendMessage(new TextMessage(json));
+ c.sendMessage(new TextMessage(json));
} catch (IOException e) {
logger.warn("ws error", e);
}
@@ -130,7 +128,7 @@ public class ServerManager implements NotificationListener {
.forEach(c -> {
try {
logger.debug("sending reply to {}", c.visitor.getUid());
- c.session.sendMessage(new TextMessage(json));
+ c.sendMessage(new TextMessage(json));
} catch (IOException e) {
logger.warn("ws error", e);
}
@@ -140,7 +138,7 @@ public class ServerManager implements NotificationListener {
.forEach(c -> {
try {
logger.debug("sending reply to legacy client {}", c.visitor.getUid());
- c.session.sendMessage(new TextMessage(json));
+ c.sendMessage(new TextMessage(json));
} catch (IOException e) {
logger.warn("ws error", e);
}
@@ -203,7 +201,7 @@ public class ServerManager implements NotificationListener {
serviceMessage.setMid(source.getMid());
serviceMessage.setUnread(false);
try {
- u.session.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage)));
+ u.sendMessage(new TextMessage(jsonMapper.writeValueAsString(serviceMessage)));
} catch (IOException e) {
logger.error("JSON error", e);
}
diff --git a/juick-server/src/main/java/com/juick/server/WebsocketManager.java b/juick-server/src/main/java/com/juick/server/WebsocketManager.java
index 360f6c1c..8974a49f 100644
--- a/juick-server/src/main/java/com/juick/server/WebsocketManager.java
+++ b/juick-server/src/main/java/com/juick/server/WebsocketManager.java
@@ -17,11 +17,9 @@
package com.juick.server;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.juick.User;
import com.juick.server.helpers.AnonymousUser;
import com.juick.server.helpers.CommandResult;
-import com.juick.server.util.HttpBadRequestException;
import com.juick.server.util.HttpForbiddenException;
import com.juick.server.util.HttpNotFoundException;
import com.juick.service.MessagesService;
@@ -43,13 +41,14 @@ import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
+import javax.annotation.Nonnull;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
* Created by vitalyster on 28.06.2016.
@@ -58,60 +57,48 @@ import java.util.List;
public class WebsocketManager extends TextWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(WebsocketManager.class);
- private final List<SocketSubscribed> clients = Collections.synchronizedList(new LinkedList<>());
+ private final List<UserSession> clients = new CopyOnWriteArrayList<>();
@Inject
private UserService userService;
@Inject
private MessagesService messagesService;
- @Value("${service_user:juick}")
- private String serviceUser;
- @Inject
- private ObjectMapper jsonMapper;
@Inject
private CommandsManager commandsManager;
@Override
public void afterConnectionEstablished(WebSocketSession session) {
- URI hLocation;
- String hXRealIP;
- hLocation = session.getUri();
- HttpHeaders headers = session.getHandshakeHeaders();
- hXRealIP = headers.getOrDefault("X-Real-IP",
- Collections.singletonList(session.getRemoteAddress().toString())).get(0);
+ UserSession userSession = new UserSession(session);
+ URI hLocation = session.getUri();
// Auth
- User visitor = AnonymousUser.INSTANCE;
UriComponents uriComponents = UriComponentsBuilder.fromUri(hLocation).build();
List<String> hash = uriComponents.getQueryParams().get("hash");
if (hash != null && hash.get(0).length() == 16) {
- visitor = userService.getUserByHash(hash.get(0));
+ userSession.visitor = userService.getUserByHash(hash.get(0));
} else {
- logger.debug("wrong hash for {} from {}", visitor.getUid(), hXRealIP);
+ logger.debug("wrong hash for {} from {}", userSession.visitor.getUid(), userSession);
}
- int MID = 0;
- SocketSubscribed sockSubscr = null;
if (hLocation.getPath().equals("/ws/")) {
- logger.debug("user {} connected", visitor.getUid());
- sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false);
+ logger.debug("user {} connected", userSession.visitor.getUid());
} else if (hLocation.getPath().equals("/ws/_all")) {
- logger.debug("user {} connected to legacy _all ({})", visitor.getUid(), hLocation.getPath());
- sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
- sockSubscr.allMessages = true;
+ logger.debug("user {} connected to legacy _all ({})", userSession.visitor.getUid(), hLocation.getPath());
+ userSession.legacy = true;
+ userSession.allMessages = true;
} else if (hLocation.getPath().equals("/ws/_replies")) {
- logger.debug("user {} connected to legacy _replies ({})", visitor.getUid(), hLocation.getPath());
- sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
- sockSubscr.allReplies = true;
+ logger.debug("user {} connected to legacy _replies ({})", userSession.visitor.getUid(), hLocation.getPath());
+ userSession.legacy = true;
+ userSession.allReplies = true;
} else if (hLocation.getPath().matches("^/ws/(\\d)+$")) {
- MID = NumberUtils.toInt(hLocation.getPath().substring(4), 0);
+ int MID = NumberUtils.toInt(hLocation.getPath().substring(4), 0);
if (MID > 0) {
- if (messagesService.canViewThread(MID, visitor.getUid())) {
- logger.debug("user {} connected to legacy thread ({}) from {}", visitor.getUid(), MID, hXRealIP);
- sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
- sockSubscr.MID = MID;
+ if (messagesService.canViewThread(MID, userSession.visitor.getUid())) {
+ logger.debug("user {} connected to legacy thread ({}) from {}", userSession.visitor.getUid(), MID, userSession);
+ userSession.legacy = true;
+ userSession.MID = MID;
} else {
throw new HttpForbiddenException();
}
@@ -119,32 +106,23 @@ public class WebsocketManager extends TextWebSocketHandler {
} else {
throw new HttpNotFoundException();
}
- if (sockSubscr != null) {
- synchronized (clients) {
- clients.add(sockSubscr);
- logger.debug("{} clients connected", clients.size());
- }
- } else {
- throw new HttpBadRequestException();
- }
+ clients.add(userSession);
+ logger.debug("{} clients connected", clients.size());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
- synchronized (clients) {
- logger.debug("session closed with status {}: {}", status.getCode(), status.getReason());
- clients.removeIf(c -> c.session.getDelegate().getId().equals(session.getId()));
- logger.debug("{} clients connected", clients.size());
- }
-
+ logger.debug("session closed with status {}: {}", status.getCode(), status.getReason());
+ clients.removeIf(c -> c.getDelegate().getId().equals(session.getId()));
+ logger.debug("{} clients connected", clients.size());
}
@Scheduled(fixedRate = 30000)
public void ping() {
clients.forEach(c -> {
try {
- if (c.session.isOpen()) {
- c.session.sendMessage(new PingMessage());
+ if (c.isOpen()) {
+ c.sendMessage(new PingMessage());
}
} catch (IOException e) {
logger.error("WebSocket PING exception", e);
@@ -154,26 +132,24 @@ public class WebsocketManager extends TextWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
- SocketSubscribed ws = clients.stream().filter(c -> c.session.getDelegate().equals(session))
+ UserSession ws = clients.stream().filter(c -> c.getDelegate().equals(session))
.findFirst().orElseThrow(IllegalStateException::new);
if (!ws.visitor.isAnonymous()) {
String command = message.getPayload().trim();
if (StringUtils.isNotEmpty(command)) {
CommandResult result = commandsManager.processCommand(ws.visitor, command, URI.create(""));
- ws.session.sendMessage(new TextMessage(result.getText()));
+ ws.sendMessage(new TextMessage(result.getText()));
}
} else {
- ws.session.sendMessage(new TextMessage("Authorization required"));
+ ws.sendMessage(new TextMessage("Authorization required"));
}
}
- public List<SocketSubscribed> getClients() {
+ public List<UserSession> getClients() {
return clients;
}
- class SocketSubscribed {
- ConcurrentWebSocketSessionDecorator session;
- String clientName;
+ class UserSession extends ConcurrentWebSocketSessionDecorator {
User visitor;
int MID;
boolean allMessages;
@@ -182,12 +158,18 @@ public class WebsocketManager extends TextWebSocketHandler {
Instant tsLastData;
boolean legacy;
- public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) {
- this.session = new ConcurrentWebSocketSessionDecorator(session, 60000, 65536);
- this.clientName = clientName;
- this.visitor = visitor;
+ UserSession(WebSocketSession session) {
+ super(session, 60000, 65536);
+ this.visitor = AnonymousUser.INSTANCE;
tsConnected = tsLastData = Instant.now();
- this.legacy = legacy;
+ }
+
+ @Nonnull
+ @Override
+ public String toString() {
+ HttpHeaders headers = getHandshakeHeaders();
+ return headers.getOrDefault("X-Real-IP",
+ Collections.singletonList(getRemoteAddress().toString())).get(0);
}
}
}