diff options
author | Vitaly Takmazov | 2018-04-02 13:39:13 +0300 |
---|---|---|
committer | Vitaly Takmazov | 2018-04-02 13:39:13 +0300 |
commit | cb2d7e63fc9af26b06c2f02c98940aafecd87292 (patch) | |
tree | 98ef7c6ab6861032ecf22b010b3b5df9ef03b6a7 /juick-server/src/main | |
parent | 84d960a1d82a5619c65ab59a00e86644f53200a3 (diff) |
xmpp: test router
Diffstat (limited to 'juick-server/src/main')
9 files changed, 645 insertions, 0 deletions
diff --git a/juick-server/src/main/java/com/juick/server/XMPPConnection.java b/juick-server/src/main/java/com/juick/server/XMPPConnection.java index 40cf347a..4be4a5ba 100644 --- a/juick-server/src/main/java/com/juick/server/XMPPConnection.java +++ b/juick-server/src/main/java/com/juick/server/XMPPConnection.java @@ -41,6 +41,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.annotation.DependsOn; import org.springframework.stereotype.Component; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.XmppException; @@ -85,6 +86,7 @@ import java.util.concurrent.ExecutorService; * @author ugnich */ @Component +@DependsOn("XMPPRouter") public class XMPPConnection implements StanzaListener, NotificationListener { private static final Logger logger = LoggerFactory.getLogger(XMPPConnection.class); diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/Stream.java b/juick-server/src/main/java/com/juick/server/xmpp/router/Stream.java new file mode 100644 index 00000000..7532443c --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/Stream.java @@ -0,0 +1,184 @@ +/* + * Juick + * Copyright (C) 2008-2011, Ugnich Anton + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package com.juick.server.xmpp.router; + +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; +import org.xmlpull.v1.XmlPullParserFactory; +import rocks.xmpp.addr.Jid; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.UUID; + +/** + * + * @author Ugnich Anton + */ +public abstract class Stream { + + public boolean isLoggedIn() { + return loggedIn; + } + + public void setLoggedIn(boolean loggedIn) { + this.loggedIn = loggedIn; + } + + Jid from; + public Jid to; + private InputStream is; + private OutputStream os; + private XmlPullParserFactory factory; + protected XmlPullParser parser; + private OutputStreamWriter writer; + StreamHandler streamHandler; + private boolean loggedIn; + private Instant created; + private Instant updated; + String streamId; + private boolean secured; + + public Stream(final Jid from, final Jid to, final InputStream is, final OutputStream os) throws XmlPullParserException { + this.from = from; + this.to = to; + this.is = is; + this.os = os; + factory = XmlPullParserFactory.newInstance(); + created = updated = Instant.now(); + streamId = UUID.randomUUID().toString(); + } + + void restartStream() throws XmlPullParserException { + parser = factory.newPullParser(); + parser.setInput(new InputStreamReader(is, StandardCharsets.UTF_8)); + parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); + writer = new OutputStreamWriter(os, StandardCharsets.UTF_8); + } + + public void connect() { + try { + restartStream(); + handshake(); + parse(); + } catch (XmlPullParserException e) { + StreamError invalidXmlError = new StreamError("invalid-xml"); + send(invalidXmlError.toString()); + connectionFailed(new Exception(invalidXmlError.getCondition())); + } catch (IOException e) { + connectionFailed(e); + } + } + + public void setHandler(final StreamHandler streamHandler) { + this.streamHandler = streamHandler; + } + + public abstract void handshake() throws XmlPullParserException, IOException; + + public void logoff() { + setLoggedIn(false); + try { + writer.flush(); + writer.close(); + //TODO close parser + } catch (final Exception e) { + connectionFailed(e); + } + } + + public void send(final String str) { + try { + updated = Instant.now(); + writer.write(str); + writer.flush(); + } catch (final Exception e) { + connectionFailed(e); + } + } + + private void parse() throws IOException, XmlPullParserException { + while (parser.next() != XmlPullParser.END_DOCUMENT) { + if (parser.getEventType() == XmlPullParser.IGNORABLE_WHITESPACE) { + setUpdated(); + } + if (parser.getEventType() != XmlPullParser.START_TAG) { + continue; + } + setUpdated(); + final String tag = parser.getName(); + switch (tag) { + case "message": + case "presence": + case "iq": + streamHandler.stanzaReceived(XmlUtils.parseToString(parser, false)); + break; + case "error": + StreamError error = StreamError.parse(parser); + connectionFailed(new Exception(error.getCondition())); + return; + default: + XmlUtils.skip(parser); + break; + } + } + } + + /** + * This method is used to be called on a parser or a connection error. + * It tries to close the XML-Reader and XML-Writer one last time. + */ + private void connectionFailed(final Exception ex) { + if (isLoggedIn()) { + try { + writer.close(); + //TODO close parser + } catch (Exception e) { + } + } + streamHandler.fail(ex); + } + + public Instant getCreated() { + return created; + } + + public Instant getUpdated() { + return updated; + } + public String getStreamId() { + return streamId; + } + + public boolean isSecured() { + return secured; + } + + public void setSecured(boolean secured) { + this.secured = secured; + } + + public void setUpdated() { + this.updated = Instant.now(); + } +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java new file mode 100644 index 00000000..5e2f6f82 --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamComponentServer.java @@ -0,0 +1,58 @@ +package com.juick.server.xmpp.router; + +import com.juick.xmpp.extensions.Handshake; +import org.apache.commons.codec.digest.DigestUtils; +import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.UUID; + +/** + * Created by vitalyster on 30.01.2017. + */ +public class StreamComponentServer extends Stream { + + private String streamId, secret; + + public String getStreamId() { + return streamId; + } + + + public StreamComponentServer(InputStream is, OutputStream os, String password) throws XmlPullParserException { + super(null, null, is, os); + secret = password; + streamId = UUID.randomUUID().toString(); + } + @Override + public void handshake() throws XmlPullParserException, IOException { + parser.next(); + if (!parser.getName().equals("stream") + || !parser.getNamespace(null).equals(StreamNamespaces.NS_COMPONENT_ACCEPT) + || !parser.getNamespace("stream").equals(StreamNamespaces.NS_STREAM)) { + throw new IOException("invalid stream"); + } + Jid domain = Jid.of(parser.getAttributeValue(null, "to")); + if (streamHandler.filter(null, domain)) { + send(new XMPPError(XMPPError.Type.cancel, "forbidden").toString()); + throw new IOException("invalid domain"); + } + from = domain; + to = domain; + send(String.format("<stream:stream xmlns:stream='%s' " + + "xmlns='%s' from='%s' id='%s'>", StreamNamespaces.NS_STREAM, StreamNamespaces.NS_COMPONENT_ACCEPT, from.asBareJid().toEscapedString(), streamId)); + Handshake handshake = Handshake.parse(parser); + boolean authenticated = handshake.getValue().equals(DigestUtils.sha1Hex(streamId + secret)); + setLoggedIn(authenticated); + if (!authenticated) { + send(new XMPPError(XMPPError.Type.cancel, "not-authorized").toString()); + streamHandler.fail(new IOException("stream:stream, failed authentication")); + return; + } + send(new Handshake().toString()); + streamHandler.ready(); + } +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamError.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamError.java new file mode 100644 index 00000000..7eacfc94 --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamError.java @@ -0,0 +1,44 @@ +package com.juick.server.xmpp.router; + +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; + +import java.io.IOException; + + +/** + * Created by vitalyster on 03.02.2017. + */ +public class StreamError { + + private String condition; + + public StreamError() {} + + public StreamError(String condition) { + this.condition = condition; + } + + public static StreamError parse(XmlPullParser parser) throws IOException, XmlPullParserException { + StreamError streamError = new StreamError(); + while (parser.next() == XmlPullParser.START_TAG) { + final String tag = parser.getName(); + final String xmlns = parser.getNamespace(); + if (xmlns.equals(StreamNamespaces.NS_XMPP_STREAMS)) { + streamError.condition = tag; + } else { + XmlUtils.skip(parser); + } + } + return streamError; + } + + public String getCondition() { + return condition; + } + + @Override + public String toString() { + return String.format("<stream:error><%s xmlns='%s'/></stream:error>", condition, StreamNamespaces.NS_XMPP_STREAMS); + } +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java new file mode 100644 index 00000000..43836c2d --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamHandler.java @@ -0,0 +1,13 @@ +package com.juick.server.xmpp.router; + +import rocks.xmpp.addr.Jid; + +/** + * Created by vitalyster on 01.02.2017. + */ +public interface StreamHandler { + void ready(); + void fail(final Exception ex); + boolean filter(Jid from, Jid to); + void stanzaReceived(String stanza); +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/StreamNamespaces.java b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamNamespaces.java new file mode 100644 index 00000000..1b9b1965 --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/StreamNamespaces.java @@ -0,0 +1,10 @@ +package com.juick.server.xmpp.router; + +public class StreamNamespaces { + public static final String NS_STREAM = "http://etherx.jabber.org/streams"; + public static final String NS_TLS = "urn:ietf:params:xml:ns:xmpp-tls"; + public static final String NS_DB = "jabber:server:dialback"; + public static final String NS_SERVER = "jabber:server"; + public static final String NS_COMPONENT_ACCEPT = "jabber:component:accept"; + public static final String NS_XMPP_STREAMS = "urn:ietf:params:xml:ns:xmpp-streams"; +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPError.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPError.java new file mode 100644 index 00000000..0cf9a3bc --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPError.java @@ -0,0 +1,73 @@ +/* + * Juick + * Copyright (C) 2008-2013, ugnich + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package com.juick.server.xmpp.router; + +import org.apache.commons.text.StringEscapeUtils; + +/** + * + * @author ugnich + */ +public class XMPPError { + + public static final class Type { + + public static final String auth = "auth"; + public static final String cancel = "cancel"; + public static final String continue_ = "continue"; + public static final String modify = "modify"; + public static final String wait = "wait"; + } + private final static String TagName = "error"; + public String by = null; + private String type; + private String condition; + private String text = null; + + public XMPPError(String type, String condition) { + this.type = type; + this.condition = condition; + } + + @Override + public String toString() { + StringBuilder str = new StringBuilder("<").append(TagName).append(""); + if (by != null) { + str.append(" by=\"").append(StringEscapeUtils.escapeXml10(by)).append("\""); + } + if (type != null) { + str.append(" type=\"").append(StringEscapeUtils.escapeXml10(type)).append("\""); + } + + if (condition != null) { + str.append(">"); + str.append("<").append(StringEscapeUtils.escapeXml10(condition)).append(" xmlns=\"urn:ietf:params:xml:ns:xmpp-stanzas\""); + if (text != null) { + str.append(">").append(StringEscapeUtils.escapeXml10(text)).append("</").append(StringEscapeUtils.escapeXml10(condition)) + .append(">"); + } else { + str.append("/>"); + } + str.append("</").append(TagName).append(">"); + } else { + str.append("/>"); + } + + return str.toString(); + } +} diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java new file mode 100644 index 00000000..670962de --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java @@ -0,0 +1,173 @@ +package com.juick.server.xmpp.router; + +import com.juick.server.xmpp.s2s.BasicXmppSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; +import rocks.xmpp.core.stanza.model.Stanza; +import rocks.xmpp.util.XmppUtils; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; + +@Component +public class XMPPRouter implements StreamHandler { + private static final Logger logger = LoggerFactory.getLogger(XMPPRouter.class); + + @Inject + private ExecutorService service; + + private final List<StreamComponentServer> connections = Collections.synchronizedList(new ArrayList<>()); + + private ServerSocket listener; + + @Inject + private BasicXmppSession session; + + @Value("${router_port:5347}") + private int routerPort; + + @PostConstruct + public void init() { + + logger.info("component router initialized"); + service.submit(() -> { + try { + listener = new ServerSocket(routerPort); + logger.info("component router listening on {}", routerPort); + while (!listener.isClosed()) { + if (Thread.currentThread().isInterrupted()) break; + Socket socket = listener.accept(); + service.submit(() -> { + try { + StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret"); + addConnectionIn(client); + client.setHandler(this); + client.connect(); + } catch (IOException e) { + logger.error("component error", e); + } catch (XmlPullParserException e) { + e.printStackTrace(); + } + }); + } + } catch (SocketException e) { + // shutdown + } catch (IOException e) { + logger.warn("io exception", e); + } + }); + } + + @PreDestroy + public void close() throws Exception { + if (!listener.isClosed()) { + listener.close(); + } + synchronized (getConnections()) { + for (Iterator<StreamComponentServer> i = getConnections().iterator(); i.hasNext(); ) { + StreamComponentServer c = i.next(); + c.logoff(); + i.remove(); + } + } + service.shutdown(); + logger.info("XMPP router destroyed"); + } + + private void addConnectionIn(StreamComponentServer c) { + synchronized (getConnections()) { + getConnections().add(c); + } + } + + private void sendOut(Stanza s) { + try { + StringWriter stanzaWriter = new StringWriter(); + XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( + session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); + session.createMarshaller().marshal(s, xmppStreamWriter); + xmppStreamWriter.flush(); + xmppStreamWriter.close(); + String xml = stanzaWriter.toString(); + logger.info("XMPPRouter (out): {}", xml); + sendOut(s.getTo().getDomain(), xml); + } catch (XMLStreamException | JAXBException e1) { + logger.info("jaxb exception", e1); + } + } + + private void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + StreamComponentServer connOut = null; + synchronized (getConnections()) { + for (StreamComponentServer c : getConnections()) { + if (c.to != null && c.to.getDomain().equals(hostname)) { + if (c.isLoggedIn()) { + connOut = c; + break; + } + } + } + } + if (connOut != null) { + connOut.send(xml); + return; + } + logger.error("component unavailable: {}", hostname); + + } + + public List<StreamComponentServer> getConnections() { + return connections; + } + + private Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + @Override + public void stanzaReceived(String stanza) { + sendOut(parse(stanza)); + } + + @Override + public void ready() { + + } + + @Override + public void fail(Exception e) { + + } + + @Override + public boolean filter(Jid jid, Jid jid1) { + return false; + } +}
\ No newline at end of file diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XmlUtils.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XmlUtils.java new file mode 100644 index 00000000..7579489f --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XmlUtils.java @@ -0,0 +1,88 @@ +/* + * Juick + * Copyright (C) 2008-2011, Ugnich Anton + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ +package com.juick.server.xmpp.router; + +import java.io.IOException; + +import org.apache.commons.text.StringEscapeUtils; +import org.xmlpull.v1.XmlPullParser; +import org.xmlpull.v1.XmlPullParserException; + +/** + * + * @author Ugnich Anton + */ +public class XmlUtils { + + public static void skip(XmlPullParser parser) throws XmlPullParserException, IOException { + String tag = parser.getName(); + while (parser.getName() != null && !(parser.next() == XmlPullParser.END_TAG && parser.getName().equals(tag))) { + } + } + + public static String getTagText(XmlPullParser parser) throws XmlPullParserException, IOException { + String ret = ""; + String tag = parser.getName(); + + if (parser.next() == XmlPullParser.TEXT) { + ret = parser.getText(); + } + + while (!(parser.getEventType() == XmlPullParser.END_TAG && parser.getName().equals(tag))) { + parser.next(); + } + + return ret; + } + + public static String parseToString(XmlPullParser parser, boolean skipXMLNS) throws XmlPullParserException, IOException { + String tag = parser.getName(); + StringBuilder ret = new StringBuilder("<").append(tag); + + // skipXMLNS for xmlns="jabber:client" + + String ns = parser.getNamespace(); + if (!skipXMLNS && ns != null && !ns.isEmpty()) { + ret.append(" xmlns=\"").append(ns).append("\""); + } + + for (int i = 0; i < parser.getAttributeCount(); i++) { + String attr = parser.getAttributeName(i); + if ((!skipXMLNS || !attr.equals("xmlns")) && !attr.contains(":")) { + ret.append(" ").append(attr).append("=\"").append(StringEscapeUtils.escapeXml10(parser.getAttributeValue(i))).append("\""); + } + } + ret.append(">"); + + while (!(parser.next() == XmlPullParser.END_TAG && parser.getName().equals(tag))) { + int event = parser.getEventType(); + if (event == XmlPullParser.START_TAG) { + if (!parser.getName().contains(":")) { + ret.append(parseToString(parser, false)); + } else { + skip(parser); + } + } else if (event == XmlPullParser.TEXT) { + ret.append(StringEscapeUtils.escapeXml10(parser.getText())); + } + } + + ret.append("</").append(tag).append(">"); + return ret.toString(); + } +} |