From 2be5fc6e0397b53173aa21298142d5fa66877fa5 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Mon, 6 Feb 2017 12:14:29 +0300 Subject: juick-xmpp: ConnectionRouter uses babbler now --- .../com/juick/components/s2s/ConnectionRouter.java | 269 +++++++++------------ 1 file changed, 117 insertions(+), 152 deletions(-) (limited to 'juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java') diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java index f0fc3c60..16c2019a 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -2,44 +2,43 @@ package com.juick.components.s2s; import com.juick.User; import com.juick.components.XMPPServer; -import com.juick.xmpp.Message; -import com.juick.xmpp.extensions.JuickMessage; -import com.juick.xmpp.extensions.Nickname; -import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.XmlUtils; -import net.jodah.failsafe.Execution; -import net.jodah.failsafe.RetryPolicy; -import org.apache.commons.codec.digest.DigestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.xmlpull.v1.XmlPullParser; -import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.net.InetSocketAddress; -import java.nio.channels.AsynchronousSocketChannel; -import java.nio.channels.Channels; +import rocks.xmpp.core.XmppException; +import rocks.xmpp.core.session.Extension; +import rocks.xmpp.core.session.XmppSessionConfiguration; +import rocks.xmpp.core.stanza.model.Message; +import rocks.xmpp.core.stanza.model.Stanza; +import rocks.xmpp.extensions.component.accept.ExternalComponent; +import rocks.xmpp.extensions.nick.model.Nickname; +import rocks.xmpp.extensions.oob.model.x.OobX; +import rocks.xmpp.util.XmppUtils; + +import javax.xml.bind.JAXBException; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.StringWriter; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.TimeUnit; /** * @author ugnich */ -public class ConnectionRouter extends Connection implements Runnable, AutoCloseable { +public class ConnectionRouter implements Runnable, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class); private String componentName; private int componentPort; private String password; - private Execution execution; + private ExternalComponent router; + private XMPPServer xmpp; public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception { - super(s2s); + this.xmpp = s2s; this.componentName = componentName; this.componentPort = componentPort; this.password = password; @@ -48,104 +47,65 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea @Override public void run() { logger.info("stream router start"); - @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy() - .withBackoff(1, 30, TimeUnit.SECONDS) - .withJitter(0.1) - .retryOn(IOException.class, XmlPullParserException.class); - execution = new Execution(retryPolicy); - xmpp.service.submit(() -> { - while (!execution.isComplete()) { - try { - AsynchronousSocketChannel socket = AsynchronousSocketChannel.open(); - socket.connect(new InetSocketAddress("localhost", componentPort)).get(); - parser.setInput(new InputStreamReader(Channels.newInputStream(socket))); - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(Channels.newOutputStream(socket)); - - String msg = String.format("", componentName); - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("fail on first packet"); - } - - msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("no handshake"); - } - XmlUtils.skip(parser); - logger.info("stream router open"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - Jid jid = Jid.of(to); - if (jid.getDomain() != null) { - if (jid.getDomain().equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, xmpp.childParsers); - logger.info("stream router (process): {}", xmsg); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.getLocal() != null && jid.getLocal().equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRid() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMid() > 0) { - sendJuickMessage(jmsg); - } - } - } - } else if (tag.equals("iq") || tag.equals("presence")) { - String xml = XmlUtils.parseToString(parser, true); - logger.info("stream router (stanza): {}", xml); - } - } else if (jid.getDomain().endsWith(xmpp.HOSTNAME) && (jid.getDomain().equals(xmpp.HOSTNAME) - || jid.getDomain().endsWith("." + xmpp.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - logger.info("stream router: {}", xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - logger.info("stream router (out): {}", xml); - xmpp.sendOut(jid.getDomain(), xml); - } - } else { - String invalidStanza = XmlUtils.parseToString(parser, true); - logger.info("stream router (no to): {}", invalidStanza); - } - } else { - String unknownStanza = XmlUtils.parseToString(parser, true); - logger.info("stream router (unknown): {}", unknownStanza); + XmppSessionConfiguration configuration = XmppSessionConfiguration.builder() + .extensions(Extension.of(com.juick.Message.class)) + .build(); + router = ExternalComponent.create(componentName, password, configuration, "localhost", componentPort); + router.addInboundMessageListener(e -> { + Message message = e.getMessage(); + Jid jid = message.getTo(); + if (jid.getDomain().equals(Jid.of(componentName).getDomain())) { + com.juick.Message jmsg = message.getExtension(com.juick.Message.class); + if (jmsg != null) { + if (jid.getLocal().equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRid() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMid() > 0) { + sendJuickMessage(jmsg); } } - - logger.warn("stream router finished"); - } catch (InterruptedException ex) { - logger.info("shutting down"); - execution.complete(); - } catch (Exception e) { - logger.warn("router error, reconnection ", e); - execution.recordFailure(e); } + } else if (jid.getDomain().endsWith(xmpp.HOSTNAME) && (jid.getDomain().equals(xmpp.HOSTNAME) + || jid.getDomain().endsWith("." + xmpp.HOSTNAME))) { + logger.info("skip"); + } else { + route(jid.getDomain(), message); + } + }); + xmpp.service.submit(() -> { + try { + router.connect(); + } catch (XmppException e) { + logger.warn("xmpp exception", e); } }); } - public void sendJuickMessage(JuickMessage jmsg) { + void route(String domain, Stanza stanza) { + try { + StringWriter stanzaWriter = new StringWriter(); + XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( + router.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); + router.createMarshaller().marshal(stanza, xmppStreamWriter); + xmppStreamWriter.flush(); + xmppStreamWriter.close(); + String xml = stanzaWriter.toString(); + logger.info("stream router (out): {}", xml); + xmpp.sendOut(domain, xml); + } catch (XMLStreamException | JAXBException e1) { + logger.info("jaxb exception", e1); + } + } + + void sendStanza(Stanza xml) { + router.send(xml); + } + + + + public void sendJuickMessage(com.juick.Message jmsg) { List jids = new ArrayList<>(); if (jmsg.FriendsOnly) { @@ -167,29 +127,31 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea txt += jmsg.getText() + "\n\n"; txt += "#" + jmsg.getMid() + " http://juick.com/" + jmsg.getMid(); - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getName(); + Nickname nick = new Nickname("@" + jmsg.getUser().getName()); - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = xmpp.getJid(); - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMid(); - msg.addChild(jmsg); - msg.addChild(nick); + Message msg = new Message(); + msg.setFrom(xmpp.getJid()); + msg.setBody(txt); + msg.setType(Message.Type.CHAT); + msg.setThread("juick-" + jmsg.getMid()); + msg.addExtension(jmsg); + msg.addExtension(nick); if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); + try { + OobX oob = new OobX(new URI(attachment)); + msg.addExtension(oob); + } catch (URISyntaxException e) { + logger.warn("uri exception", e); + } } for (String jid : jids) { - msg.to = Jid.of(jid); - xmpp.sendOut(msg); + msg.setTo(Jid.of(jid)); + route(msg.getTo().getDomain(), msg); } } - public void sendJuickComment(JuickMessage jmsg) { + public void sendJuickComment(com.juick.Message jmsg) { List users; String replyQuote; String replyTo; @@ -208,23 +170,22 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea } txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMid() + "/" + jmsg.getRid() + " http://juick.com/" + jmsg.getMid() + "#" + jmsg.getRid(); - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = xmpp.getJid(); - msg.body = txt; - msg.type = Message.Type.chat; - msg.addChild(jmsg); + Message msg = new Message(); + msg.setFrom(xmpp.getJid()); + msg.setBody(txt); + msg.setType(Message.Type.CHAT); + msg.addExtension(jmsg); for (User user : users) { for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) { - msg.to = Jid.of(jid); - xmpp.sendOut(msg); + msg.setTo(Jid.of(jid)); + route(msg.getTo().getDomain(), msg); } } } - public void sendJuickRecommendation(JuickMessage recomm) { + public void sendJuickRecommendation(com.juick.Message recomm) { List users; - JuickMessage jmsg; - jmsg = new JuickMessage(xmpp.messagesService.getMessage(recomm.getMid())); + com.juick.Message jmsg = xmpp.messagesService.getMessage(recomm.getMid()); users = xmpp.subscriptionService.getUsersSubscribedToUserRecommendations(recomm.getUser().getUid(), recomm.getMid(), jmsg.getUser().getUid()); @@ -245,32 +206,36 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea } txt += " http://juick.com/" + jmsg.getMid(); - Nickname nick = new Nickname(); - nick.Nickname = "@" + jmsg.getUser().getName(); + Nickname nick = new Nickname("@" + jmsg.getUser().getName()); - com.juick.xmpp.Message msg = new com.juick.xmpp.Message(); - msg.from = xmpp.getJid(); - msg.body = txt; - msg.type = Message.Type.chat; - msg.thread = "juick-" + jmsg.getMid(); - msg.addChild(jmsg); - msg.addChild(nick); + Message msg = new Message(); + msg.setFrom(xmpp.getJid()); + msg.setBody(txt); + msg.setType(Message.Type.CHAT); + msg.setThread("juick-" + jmsg.getMid()); + msg.addExtension(jmsg); + msg.addExtension(nick); if (attachment != null) { - XOOB oob = new XOOB(); - oob.URL = attachment; - msg.addChild(oob); + try { + OobX oob = new OobX(new URI(attachment)); + msg.addExtension(oob); + } catch (URISyntaxException e) { + logger.warn("uri exception", e); + } } for (User user : users) { for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) { - msg.to = Jid.of(jid); - xmpp.sendOut(msg); + msg.setTo(Jid.of(jid)); + route(msg.getTo().getDomain(), msg); } } } @Override public void close() throws Exception { - execution.complete(); + if (router != null) { + router.close(); + } } } -- cgit v1.2.3