aboutsummaryrefslogtreecommitdiff
path: root/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
diff options
context:
space:
mode:
authorGravatar Vitaly Takmazov2017-02-06 12:14:29 +0300
committerGravatar Vitaly Takmazov2017-02-06 15:19:23 +0300
commit2be5fc6e0397b53173aa21298142d5fa66877fa5 (patch)
treede44753a0eb0ea26ef8ef5af04c4a5b2a7af50e7 /juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
parentecd0131695cf8c7360c4d5675e3fde22325fc4df (diff)
juick-xmpp: ConnectionRouter uses babbler now
Diffstat (limited to 'juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java')
-rw-r--r--juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java269
1 files changed, 117 insertions, 152 deletions
diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
index f0fc3c60..16c2019a 100644
--- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
+++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java
@@ -2,44 +2,43 @@ package com.juick.components.s2s;
import com.juick.User;
import com.juick.components.XMPPServer;
-import com.juick.xmpp.Message;
-import com.juick.xmpp.extensions.JuickMessage;
-import com.juick.xmpp.extensions.Nickname;
-import com.juick.xmpp.extensions.XOOB;
-import com.juick.xmpp.utils.XmlUtils;
-import net.jodah.failsafe.Execution;
-import net.jodah.failsafe.RetryPolicy;
-import org.apache.commons.codec.digest.DigestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.xmlpull.v1.XmlPullParser;
-import org.xmlpull.v1.XmlPullParserException;
import rocks.xmpp.addr.Jid;
-
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.net.InetSocketAddress;
-import java.nio.channels.AsynchronousSocketChannel;
-import java.nio.channels.Channels;
+import rocks.xmpp.core.XmppException;
+import rocks.xmpp.core.session.Extension;
+import rocks.xmpp.core.session.XmppSessionConfiguration;
+import rocks.xmpp.core.stanza.model.Message;
+import rocks.xmpp.core.stanza.model.Stanza;
+import rocks.xmpp.extensions.component.accept.ExternalComponent;
+import rocks.xmpp.extensions.nick.model.Nickname;
+import rocks.xmpp.extensions.oob.model.x.OobX;
+import rocks.xmpp.util.XmppUtils;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.io.StringWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
/**
* @author ugnich
*/
-public class ConnectionRouter extends Connection implements Runnable, AutoCloseable {
+public class ConnectionRouter implements Runnable, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConnectionRouter.class);
private String componentName;
private int componentPort;
private String password;
- private Execution execution;
+ private ExternalComponent router;
+ private XMPPServer xmpp;
public ConnectionRouter(XMPPServer s2s, String componentName, int componentPort, String password) throws Exception {
- super(s2s);
+ this.xmpp = s2s;
this.componentName = componentName;
this.componentPort = componentPort;
this.password = password;
@@ -48,104 +47,65 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea
@Override
public void run() {
logger.info("stream router start");
- @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy()
- .withBackoff(1, 30, TimeUnit.SECONDS)
- .withJitter(0.1)
- .retryOn(IOException.class, XmlPullParserException.class);
- execution = new Execution(retryPolicy);
- xmpp.service.submit(() -> {
- while (!execution.isComplete()) {
- try {
- AsynchronousSocketChannel socket = AsynchronousSocketChannel.open();
- socket.connect(new InetSocketAddress("localhost", componentPort)).get();
- parser.setInput(new InputStreamReader(Channels.newInputStream(socket)));
- parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true);
- writer = new OutputStreamWriter(Channels.newOutputStream(socket));
-
- String msg = String.format("<stream:stream xmlns='jabber:component:accept' " +
- "xmlns:stream='http://etherx.jabber.org/streams' to='%s'>", componentName);
- writer.write(msg);
- writer.flush();
-
- parser.next(); // stream:stream
- streamID = parser.getAttributeValue(null, "id");
- if (streamID == null || streamID.isEmpty()) {
- throw new Exception("fail on first packet");
- }
-
- msg = "<handshake>" + DigestUtils.sha1Hex(streamID + password) + "</handshake>";
- writer.write(msg);
- writer.flush();
-
- parser.next();
- if (!parser.getName().equals("handshake")) {
- throw new Exception("no handshake");
- }
- XmlUtils.skip(parser);
- logger.info("stream router open");
-
- while (parser.next() != XmlPullParser.END_DOCUMENT) {
- if (parser.getEventType() != XmlPullParser.START_TAG) {
- continue;
- }
-
- String tag = parser.getName();
- String to = parser.getAttributeValue(null, "to");
- if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) {
- Jid jid = Jid.of(to);
- if (jid.getDomain() != null) {
- if (jid.getDomain().equals(componentName)) {
- if (tag.equals("message")) {
- Message xmsg = Message.parse(parser, xmpp.childParsers);
- logger.info("stream router (process): {}", xmsg);
- JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS);
- if (jmsg != null) {
- if (jid.getLocal() != null && jid.getLocal().equals("recomm")) {
- sendJuickRecommendation(jmsg);
- } else {
- if (jmsg.getRid() > 0) {
- sendJuickComment(jmsg);
- } else if (jmsg.getMid() > 0) {
- sendJuickMessage(jmsg);
- }
- }
- }
- } else if (tag.equals("iq") || tag.equals("presence")) {
- String xml = XmlUtils.parseToString(parser, true);
- logger.info("stream router (stanza): {}", xml);
- }
- } else if (jid.getDomain().endsWith(xmpp.HOSTNAME) && (jid.getDomain().equals(xmpp.HOSTNAME)
- || jid.getDomain().endsWith("." + xmpp.HOSTNAME))) {
- String xml = XmlUtils.parseToString(parser, true);
- logger.info("stream router: {}", xml);
- } else {
- String xml = XmlUtils.parseToString(parser, true);
- logger.info("stream router (out): {}", xml);
- xmpp.sendOut(jid.getDomain(), xml);
- }
- } else {
- String invalidStanza = XmlUtils.parseToString(parser, true);
- logger.info("stream router (no to): {}", invalidStanza);
- }
- } else {
- String unknownStanza = XmlUtils.parseToString(parser, true);
- logger.info("stream router (unknown): {}", unknownStanza);
+ XmppSessionConfiguration configuration = XmppSessionConfiguration.builder()
+ .extensions(Extension.of(com.juick.Message.class))
+ .build();
+ router = ExternalComponent.create(componentName, password, configuration, "localhost", componentPort);
+ router.addInboundMessageListener(e -> {
+ Message message = e.getMessage();
+ Jid jid = message.getTo();
+ if (jid.getDomain().equals(Jid.of(componentName).getDomain())) {
+ com.juick.Message jmsg = message.getExtension(com.juick.Message.class);
+ if (jmsg != null) {
+ if (jid.getLocal().equals("recomm")) {
+ sendJuickRecommendation(jmsg);
+ } else {
+ if (jmsg.getRid() > 0) {
+ sendJuickComment(jmsg);
+ } else if (jmsg.getMid() > 0) {
+ sendJuickMessage(jmsg);
}
}
-
- logger.warn("stream router finished");
- } catch (InterruptedException ex) {
- logger.info("shutting down");
- execution.complete();
- } catch (Exception e) {
- logger.warn("router error, reconnection ", e);
- execution.recordFailure(e);
}
+ } else if (jid.getDomain().endsWith(xmpp.HOSTNAME) && (jid.getDomain().equals(xmpp.HOSTNAME)
+ || jid.getDomain().endsWith("." + xmpp.HOSTNAME))) {
+ logger.info("skip");
+ } else {
+ route(jid.getDomain(), message);
+ }
+ });
+ xmpp.service.submit(() -> {
+ try {
+ router.connect();
+ } catch (XmppException e) {
+ logger.warn("xmpp exception", e);
}
});
}
- public void sendJuickMessage(JuickMessage jmsg) {
+ void route(String domain, Stanza stanza) {
+ try {
+ StringWriter stanzaWriter = new StringWriter();
+ XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter(
+ router.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter));
+ router.createMarshaller().marshal(stanza, xmppStreamWriter);
+ xmppStreamWriter.flush();
+ xmppStreamWriter.close();
+ String xml = stanzaWriter.toString();
+ logger.info("stream router (out): {}", xml);
+ xmpp.sendOut(domain, xml);
+ } catch (XMLStreamException | JAXBException e1) {
+ logger.info("jaxb exception", e1);
+ }
+ }
+
+ void sendStanza(Stanza xml) {
+ router.send(xml);
+ }
+
+
+
+ public void sendJuickMessage(com.juick.Message jmsg) {
List<String> jids = new ArrayList<>();
if (jmsg.FriendsOnly) {
@@ -167,29 +127,31 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea
txt += jmsg.getText() + "\n\n";
txt += "#" + jmsg.getMid() + " http://juick.com/" + jmsg.getMid();
- Nickname nick = new Nickname();
- nick.Nickname = "@" + jmsg.getUser().getName();
+ Nickname nick = new Nickname("@" + jmsg.getUser().getName());
- com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
- msg.from = xmpp.getJid();
- msg.body = txt;
- msg.type = Message.Type.chat;
- msg.thread = "juick-" + jmsg.getMid();
- msg.addChild(jmsg);
- msg.addChild(nick);
+ Message msg = new Message();
+ msg.setFrom(xmpp.getJid());
+ msg.setBody(txt);
+ msg.setType(Message.Type.CHAT);
+ msg.setThread("juick-" + jmsg.getMid());
+ msg.addExtension(jmsg);
+ msg.addExtension(nick);
if (attachment != null) {
- XOOB oob = new XOOB();
- oob.URL = attachment;
- msg.addChild(oob);
+ try {
+ OobX oob = new OobX(new URI(attachment));
+ msg.addExtension(oob);
+ } catch (URISyntaxException e) {
+ logger.warn("uri exception", e);
+ }
}
for (String jid : jids) {
- msg.to = Jid.of(jid);
- xmpp.sendOut(msg);
+ msg.setTo(Jid.of(jid));
+ route(msg.getTo().getDomain(), msg);
}
}
- public void sendJuickComment(JuickMessage jmsg) {
+ public void sendJuickComment(com.juick.Message jmsg) {
List<User> users;
String replyQuote;
String replyTo;
@@ -208,23 +170,22 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea
}
txt += jmsg.getText() + "\n\n" + "#" + jmsg.getMid() + "/" + jmsg.getRid() + " http://juick.com/" + jmsg.getMid() + "#" + jmsg.getRid();
- com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
- msg.from = xmpp.getJid();
- msg.body = txt;
- msg.type = Message.Type.chat;
- msg.addChild(jmsg);
+ Message msg = new Message();
+ msg.setFrom(xmpp.getJid());
+ msg.setBody(txt);
+ msg.setType(Message.Type.CHAT);
+ msg.addExtension(jmsg);
for (User user : users) {
for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) {
- msg.to = Jid.of(jid);
- xmpp.sendOut(msg);
+ msg.setTo(Jid.of(jid));
+ route(msg.getTo().getDomain(), msg);
}
}
}
- public void sendJuickRecommendation(JuickMessage recomm) {
+ public void sendJuickRecommendation(com.juick.Message recomm) {
List<User> users;
- JuickMessage jmsg;
- jmsg = new JuickMessage(xmpp.messagesService.getMessage(recomm.getMid()));
+ com.juick.Message jmsg = xmpp.messagesService.getMessage(recomm.getMid());
users = xmpp.subscriptionService.getUsersSubscribedToUserRecommendations(recomm.getUser().getUid(),
recomm.getMid(), jmsg.getUser().getUid());
@@ -245,32 +206,36 @@ public class ConnectionRouter extends Connection implements Runnable, AutoClosea
}
txt += " http://juick.com/" + jmsg.getMid();
- Nickname nick = new Nickname();
- nick.Nickname = "@" + jmsg.getUser().getName();
+ Nickname nick = new Nickname("@" + jmsg.getUser().getName());
- com.juick.xmpp.Message msg = new com.juick.xmpp.Message();
- msg.from = xmpp.getJid();
- msg.body = txt;
- msg.type = Message.Type.chat;
- msg.thread = "juick-" + jmsg.getMid();
- msg.addChild(jmsg);
- msg.addChild(nick);
+ Message msg = new Message();
+ msg.setFrom(xmpp.getJid());
+ msg.setBody(txt);
+ msg.setType(Message.Type.CHAT);
+ msg.setThread("juick-" + jmsg.getMid());
+ msg.addExtension(jmsg);
+ msg.addExtension(nick);
if (attachment != null) {
- XOOB oob = new XOOB();
- oob.URL = attachment;
- msg.addChild(oob);
+ try {
+ OobX oob = new OobX(new URI(attachment));
+ msg.addExtension(oob);
+ } catch (URISyntaxException e) {
+ logger.warn("uri exception", e);
+ }
}
for (User user : users) {
for (String jid : xmpp.userService.getJIDsbyUID(user.getUid())) {
- msg.to = Jid.of(jid);
- xmpp.sendOut(msg);
+ msg.setTo(Jid.of(jid));
+ route(msg.getTo().getDomain(), msg);
}
}
}
@Override
public void close() throws Exception {
- execution.complete();
+ if (router != null) {
+ router.close();
+ }
}
}