aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/com/juick/ws
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2016-07-07 15:46:59 +0300
committerGravatar Vitaly Takmazov2016-07-07 15:46:59 +0300
commit1def88c0685785aef858f72a1dabd5f44a4ba3e2 (patch)
tree715ae14fedae69982fce00c9d466b634eb37a8a3 /src/main/java/com/juick/ws
parent20ca1d9e0c1b7b8a4822742f120d6c576454d0d9 (diff)
parent0aea7cc831121ca8551824d17d0abd8a32c46c30 (diff)
Merge remote-tracking branch 'ws/master'
Diffstat (limited to 'src/main/java/com/juick/ws')
-rw-r--r--src/main/java/com/juick/ws/WebsocketComponent.java140
-rw-r--r--src/main/java/com/juick/ws/XMPPConnection.java165
-rw-r--r--src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java55
-rw-r--r--src/main/java/com/juick/ws/configuration/WebsocketInitializer.java33
4 files changed, 393 insertions, 0 deletions
diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java
new file mode 100644
index 00000000..e87b96a5
--- /dev/null
+++ b/src/main/java/com/juick/ws/WebsocketComponent.java
@@ -0,0 +1,140 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.server.MessagesQueries;
+import com.juick.server.UserQueries;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.utils.URLEncodedUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.CloseStatus;
+import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.handler.TextWebSocketHandler;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Component
+public class WebsocketComponent extends TextWebSocketHandler {
+
+ @Inject
+ JdbcTemplate jdbc;
+
+ private static final Logger logger = Logger.getLogger(WebsocketComponent.class.getName());
+ final List<SocketSubscribed> clients = Collections.synchronizedList(new ArrayList<SocketSubscribed>());
+
+ @Override
+ public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+ URI hLocation;
+ String hXRealIP = "";
+
+ hLocation = session.getUri();
+ HttpHeaders headers = session.getHandshakeHeaders();
+ hXRealIP = headers.getOrDefault("X-Real-IP",
+ Collections.singletonList(session.getRemoteAddress().toString())).get(0);
+
+ // Auth
+ User visitor = new User();
+ List<NameValuePair> params = URLEncodedUtils.parse(hLocation, "UTF-8");
+ for (NameValuePair param : params) {
+ if (param.getName().equals("hash")) {
+ String hash = param.getValue();
+ if (hash.length() == 16) {
+ visitor = UserQueries.getUserByHash(jdbc, hash);
+ } else {
+ try {
+ logger.info(String.format("wrong hash for %d from %s", visitor.getUID(), hXRealIP));
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ break;
+ }
+ }
+ logger.info(String.format("user %d connected to %s from %s", visitor.getUID(), hLocation.getPath(), hXRealIP));
+
+ int MID = 0;
+ SocketSubscribed sockSubscr = null;
+ if (hLocation.getPath().equals("/")) {
+ logger.info(String.format("user %d connected", visitor.getUID()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, false);
+ } else if (hLocation.getPath().equals("/_all")) {
+ logger.info(String.format("user %d connected to legacy _all (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allMessages = true;
+ } else if (hLocation.getPath().equals("/_replies")) {
+ logger.info(String.format("user %d connected to legacy _replies (%s)", visitor.getUID(), hLocation.getPath()));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.allReplies = true;
+ } else if (hLocation.getPath().matches("/\\d+$")) {
+ try {
+ MID = Integer.parseInt(hLocation.getPath().substring(1));
+ } catch (Exception e) {
+ }
+ if (MID > 0) {
+ if (MessagesQueries.canViewThread(jdbc, MID, visitor.getUID())) {
+ logger.info(String.format("user %d connected to legacy thread (%d) from %s", visitor.getUID(), MID, hXRealIP));
+ sockSubscr = new SocketSubscribed(session, hXRealIP, visitor, true);
+ sockSubscr.MID = MID;
+ } else {
+ try {
+ session.close(new CloseStatus(403, "Forbidden"));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ }
+ }
+ }
+ if (sockSubscr != null) {
+ synchronized (clients) {
+ clients.add(sockSubscr);
+ logger.info(clients.size() + " clients connected");
+ }
+ }
+ }
+ @Override
+ public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
+ synchronized (clients) {
+ clients.removeIf(c -> {
+ if (c.session.getId().equals(session.getId())) {
+ logger.info(String.format("session from %s closed with status %d", c.clientName, status.getCode()));
+ return true;
+ }
+ return false;
+ });
+ logger.info(clients.size() + " clients connected");
+ }
+
+ }
+ class SocketSubscribed {
+
+ WebSocketSession session;
+ String clientName;
+ User visitor;
+ int MID;
+ boolean allMessages;
+ boolean allReplies;
+ long tsConnected;
+ long tsLastData;
+ boolean legacy;
+
+ public SocketSubscribed(WebSocketSession session, String clientName, User visitor, boolean legacy) {
+ this.session = session;
+ this.clientName = clientName;
+ this.visitor = visitor;
+ tsConnected = tsLastData = System.currentTimeMillis();
+ this.legacy = legacy;
+ }
+ }
+}
diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java
new file mode 100644
index 00000000..4a80eec5
--- /dev/null
+++ b/src/main/java/com/juick/ws/XMPPConnection.java
@@ -0,0 +1,165 @@
+package com.juick.ws;
+
+import com.juick.User;
+import com.juick.json.MessageSerializer;
+import com.juick.server.SubscriptionsQueries;
+import com.juick.xmpp.JID;
+import com.juick.xmpp.Message;
+import com.juick.xmpp.Stream;
+import com.juick.xmpp.StreamComponent;
+import com.juick.xmpp.extensions.JuickMessage;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.TextMessage;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+
+/**
+ *
+ * @author ugnich
+ */
+@Component
+public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener {
+ private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName());
+ @Inject
+ JdbcTemplate sql;
+ Stream xmpp;
+ String xmppPassword;
+ MessageSerializer ms;
+ WebsocketComponent wsHandler;
+
+ @Inject
+ public XMPPConnection(Environment env, WebsocketComponent wsHandler) {
+ this.wsHandler = wsHandler;
+ xmppPassword = env.getProperty("xmpp_password");
+ ms = new MessageSerializer();
+ }
+
+ @Override
+ public void run() {
+ try {
+ Socket socket = new Socket("localhost", 5347);
+ xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword);
+ xmpp.addChildParser(new JuickMessage());
+ xmpp.addListener((Stream.StreamListener) this);
+ xmpp.addListener((Message.MessageListener) this);
+ xmpp.startParsing();
+ } catch (IOException e) {
+ logger.log(Level.SEVERE, "XMPPConnection error", e);
+ }
+ }
+
+ @Override
+ public void onStreamReady() {
+ logger.info("XMPP stream ready");
+ }
+
+ @Override
+ public void onStreamFail(Exception ex) {
+ logger.log(Level.SEVERE, "XMPP stream failed", ex);
+ }
+
+ @Override
+ public void onMessage(com.juick.xmpp.Message msg) {
+ JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS);
+ if (jmsg != null) {
+ logger.info("got jmsg: " + ms.serialize(jmsg).toString());
+ if (jmsg.getMID() == 0) {
+ int uid_to = 0;
+ try {
+ uid_to = Integer.parseInt(msg.to.Username);
+ } catch (Exception e) {
+ }
+ if (uid_to > 0) {
+ onJuickPM(uid_to, jmsg);
+ }
+ } else if (jmsg.getRID() == 0) {
+ onJuickMessagePost(jmsg);
+ } else {
+ onJuickMessageReply(jmsg);
+ }
+ }
+ }
+
+ MessageSerializer messageSerializer = new MessageSerializer();
+
+ private void onJuickPM(int uid_to, com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> {
+ try {
+ logger.info("sending pm to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessagePost(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c ->
+ (!c.legacy && c.visitor.getUID() == 0) // anonymous users
+ || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.info("sending message to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ wsHandler.clients.stream().filter(c ->
+ c.legacy && c.allMessages) // legacy all posts
+ .forEach(c -> {
+ try {
+ logger.info("sending message to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+
+ private void onJuickMessageReply(com.juick.Message jmsg) {
+ String json = messageSerializer.serialize(jmsg).toString();
+ List<Integer> threadUsers =
+ SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID())
+ .stream().map(User::getUID).collect(Collectors.toList());
+ synchronized (wsHandler.clients) {
+ wsHandler.clients.stream().filter(c ->
+ (!c.legacy && c.visitor.getUID() == 0) // anonymous users
+ || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions
+ .forEach(c -> {
+ try {
+ logger.info("sending reply to " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ wsHandler.clients.stream().filter(c ->
+ (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies
+ .forEach(c -> {
+ try {
+ logger.info("sending reply to legacy client " + c.visitor.getUID());
+ c.session.sendMessage(new TextMessage(json));
+ } catch (IOException e) {
+ logger.log(Level.WARNING, "ws error", e);
+ }
+ });
+ }
+ }
+}
diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
new file mode 100644
index 00000000..223f8d63
--- /dev/null
+++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java
@@ -0,0 +1,55 @@
+package com.juick.ws.configuration;
+
+import com.juick.ws.WebsocketComponent;
+import com.juick.ws.XMPPConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+import org.springframework.core.env.Environment;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DriverManagerDataSource;
+import org.springframework.web.servlet.config.annotation.EnableWebMvc;
+import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
+import org.springframework.web.socket.config.annotation.EnableWebSocket;
+import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
+import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
+
+import javax.inject.Inject;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Created by vitalyster on 28.06.2016.
+ */
+@Configuration
+@EnableWebMvc
+@EnableWebSocket
+@ComponentScan(basePackages = {"com.juick"})
+@PropertySource("classpath:juick.conf")
+public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements WebSocketConfigurer {
+ @Inject
+ Environment env;
+ ExecutorService xmppThread = Executors.newSingleThreadExecutor();
+ @Bean
+ WebsocketComponent wsHandler() {
+ return new WebsocketComponent();
+ }
+ @Bean
+ XMPPConnection xmpp() {
+ XMPPConnection xmpp = new XMPPConnection(env, wsHandler());
+ xmppThread.submit(xmpp);
+ return xmpp;
+ }
+ @Bean
+ JdbcTemplate jdbc() {
+ DriverManagerDataSource dataSource = new DriverManagerDataSource();
+ dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver"));
+ dataSource.setUrl(env.getProperty("datasource_url"));
+ return new JdbcTemplate(dataSource);
+ }
+ @Override
+ public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
+ registry.addHandler(wsHandler(), "/**").setAllowedOrigins("*");
+ }
+}
diff --git a/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java b/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java
new file mode 100644
index 00000000..89017f8b
--- /dev/null
+++ b/src/main/java/com/juick/ws/configuration/WebsocketInitializer.java
@@ -0,0 +1,33 @@
+package com.juick.ws.configuration;
+import org.springframework.web.filter.CharacterEncodingFilter;
+import org.springframework.web.servlet.support.AbstractAnnotationConfigDispatcherServletInitializer;
+
+import javax.servlet.Filter;
+
+/**
+ * Created by vt on 09/02/16.
+ */
+public class WebsocketInitializer extends AbstractAnnotationConfigDispatcherServletInitializer {
+ @Override
+ protected Class<?>[] getRootConfigClasses() {
+ return new Class[] {WebsocketConfiguration.class};
+ }
+
+ @Override
+ protected Class<?>[] getServletConfigClasses() {
+ return null;
+ }
+
+ @Override
+ protected String[] getServletMappings() {
+ return new String[] {
+ "/"
+ };
+ }
+ @Override
+ protected Filter[] getServletFilters() {
+ CharacterEncodingFilter characterEncodingFilter = new CharacterEncodingFilter();
+ characterEncodingFilter.setEncoding("UTF-8");
+ return new Filter[] { characterEncodingFilter};
+ }
+}