From 331645f0fd7fbe7d9679d39dcce453cc3b2cab6e Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 10:36:28 +0300 Subject: spring-websocket --- src/main/java/com/juick/ws/XMPPConnection.java | 142 +++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 src/main/java/com/juick/ws/XMPPConnection.java (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java new file mode 100644 index 00000000..9ed3d0cd --- /dev/null +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -0,0 +1,142 @@ +package com.juick.ws; + +import com.juick.User; +import com.juick.json.MessageSerializer; +import com.juick.server.SubscriptionsQueries; +import com.juick.xmpp.JID; +import com.juick.xmpp.Message; +import com.juick.xmpp.Stream; +import com.juick.xmpp.StreamComponent; +import com.juick.xmpp.extensions.JuickMessage; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; + +import javax.inject.Inject; +import java.io.IOException; +import java.net.Socket; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Collectors; + +/** + * + * @author ugnich + */ +@Component +public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { + private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); + + JdbcTemplate sql; + Stream xmpp; + String xmppPassword; + MessageSerializer ms; + @Inject + WebsocketComponent ws; + + @Inject + public XMPPConnection(JdbcTemplate sql, String password) { + this.sql = sql; + xmppPassword = password; + ms = new MessageSerializer(); + new Thread(this).start(); + } + + @Override + public void run() { + try { + Socket socket = new Socket("localhost", 5347); + xmpp = new StreamComponent(new JID("", "ws.juick.com", ""), socket.getInputStream(), socket.getOutputStream(), xmppPassword); + xmpp.addChildParser(new JuickMessage()); + xmpp.addListener((Stream.StreamListener) this); + xmpp.addListener((Message.MessageListener) this); + xmpp.startParsing(); + } catch (IOException e) { + logger.log(Level.SEVERE, "XMPPConnection error", e); + } + } + + @Override + public void onStreamReady() { + logger.info("XMPP stream ready"); + } + + @Override + public void onStreamFail(String msg) { + logger.severe("XMPP stream failed: " + msg); + } + + @Override + public void onMessage(com.juick.xmpp.Message msg) { + JuickMessage jmsg = (JuickMessage) msg.getChild(JuickMessage.XMLNS); + if (jmsg != null) { + logger.info("got jmsg: " + ms.serialize(jmsg).toString()); + if (jmsg.getMID() == 0) { + int uid_to = 0; + try { + uid_to = Integer.parseInt(msg.to.Username); + } catch (Exception e) { + } + if (uid_to > 0) { + onJuickPM(uid_to, jmsg); + } + } else if (jmsg.getRID() == 0) { + onJuickMessagePost(jmsg); + } else { + onJuickMessageReply(jmsg); + } + } + } + + MessageSerializer messageSerializer = new MessageSerializer(); + + private void onJuickPM(int uid_to, com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + try { + logger.info("sending pm to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessagePost(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) + .stream().map(User::getUID).collect(Collectors.toList()); + + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending message to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } + + private void onJuickMessageReply(com.juick.Message jmsg) { + String json = messageSerializer.serialize(jmsg).toString(); + + List threadUsers = + SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) + .stream().map(User::getUID).collect(Collectors.toList()); + synchronized (ws.clients) { + ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + try { + logger.info("sending reply to " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); + } + } +} -- cgit v1.2.3 From f2387bcc06dca0bf3368f32a99a6ec74a7381ccd Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 11:23:34 +0300 Subject: fix xmpp component wiring --- src/main/java/com/juick/ws/XMPPConnection.java | 5 ++--- src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 9ed3d0cd..5d4fbce9 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -28,6 +28,7 @@ import java.util.stream.Collectors; public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); + @Inject JdbcTemplate sql; Stream xmpp; String xmppPassword; @@ -35,9 +36,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. @Inject WebsocketComponent ws; - @Inject - public XMPPConnection(JdbcTemplate sql, String password) { - this.sql = sql; + public XMPPConnection(String password) { xmppPassword = password; ms = new MessageSerializer(); new Thread(this).start(); diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index a298d0a0..9fd75bb0 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -43,7 +43,7 @@ public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements W } @Bean XMPPConnection xmpp() { - return new XMPPConnection(jdbc(), env.getProperty("xmpp_password")); + return new XMPPConnection(env.getProperty("xmpp_password")); } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { -- cgit v1.2.3 From a2faab09d3a8604ffe24dd337a3ab099f132e311 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 11:34:25 +0300 Subject: fix xmpp component wiring, take 2 --- src/main/java/com/juick/ws/XMPPConnection.java | 7 +++++-- .../java/com/juick/ws/configuration/WebsocketConfiguration.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 5d4fbce9..802271a0 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -8,6 +8,7 @@ import com.juick.xmpp.Message; import com.juick.xmpp.Stream; import com.juick.xmpp.StreamComponent; import com.juick.xmpp.extensions.JuickMessage; +import org.springframework.core.env.Environment; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; @@ -30,14 +31,16 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. @Inject JdbcTemplate sql; + @Inject + Environment env; Stream xmpp; String xmppPassword; MessageSerializer ms; @Inject WebsocketComponent ws; - public XMPPConnection(String password) { - xmppPassword = password; + public XMPPConnection() { + xmppPassword = env.getProperty("xmpp_password"); ms = new MessageSerializer(); new Thread(this).start(); } diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index 9fd75bb0..46ef8486 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -43,7 +43,7 @@ public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements W } @Bean XMPPConnection xmpp() { - return new XMPPConnection(env.getProperty("xmpp_password")); + return new XMPPConnection(); } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { -- cgit v1.2.3 From 9f2503cf924d79f00cb0956d37daf10e7a163163 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 14:40:55 +0300 Subject: gretty --- build.gradle | 17 +++++++++++++++++ src/main/java/com/juick/ws/XMPPConnection.java | 5 ++--- .../juick/ws/configuration/WebsocketConfiguration.java | 4 ---- 3 files changed, 19 insertions(+), 7 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/build.gradle b/build.gradle index 7f0bbe52..bea96871 100644 --- a/build.gradle +++ b/build.gradle @@ -4,9 +4,19 @@ subprojects { mavenCentral() } } +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath 'org.akhikhl.gretty:gretty:+' + } +} + apply plugin: 'java' apply plugin: 'war' +apply plugin: 'org.akhikhl.gretty' repositories { mavenCentral() @@ -51,3 +61,10 @@ dependencies { } compileJava.options.encoding = 'UTF-8' + +gretty { + httpPort = 8080 + contextPath = '' + servletContainer = 'tomcat8' +} + diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 802271a0..8194862c 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -31,15 +31,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. @Inject JdbcTemplate sql; - @Inject - Environment env; Stream xmpp; String xmppPassword; MessageSerializer ms; @Inject WebsocketComponent ws; - public XMPPConnection() { + @Inject + public XMPPConnection(Environment env) { xmppPassword = env.getProperty("xmpp_password"); ms = new MessageSerializer(); new Thread(this).start(); diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index 46ef8486..b0393038 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -41,10 +41,6 @@ public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements W dataSource.setUrl(env.getProperty("datasource_url")); return new JdbcTemplate(dataSource); } - @Bean - XMPPConnection xmpp() { - return new XMPPConnection(); - } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(wsHandler(), "/"); -- cgit v1.2.3 From 0f907b3973e851970882a00da123f6b7a6a359fb Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 16:08:16 +0300 Subject: send messages to legacy urls --- src/main/java/com/juick/ws/XMPPConnection.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 8194862c..a6f4e2cb 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -120,6 +120,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); + ws.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> { + try { + logger.info("sending message to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); } } @@ -138,6 +146,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); + ws.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> { + try { + logger.info("sending reply to legacy client " + c.visitor.getUID()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.log(Level.WARNING, "ws error", e); + } + }); } } } -- cgit v1.2.3 From e8621033c919f0920a52463c01293349752aedec Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Tue, 28 Jun 2016 16:47:52 +0300 Subject: destroy xmpp service --- src/main/java/com/juick/ws/XMPPConnection.java | 6 +++++- .../ws/configuration/WebsocketConfiguration.java | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index a6f4e2cb..9ac22d45 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -17,6 +17,8 @@ import javax.inject.Inject; import java.io.IOException; import java.net.Socket; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -36,12 +38,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. MessageSerializer ms; @Inject WebsocketComponent ws; + @Inject + ExecutorService xmppThreadService; @Inject public XMPPConnection(Environment env) { xmppPassword = env.getProperty("xmpp_password"); ms = new MessageSerializer(); - new Thread(this).start(); + xmppThreadService.submit(this); } @Override diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index 63ff12cb..27bfcf36 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -2,6 +2,7 @@ package com.juick.ws.configuration; import com.juick.ws.WebsocketComponent; import com.juick.ws.XMPPConnection; +import org.springframework.beans.factory.DisposableBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -17,6 +18,8 @@ import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Created by vitalyster on 28.06.2016. @@ -41,8 +44,25 @@ public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements W dataSource.setUrl(env.getProperty("datasource_url")); return new JdbcTemplate(dataSource); } + @Bean + ExecutorService xmppThreadService() { + return Executors.newSingleThreadExecutor(); + } + @Bean + DestroyBean destroyBean() { + return new DestroyBean(); + } @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { registry.addHandler(wsHandler(), "/**"); } + class DestroyBean implements DisposableBean { + + @Inject + ExecutorService xmppThreadService; + @Override + public void destroy() throws Exception { + xmppThreadService.shutdownNow(); + } + } } -- cgit v1.2.3 From d131a99cb1641a4ea0c61b4caaee50a50c1ffd42 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 09:25:16 +0300 Subject: xmpp fix, next --- src/main/java/com/juick/ws/XMPPConnection.java | 3 +-- src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java | 4 ---- 2 files changed, 1 insertion(+), 6 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 9ac22d45..3122e715 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -38,8 +38,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. MessageSerializer ms; @Inject WebsocketComponent ws; - @Inject - ExecutorService xmppThreadService; + ExecutorService xmppThreadService = Executors.newSingleThreadExecutor(); @Inject public XMPPConnection(Environment env) { diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index 27bfcf36..0a1b83d2 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -45,10 +45,6 @@ public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements W return new JdbcTemplate(dataSource); } @Bean - ExecutorService xmppThreadService() { - return Executors.newSingleThreadExecutor(); - } - @Bean DestroyBean destroyBean() { return new DestroyBean(); } -- cgit v1.2.3 From 614384564482c8a35a567dd092cf5674f853ebd3 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 11:04:41 +0300 Subject: ws debug --- src/main/java/com/juick/ws/XMPPConnection.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 3122e715..65008906 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -113,8 +113,10 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. String json = messageSerializer.serialize(jmsg).toString(); List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); - + logger.info(String.format("%d users subscribed to %d", uids.size(), jmsg.getUser().getUID())); synchronized (ws.clients) { + Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allMessages).count(); + logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { try { logger.info("sending message to " + c.visitor.getUID()); @@ -136,11 +138,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. private void onJuickMessageReply(com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - + logger.info("got reply: " + json); List threadUsers = SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) .stream().map(User::getUID).collect(Collectors.toList()); + logger.info(String.format("%d users subscribed to %d", threadUsers.size(), jmsg.getMID())); synchronized (ws.clients) { + Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allReplies).count(); + logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { try { logger.info("sending reply to " + c.visitor.getUID()); -- cgit v1.2.3 From 38040c8f5faaab4fe6d7055ed010533aa1860b07 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 11:26:33 +0300 Subject: ws injection fix --- src/main/java/com/juick/ws/XMPPConnection.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index 65008906..ae70d956 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -31,17 +31,17 @@ import java.util.stream.Collectors; public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); - @Inject JdbcTemplate sql; Stream xmpp; String xmppPassword; MessageSerializer ms; - @Inject WebsocketComponent ws; ExecutorService xmppThreadService = Executors.newSingleThreadExecutor(); @Inject - public XMPPConnection(Environment env) { + public XMPPConnection(Environment env, WebsocketComponent ws, JdbcTemplate sql) { + this.sql = sql; + this.ws = ws; xmppPassword = env.getProperty("xmpp_password"); ms = new MessageSerializer(); xmppThreadService.submit(this); -- cgit v1.2.3 From dbceb8cb379c44366c1014d282a73457762aabe1 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 11:46:44 +0300 Subject: ws injection fix, next --- src/main/java/com/juick/ws/XMPPConnection.java | 33 +++++++++------------- .../ws/configuration/WebsocketConfiguration.java | 14 +++++++-- 2 files changed, 25 insertions(+), 22 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index ae70d956..d5cb4eba 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -17,8 +17,6 @@ import javax.inject.Inject; import java.io.IOException; import java.net.Socket; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -30,21 +28,18 @@ import java.util.stream.Collectors; @Component public class XMPPConnection implements Runnable, Stream.StreamListener, Message.MessageListener { private static final Logger logger = Logger.getLogger(XMPPConnection.class.getName()); - + @Inject JdbcTemplate sql; Stream xmpp; String xmppPassword; MessageSerializer ms; - WebsocketComponent ws; - ExecutorService xmppThreadService = Executors.newSingleThreadExecutor(); + WebsocketComponent wsHandler; @Inject - public XMPPConnection(Environment env, WebsocketComponent ws, JdbcTemplate sql) { - this.sql = sql; - this.ws = ws; + public XMPPConnection(Environment env, WebsocketComponent wsHandler) { + this.wsHandler = wsHandler; xmppPassword = env.getProperty("xmpp_password"); ms = new MessageSerializer(); - xmppThreadService.submit(this); } @Override @@ -97,8 +92,8 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. private void onJuickPM(int uid_to, com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - synchronized (ws.clients) { - ws.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { + synchronized (wsHandler.clients) { + wsHandler.clients.stream().filter(c -> !c.legacy && c.visitor.getUID() == uid_to).forEach(c -> { try { logger.info("sending pm to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -114,10 +109,10 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); logger.info(String.format("%d users subscribed to %d", uids.size(), jmsg.getUser().getUID())); - synchronized (ws.clients) { - Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allMessages).count(); + synchronized (wsHandler.clients) { + Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).count(); logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - ws.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { try { logger.info("sending message to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -125,7 +120,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - ws.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> { + wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> { try { logger.info("sending message to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -143,10 +138,10 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) .stream().map(User::getUID).collect(Collectors.toList()); logger.info(String.format("%d users subscribed to %d", threadUsers.size(), jmsg.getMID())); - synchronized (ws.clients) { - Long legacycount = ws.clients.stream().filter(c -> c.legacy && c.allReplies).count(); + synchronized (wsHandler.clients) { + Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allReplies).count(); logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - ws.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { try { logger.info("sending reply to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -154,7 +149,7 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - ws.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> { try { logger.info("sending reply to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); diff --git a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java index 0bb293f4..223f8d63 100644 --- a/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java +++ b/src/main/java/com/juick/ws/configuration/WebsocketConfiguration.java @@ -1,6 +1,7 @@ package com.juick.ws.configuration; import com.juick.ws.WebsocketComponent; +import com.juick.ws.XMPPConnection; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; @@ -10,12 +11,13 @@ import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DriverManagerDataSource; import org.springframework.web.servlet.config.annotation.EnableWebMvc; import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; -import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; import javax.inject.Inject; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Created by vitalyster on 28.06.2016. @@ -28,12 +30,18 @@ import javax.inject.Inject; public class WebsocketConfiguration extends WebMvcConfigurerAdapter implements WebSocketConfigurer { @Inject Environment env; - + ExecutorService xmppThread = Executors.newSingleThreadExecutor(); @Bean - WebSocketHandler wsHandler() { + WebsocketComponent wsHandler() { return new WebsocketComponent(); } @Bean + XMPPConnection xmpp() { + XMPPConnection xmpp = new XMPPConnection(env, wsHandler()); + xmppThread.submit(xmpp); + return xmpp; + } + @Bean JdbcTemplate jdbc() { DriverManagerDataSource dataSource = new DriverManagerDataSource(); dataSource.setDriverClassName(env.getProperty("datasource_driver", "com.mysql.jdbc.Driver")); -- cgit v1.2.3 From e3fdbbcd833e4be70b4e62e541cfd03b2f4d129a Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 11:57:55 +0300 Subject: update deps --- .gitignore | 1 + deps/com.juick.xmpp | 2 +- src/main/java/com/juick/ws/XMPPConnection.java | 4 +- .../com/juick/xmpp/extensions/JuickMessage.java | 183 +++++++++++++++++++++ .../java/com/juick/xmpp/extensions/JuickUser.java | 75 +++++++++ 5 files changed, 262 insertions(+), 3 deletions(-) create mode 100644 src/main/java/com/juick/xmpp/extensions/JuickMessage.java create mode 100644 src/main/java/com/juick/xmpp/extensions/JuickUser.java (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/.gitignore b/.gitignore index e3baf212..c26bb4f3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /dist/ /build/ .gradle/* +.idea/** \ No newline at end of file diff --git a/deps/com.juick.xmpp b/deps/com.juick.xmpp index a096ecc6..1d87d06b 160000 --- a/deps/com.juick.xmpp +++ b/deps/com.juick.xmpp @@ -1 +1 @@ -Subproject commit a096ecc6d011bf0f6c738b10d7d0a7319ffcf2dc +Subproject commit 1d87d06b72011cb26813cf084102a02e9dcd96e9 diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index d5cb4eba..d0bdc42f 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -62,8 +62,8 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. } @Override - public void onStreamFail(String msg) { - logger.severe("XMPP stream failed: " + msg); + public void onStreamFail(Exception ex) { + logger.log(Level.SEVERE, "XMPP stream failed", ex); } @Override diff --git a/src/main/java/com/juick/xmpp/extensions/JuickMessage.java b/src/main/java/com/juick/xmpp/extensions/JuickMessage.java new file mode 100644 index 00000000..a7fd1e35 --- /dev/null +++ b/src/main/java/com/juick/xmpp/extensions/JuickMessage.java @@ -0,0 +1,183 @@ +/* + * Juick + * Copyright (C) 2008-2011, Ugnich Anton + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package com.juick.xmpp.extensions; + +import com.juick.xmpp.utils.XmlUtils; +import com.juick.xmpp.*; +import java.io.IOException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.TimeZone; + +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; + +/** + * + * @author Ugnich Anton + */ +public class JuickMessage extends com.juick.Message implements StanzaChild { + + public final static String XMLNS = "http://juick.com/message"; + public final static String TagName = "juick"; + + private SimpleDateFormat df; + + public JuickMessage() { + df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + public JuickMessage(com.juick.Message msg) { + super(msg); + df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + df.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + @Override + public String getXMLNS() { + return XMLNS; + } + + @Override + public JuickMessage parse(XmlPullParser parser) throws XmlPullParserException, IOException, ParseException { + JuickMessage jmsg = new JuickMessage(); + + final String sMID = parser.getAttributeValue(null, "mid"); + if (sMID != null) { + jmsg.setMID(Integer.parseInt(sMID)); + } + final String sRID = parser.getAttributeValue(null, "rid"); + if (sRID != null) { + jmsg.setRID(Integer.parseInt(sRID)); + } + final String sReplyTo = parser.getAttributeValue(null, "replyto"); + if (sReplyTo != null) { + jmsg.ReplyTo = Integer.parseInt(sReplyTo); + } + final String sPrivacy = parser.getAttributeValue(null, "privacy"); + if (sPrivacy != null) { + jmsg.Privacy = Integer.parseInt(sPrivacy); + } + final String sFriendsOnly = parser.getAttributeValue(null, "friendsonly"); + if (sFriendsOnly != null) { + jmsg.FriendsOnly = true; + } + final String sReadOnly = parser.getAttributeValue(null, "readonly"); + if (sReadOnly != null) { + jmsg.ReadOnly = true; + } + + jmsg.setDate(df.parse(parser.getAttributeValue(null, "ts"))); + jmsg.AttachmentType = parser.getAttributeValue(null, "attach"); + + while (parser.next() == XmlPullParser.START_TAG) { + final String tag = parser.getName(); + final String xmlns = parser.getNamespace(); + if (tag.equals("body")) { + jmsg.setText(XmlUtils.getTagText(parser)); + } else if (tag.equals(JuickUser.TagName) && xmlns != null && xmlns.equals(JuickUser.XMLNS)) { + jmsg.setUser(new JuickUser().parse(parser)); + } else if (tag.equals("tag")) { + jmsg.Tags.add(XmlUtils.getTagText(parser)); + } else { + XmlUtils.skip(parser); + } + } + return jmsg; + } + + @Override + public String toString() { + String ret = ""; + + ret = "<" + TagName + " xmlns=\"" + XMLNS + "\""; + if (getMID() > 0) { + ret += " mid=\"" + getMID() + "\""; + } + if (getRID() > 0) { + ret += " rid=\"" + getRID() + "\""; + } + if (ReplyTo > 0) { + ret += " replyto=\"" + ReplyTo + "\""; + } + ret += " privacy=\"" + Privacy + "\""; + if (FriendsOnly) { + ret += " friendsonly=\"1\""; + } + if (ReadOnly) { + ret += " readonly=\"1\""; + } + if (getDate() != null) { + ret += " ts=\"" + df.format(getDate()) + "\""; + } + if (AttachmentType != null) { + ret += " attach=\"" + AttachmentType + "\""; + } + ret += ">"; + if (getUser() != null) { + ret += JuickUser.toString(getUser()); + } + if (getText() != null) { + ret += "" + XmlUtils.escape(getText()) + ""; + } + if (!Tags.isEmpty()) { + for (String Tag : Tags) { + ret += "" + XmlUtils.escape(Tag) + ""; + } + } + ret += ""; + + return ret; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof JuickMessage)) { + return false; + } + JuickMessage jmsg = (JuickMessage) obj; + return (this.getMID() == jmsg.getMID() && this.getRID() == jmsg.getRID()); + } + + @Override + public int compareTo(Object obj) throws ClassCastException { + if (!(obj instanceof JuickMessage)) { + throw new ClassCastException(); + } + JuickMessage jmsg = (JuickMessage) obj; + + if (this.getMID() != jmsg.getMID()) { + if (this.getMID() > jmsg.getMID()) { + return -1; + } else { + return 1; + } + } + + if (this.getRID() != jmsg.getRID()) { + if (this.getRID() < jmsg.getRID()) { + return -1; + } else { + return 1; + } + } + + return 0; + } +} diff --git a/src/main/java/com/juick/xmpp/extensions/JuickUser.java b/src/main/java/com/juick/xmpp/extensions/JuickUser.java new file mode 100644 index 00000000..edc6749a --- /dev/null +++ b/src/main/java/com/juick/xmpp/extensions/JuickUser.java @@ -0,0 +1,75 @@ +/* + * Juick + * Copyright (C) 2008-2011, Ugnich Anton + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +package com.juick.xmpp.extensions; + +import com.juick.xmpp.utils.XmlUtils; +import com.juick.xmpp.*; +import java.io.IOException; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; + +/** + * + * @author Ugnich Anton + */ +public class JuickUser extends com.juick.User implements StanzaChild { + + public final static String XMLNS = "http://juick.com/user"; + public final static String TagName = "user"; + + public JuickUser() { + } + + public JuickUser(com.juick.User user) { + super(user); + } + + @Override + public String getXMLNS() { + return XMLNS; + } + + @Override + public JuickUser parse(final XmlPullParser parser) throws XmlPullParserException, IOException { + JuickUser juser = new JuickUser(); + String strUID = parser.getAttributeValue(null, "uid"); + if (strUID != null) { + juser.setUID(Integer.parseInt(strUID)); + } + juser.setUName(parser.getAttributeValue(null, "uname")); + XmlUtils.skip(parser); + return juser; + } + + public static String toString(com.juick.User user) { + String str = "<" + TagName + " xmlns='" + XMLNS + "'"; + if (user.getUID() > 0) { + str += " uid='" + user.getUID() + "'"; + } + if (user.getUName() != null && user.getUName().length() > 0) { + str += " uname='" + XmlUtils.escape(user.getUName()) + "'"; + } + str += "/>"; + return str; + } + + @Override + public String toString() { + return toString(this); + } +} -- cgit v1.2.3 From e0d4d97777962c6f1752d7772b57392ec53dfc21 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Thu, 30 Jun 2016 12:29:38 +0300 Subject: allow anonymous replies --- src/main/java/com/juick/ws/WebsocketComponent.java | 6 +++++- src/main/java/com/juick/ws/XMPPConnection.java | 25 ++++++++++++---------- 2 files changed, 19 insertions(+), 12 deletions(-) (limited to 'src/main/java/com/juick/ws/XMPPConnection.java') diff --git a/src/main/java/com/juick/ws/WebsocketComponent.java b/src/main/java/com/juick/ws/WebsocketComponent.java index d38f7b69..daf17753 100644 --- a/src/main/java/com/juick/ws/WebsocketComponent.java +++ b/src/main/java/com/juick/ws/WebsocketComponent.java @@ -97,7 +97,10 @@ public class WebsocketComponent extends TextWebSocketHandler { } } if (sockSubscr != null) { - clients.add(sockSubscr); + synchronized (clients) { + clients.add(sockSubscr); + logger.info(clients.size() + " clients connected"); + } } } @Override @@ -110,6 +113,7 @@ public class WebsocketComponent extends TextWebSocketHandler { } return false; }); + logger.info(clients.size() + " clients connected"); } } diff --git a/src/main/java/com/juick/ws/XMPPConnection.java b/src/main/java/com/juick/ws/XMPPConnection.java index d0bdc42f..4a80eec5 100644 --- a/src/main/java/com/juick/ws/XMPPConnection.java +++ b/src/main/java/com/juick/ws/XMPPConnection.java @@ -108,11 +108,11 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. String json = messageSerializer.serialize(jmsg).toString(); List uids = SubscriptionsQueries.getSubscribedUsers(sql, jmsg.getUser().getUID(), jmsg.getMID()) .stream().map(User::getUID).collect(Collectors.toList()); - logger.info(String.format("%d users subscribed to %d", uids.size(), jmsg.getUser().getUID())); synchronized (wsHandler.clients) { - Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).count(); - logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - wsHandler.clients.stream().filter(c -> !c.legacy && uids.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && uids.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { try { logger.info("sending message to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -120,7 +120,9 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - wsHandler.clients.stream().filter(c -> c.legacy && c.allMessages).forEach(c -> { + wsHandler.clients.stream().filter(c -> + c.legacy && c.allMessages) // legacy all posts + .forEach(c -> { try { logger.info("sending message to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -133,15 +135,14 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. private void onJuickMessageReply(com.juick.Message jmsg) { String json = messageSerializer.serialize(jmsg).toString(); - logger.info("got reply: " + json); List threadUsers = SubscriptionsQueries.getUsersSubscribedToComments(sql, jmsg.getMID(), jmsg.getUser().getUID()) .stream().map(User::getUID).collect(Collectors.toList()); - logger.info(String.format("%d users subscribed to %d", threadUsers.size(), jmsg.getMID())); synchronized (wsHandler.clients) { - Long legacycount = wsHandler.clients.stream().filter(c -> c.legacy && c.allReplies).count(); - logger.info(String.format("%d legacy users watched %d", legacycount, jmsg.getMID())); - wsHandler.clients.stream().filter(c -> !c.legacy && threadUsers.contains(c.visitor.getUID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (!c.legacy && c.visitor.getUID() == 0) // anonymous users + || (!c.legacy && threadUsers.contains(c.visitor.getUID()))) // subscriptions + .forEach(c -> { try { logger.info("sending reply to " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); @@ -149,7 +150,9 @@ public class XMPPConnection implements Runnable, Stream.StreamListener, Message. logger.log(Level.WARNING, "ws error", e); } }); - wsHandler.clients.stream().filter(c -> (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())).forEach(c -> { + wsHandler.clients.stream().filter(c -> + (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMID())) // legacy replies + .forEach(c -> { try { logger.info("sending reply to legacy client " + c.visitor.getUID()); c.session.sendMessage(new TextMessage(json)); -- cgit v1.2.3