diff options
-rw-r--r-- | juick-server/src/main/java/com/juick/server/ServerManager.java | 211 |
1 files changed, 102 insertions, 109 deletions
diff --git a/juick-server/src/main/java/com/juick/server/ServerManager.java b/juick-server/src/main/java/com/juick/server/ServerManager.java index ca22aa35..efa4b3f0 100644 --- a/juick-server/src/main/java/com/juick/server/ServerManager.java +++ b/juick-server/src/main/java/com/juick/server/ServerManager.java @@ -19,6 +19,7 @@ package com.juick.server; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.juick.User; +import com.juick.server.component.MessageEvent; import com.juick.service.MessagesService; import com.juick.service.SubscriptionService; import org.apache.commons.lang3.StringUtils; @@ -26,6 +27,7 @@ import org.apache.commons.lang3.math.NumberUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; import rocks.xmpp.addr.Jid; @@ -53,7 +55,7 @@ import java.util.stream.Collectors; * @author Ugnich Anton */ @Component -public class ServerManager implements AutoCloseable { +public class ServerManager implements AutoCloseable, ApplicationListener<MessageEvent> { private static Logger logger = LoggerFactory.getLogger(ServerManager.class); private ExternalComponent xmpp; @@ -105,45 +107,6 @@ public class ServerManager implements AutoCloseable { .extensions(Extension.of(com.juick.Message.class)) .build(); xmpp = ExternalComponent.create(jid, password, configuration, host, port); - xmpp.addInboundMessageListener(e -> { - try { - Message msg = e.getMessage(); - com.juick.Message jmsg = msg.getExtension(com.juick.Message.class); - if (jmsg != null) { - if (logger.isInfoEnabled()) { // prevent writeValueAsString execution if log is disabled - try { - StringWriter stanzaWriter = new StringWriter(); - XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( - xmpp.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); - xmpp.createMarshaller().marshal(msg, xmppStreamWriter); - xmppStreamWriter.flush(); - xmppStreamWriter.close(); - logger.info("got msg: {}", stanzaWriter.toString()); - } catch (XMLStreamException e1) { - logger.info("jaxb exception", e1); - } - - } - if (jmsg.getMid() == 0) { - int uid_to = NumberUtils.toInt(msg.getTo().getLocal(), 0); - if (uid_to > 0) { - onJuickPM(uid_to, jmsg); - } - } else if (jmsg.getRid() == 0) { - // to get full message with attachment, etc. - onJuickMessagePost(messagesService.getMessage(jmsg.getMid())); - } else { - // to get quote and attachment - com.juick.Message reply = messagesService.getReply(jmsg.getMid(), jmsg.getRid()); - onJuickMessageReply(reply); - } - } - } catch (JsonProcessingException ex) { - logger.error("mapper exception", ex); - } catch (JAXBException exc) { - logger.error("jaxb exception", exc); - } - }); try { xmpp.connect(); } catch (XmppException e) { @@ -178,81 +141,111 @@ public class ServerManager implements AutoCloseable { sendMessage(xmsg); } - private void onJuickPM(final int uid_to, final com.juick.Message jmsg) throws JsonProcessingException { - String json = jsonMapper.writeValueAsString(jmsg); - synchronized (wsHandler.getClients()) { - wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser)) - .forEach(c -> { - try { - logger.debug("sending pm to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); + private void onJuickPM(final int uid_to, final com.juick.Message jmsg) { + try { + String json = jsonMapper.writeValueAsString(jmsg); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.getUid() == uid_to) || c.visitor.getName().equals(serviceUser)) + .forEach(c -> { + try { + logger.debug("sending pm to {}", c.visitor.getUid()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); + } + } + + private void onJuickMessagePost(final com.juick.Message jmsg) { + try { + String json = jsonMapper.writeValueAsString(jmsg); + List<Integer> uids = subscriptionService.getSubscribedUsers(jmsg.getUser().getUid(), jmsg.getMid()) + .stream().map(User::getUid).collect(Collectors.toList()); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.getUid() == 0) // anonymous users + || c.visitor.getName().equals(serviceUser) // services + || (!c.legacy && uids.contains(c.visitor.getUid()))) // subscriptions + .forEach(c -> { + try { + logger.debug("sending message to {}", c.visitor.getUid()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> + c.legacy && c.allMessages) // legacy all posts + .forEach(c -> { + try { + logger.debug("sending message to legacy client {}", c.visitor.getUid()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); } } - private void onJuickMessagePost(final com.juick.Message jmsg) throws JsonProcessingException { - String json = jsonMapper.writeValueAsString(jmsg); - List<Integer> uids = subscriptionService.getSubscribedUsers(jmsg.getUser().getUid(), jmsg.getMid()) - .stream().map(User::getUid).collect(Collectors.toList()); - synchronized (wsHandler.getClients()) { - wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == 0) // anonymous users - || c.visitor.getName().equals(serviceUser) // services - || (!c.legacy && uids.contains(c.visitor.getUid()))) // subscriptions - .forEach(c -> { - try { - logger.debug("sending message to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); - wsHandler.getClients().stream().filter(c -> - c.legacy && c.allMessages) // legacy all posts - .forEach(c -> { - try { - logger.debug("sending message to legacy client {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); + private void onJuickMessageReply(final com.juick.Message jmsg) { + try { + + String json = jsonMapper.writeValueAsString(jmsg); + com.juick.Message op = messagesService.getMessage(jmsg.getMid()); + List<Integer> threadUsers = + subscriptionService.getUsersSubscribedToComments(op, jmsg) + .stream().map(User::getUid).collect(Collectors.toList()); + synchronized (wsHandler.getClients()) { + wsHandler.getClients().stream().filter(c -> + (!c.legacy && c.visitor.getUid() == 0) // anonymous users + || c.visitor.getName().equals(serviceUser) // services + || (!c.legacy && threadUsers.contains(c.visitor.getUid()))) // subscriptions + .forEach(c -> { + try { + logger.debug("sending reply to {}", c.visitor.getUid()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + wsHandler.getClients().stream().filter(c -> + (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMid())) // legacy replies + .forEach(c -> { + try { + logger.debug("sending reply to legacy client {}", c.visitor.getUid()); + c.session.sendMessage(new TextMessage(json)); + } catch (IOException e) { + logger.warn("ws error", e); + } + }); + } + } catch (JsonProcessingException e) { + logger.warn("Invalid JSON", e); } } - private void onJuickMessageReply(final com.juick.Message jmsg) throws JsonProcessingException { - String json = jsonMapper.writeValueAsString(jmsg); - com.juick.Message op = messagesService.getMessage(jmsg.getMid()); - List<Integer> threadUsers = - subscriptionService.getUsersSubscribedToComments(op, jmsg) - .stream().map(User::getUid).collect(Collectors.toList()); - synchronized (wsHandler.getClients()) { - wsHandler.getClients().stream().filter(c -> - (!c.legacy && c.visitor.getUid() == 0) // anonymous users - || c.visitor.getName().equals(serviceUser) // services - || (!c.legacy && threadUsers.contains(c.visitor.getUid()))) // subscriptions - .forEach(c -> { - try { - logger.debug("sending reply to {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); - wsHandler.getClients().stream().filter(c -> - (c.legacy && c.allReplies) || (c.legacy && c.MID == jmsg.getMid())) // legacy replies - .forEach(c -> { - try { - logger.debug("sending reply to legacy client {}", c.visitor.getUid()); - c.session.sendMessage(new TextMessage(json)); - } catch (IOException e) { - logger.warn("ws error", e); - } - }); + @Override + public void onApplicationEvent(MessageEvent event) { + com.juick.Message jmsg = event.getMessage(); + if (jmsg.getMid() == 0) { + if (jmsg.getTo().getUid() > 0) { + onJuickPM(jmsg.getTo().getUid(), jmsg); + } + } else if (jmsg.getRid() == 0) { + // to get full message with attachment, etc. + onJuickMessagePost(messagesService.getMessage(jmsg.getMid())); + } else { + // to get quote and attachment + com.juick.Message reply = messagesService.getReply(jmsg.getMid(), jmsg.getRid()); + onJuickMessageReply(reply); } } } |