aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/com/juick/ws
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2016-06-28 10:36:28 +0300
committerGravatar Vitaly Takmazov2016-06-28 10:36:28 +0300
commit331645f0fd7fbe7d9679d39dcce453cc3b2cab6e (patch)
tree53813518b0a831fc88162191525edc8003284bb1 /src/main/java/com/juick/ws
parent95e150755d1b11bd78fc604aa7283f2765b2ee46 (diff)
spring-websocket
Diffstat (limited to 'src/main/java/com/juick/ws')
-rw-r--r--src/main/java/com/juick/ws/WebsocketComponent.java134
-rw-r--r--src/main/java/com/juick/ws/XMPPConnection.java142
-rw-r--r--src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java49
3 files changed, 325 insertions, 0 deletions
diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java
new file mode 100644
index 00000000..83e811a6
--- /dev/null
+++ b/src/main/java/com/juick/ws/WebsocketComponent.java
@@ -0,0 +1,134 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.server.MessagesQueries;
+import com.juick.server.UserQueries;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Component
+public class WebsocketComponent extends TextWebSocketHandler {
+
+ @Inject
+ JdbcTemplate jdbc;
+
+ private static final Logger logger = Logger.getLogger(WebsocketComponent.class.getName());
+ final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+ URI hLocation;
+ String hXRealIP = "";
+
+ hLocation = session.getUri();
+ HttpHeaders headers = session.getHandshakeHeaders();
+ hXRealIP = headers.getOrDefault("X-Real-IP",
+ Collections.singletonList(session.getRemoteAddress().toString())).get(0);
+
+ // Auth
+ User visitor = new User();
+ List<NameValuePair> params = URLEncodedUtils.parse(hLocation, "UTF-8");
+ for (NameValuePair param : params) {
+ if (param.getName().equals("hash")) {
+ String hash = param.getValue();
+ if (hash.length() == 16) {
+ visitor = UserQueries.getUserByHash(jdbc, hash);
+ } else {
+ try {
+ logger.info(String.format("wrong hash for %d from %s", visitor.getUID(), hXRealIP));
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ break;
+ }
+ }
+ logger.info(String.format("user %d connected to %s from %s", visitor.getUID(), hLocation.getPath(), hXRealIP));
+
+ int MID = 0;
+ SocketSubscribed sockSubscr = null;
+ if (hLocation.getPath().equals("/") && visitor.getUID() > 0) {
+ logger.info(String.format("user %d connected", visitor.getUID()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false);
+ } else if (hLocation.getPath().equals("/_all")) {
+ logger.info(String.format("user %d connected to legacy _all (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allMessages = true;
+ } else if (hLocation.getPath().equals("/_replies")) {
+ logger.info(String.format("user %d connected to legacy _replies (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allReplies = true;
+ } else if (hLocation.getPath().matches("/\\d+$")) {
+ try {
+ MID = Integer.parseInt(hLocation.getPath().substring(8));
+ } catch (Exception e) {
+ }
+ if (MID > 0) {
+ if (MessagesQueries.canViewThread(jdbc, MID, visitor.getUID())) {
+ logger.info(String.format("user %d connected to legacy thread (%d) from %s", visitor.getUID(), MID, hXRealIP));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.MID = MID;
+ } else {
+ try {
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ }
+ }
+ if (sockSubscr != null) {
+ synchronized (clients) {
+ clients.add(sockSubscr);
+ }
+ }
+ }
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+ synchronized (clients) {
+ clients.stream().filter(c -> c.session.equals(session)).forEach(c -> {
+ logger.info(String.format("session %s closed with status %s", c.clientName, status.getCode()));
+ clients.remove(c);
+ });
+ }
+ }
+ class SocketSubscribed {
+
+ WebSocketSession session;
+ String clientName;
+ User visitor;
+ int MID;
+ boolean allMessages;
+ boolean allReplies;
+ long tsConnected;
+ long tsLastData;
+ boolean legacy;
+
+ public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) {
+ this.session = session;
+ this.clientName = clientName;
+ this.visitor = visitor;
+ tsConnected = tsLastData = System.currentTimeMillis();
+ this.legacy = legacy;
+ }
+ }
+}
diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java
new file mode 100644
index 00000000..9ed3d0cd
--- /dev/null
+++ b/src/main/java/com/juick/ws/XMPPConnection.java
@@ -0,0 +1,142 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.json.MessageSerializer;
+import com.juick.server.SubscriptionsQueries;
+import com.juick.xmpp.JID;
+import com.juick.xmpp.Message;
+import com.juick.xmpp.Stream;
+import com.juick.xmpp.StreamComponent;
+import com.juick.xmpp.extensions.JuickMessage;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * @author ugnich
+ */
+@Component
+public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
+ private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
+
+ JdbcTemplate sql;
+ Stream xmpp;
+ String xmppPassword;
+ MessageSerializer ms;
+ @Inject
+ WebsocketComponent ws;
+
+ @Inject
+ public XMPPConnection(JdbcTemplate sql, String password) {
+ this.sql = sql;
+ xmppPassword = password;
+ ms = new MessageSerializer();
+ new Thread(this).start();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Socket socket = new Socket("localhost", 5347);
+ xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp.addChildParser(new JuickMessage());
+ xmpp.addListener((Stream.StreamListener) this);
+ xmpp.addListener((Message.MessageListener) this);
+ xmpp.startParsing();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "XMPPConnection error", e);
+ }
+ }
+
+ @Override
+ public void onStreamReady() {
+ logger.info("XMPP stream ready");
+ }
+
+ @Override
+ public void onStreamFail(String msg) {
+ logger.severe("XMPP stream failed: " + msg);
+ }
+
+ @Override
+ public void onMessage(com.juick.xmpp.Message msg) {
+ JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ logger.info("got jmsg: " + ms.serialize(jmsg).toString());
+ if (jmsg.getMID() == 0) {
+ int uid_to = 0;
+ try {
+ uid_to = Integer.parseInt(msg.to.Username);
+ } catch (Exception e) {
+ }
+ if (uid_to > 0) {
+ onJuickPM(uid_to, jmsg);
+ }
+ } else if (jmsg.getRID() == 0) {
+ onJuickMessagePost(jmsg);
+ } else {
+ onJuickMessageReply(jmsg);
+ }
+ }
+ }
+
+ MessageSerializer messageSerializer = new MessageSerializer();
+
+ private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ try {
+ logger.info("sending pm to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessagePost(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessageReply(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+
+ List<Integer> threadUsers =
+ SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (ws.clients) {
+ ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> {
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
new file mode 100644
index 00000000..a56e56f5
--- /dev/null
+++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
@@ -0,0 +1,49 @@
+package com.juick.ws.configuration;
+
+import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.web.socket.WebSocketHandler;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import javax.inject.Inject;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Configuration
+@EnableWebSocket
+@ComponentScan(basePackages = {"com.juick"})
+@PropertySource("classpath:juick.conf")
+public class WebsocketConfiguration implements WebSocketConfigurer {
+ @Inject
+ Environment env;
+
+ @Bean
+ WebSocketHandler wsHandler() {
+ return new WebsocketComponent();
+ }
+ @Bean
+ JdbcTemplate jdbc() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver"));
+ dataSource.setUrl(env.getProperty("datasource_url"));
+ return new JdbcTemplate(dataSource);
+ }
+ @Bean
+ XMPPConnection xmpp() {
+ return new XMPPConnection(jdbc(), env.getProperty("xmpp_password"));
+ }
+ @Override
+ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ registry.addHandler(wsHandler(), "/");
+ }
+}