From 448fc7e84732422011186a9a4633c345e9c6208e Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Mon, 9 Oct 2017 15:27:48 +0300 Subject: xmpp:ConnectionOut -> StreamServerDialback --- .../com/juick/components/s2s/ConnectionIn.java | 89 +++++++--------------- 1 file changed, 29 insertions(+), 60 deletions(-) (limited to 'juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java') diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java index e6f404ef..16f207a7 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java @@ -24,52 +24,33 @@ import org.apache.commons.lang3.StringUtils; import org.xmlpull.v1.XmlPullParser; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; -import rocks.xmpp.core.stanza.model.Stanza; -import javax.net.ssl.SSLException; -import javax.net.ssl.SSLSocket; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Unmarshaller; import java.io.EOFException; import java.io.IOException; -import java.io.StringReader; import java.net.Socket; import java.net.SocketException; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; -import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; import java.util.List; import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; /** * @author ugnich */ public class ConnectionIn extends Connection implements Runnable { - final public List from = new ArrayList<>(); - public long tsRemoteData = 0; + final public List from = new CopyOnWriteArrayList<>(); + public Instant received; public long packetsRemote = 0; + ConnectionListener listener; - public ConnectionIn(XMPPServer xmpp, Socket socket) throws CertificateException, UnrecoverableKeyException, NoSuchAlgorithmException, XmlPullParserException, KeyManagementException, KeyStoreException, IOException { + public ConnectionIn(XMPPServer xmpp, Socket socket) throws XmlPullParserException, IOException { super(xmpp); - this.socket = socket; + this.setSocket(socket); restartParser(); } - protected Stanza parse(String xml) { - try { - Unmarshaller unmarshaller = xmpp.getSession().createUnmarshaller(); - return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); - } catch (JAXBException e) { - logger.error("JAXB exception", e); - } - return null; - } - @Override public void run() { try { @@ -79,7 +60,7 @@ public class ConnectionIn extends Connection implements Runnable { || !parser.getNamespace("stream").equals(NS_STREAM)) { // || !parser.getAttributeValue(null, "version").equals("1.0") // || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) { - throw new Exception(String.format("stream from %s invalid", socket.getRemoteSocketAddress())); + throw new Exception(String.format("stream from %s invalid", getSocket().getRemoteSocketAddress())); } streamID = parser.getAttributeValue(null, "id"); if (streamID == null) { @@ -108,14 +89,15 @@ public class ConnectionIn extends Connection implements Runnable { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); logger.info("stream from {} to {} {} asking for dialback", dfrom, to, streamID); - if (dfrom.endsWith(xmpp.HOSTNAME) && (dfrom.equals(xmpp.HOSTNAME) || dfrom.endsWith("." + xmpp.HOSTNAME))) { + if (dfrom.endsWith(xmpp.getJid().toEscapedString()) && (dfrom.equals(xmpp.getJid().toEscapedString()) + || dfrom.endsWith("." + xmpp.getJid()))) { logger.warn("stream from {} is invalid", dfrom); break; } - if (to != null && to.equals(xmpp.HOSTNAME)) { + if (to != null && to.equals(xmpp.getJid().toEscapedString())) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - xmpp.startDialback(dfrom, streamID, dbKey); + xmpp.startDialback(Jid.of(dfrom), streamID, dbKey); } else { logger.warn("stream from " + dfrom + " " + streamID + " invalid to " + to); break; @@ -128,7 +110,7 @@ public class ConnectionIn extends Connection implements Runnable { updateTsRemoteData(); final boolean[] valid = {false}; if (vfrom != null && vto != null && vid != null && vkey != null) { - xmpp.getConnectionOut(vfrom, false).ifPresent(c -> { + xmpp.getConnectionOut(Jid.of(vfrom), false).ifPresent(c -> { String dialbackKey = c.dbKey; valid[0] = vkey.equals(dialbackKey); }); @@ -143,12 +125,12 @@ public class ConnectionIn extends Connection implements Runnable { } else if (tag.equals("presence") && checkFromTo(parser)) { String xml = XmlUtils.parseToString(parser, false); logger.info("stream {} presence: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); + xmpp.onStanzaReceived(xml); } else if (tag.equals("message") && checkFromTo(parser)) { updateTsRemoteData(); String xml = XmlUtils.parseToString(parser, false); logger.info("stream {} message: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); + xmpp.onStanzaReceived(xml); } else if (tag.equals("iq") && checkFromTo(parser)) { updateTsRemoteData(); @@ -156,25 +138,10 @@ public class ConnectionIn extends Connection implements Runnable { String xml = XmlUtils.parseToString(parser, false); if (type == null || !type.equals("error")) { logger.info("stream {} iq: {}", streamID, xml); - xmpp.onStanzaReceived(parse(xml)); - } - } else if (sc != null && !isSecured() && tag.equals("starttls")) { - logger.info("stream {} securing", streamID); - sendStanza(""); - try { - socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), - socket.getPort(), true); - ((SSLSocket) socket).setUseClientMode(false); - ((SSLSocket) socket).startHandshake(); - setSecured(true); - logger.info("stream {} secured", streamID); - restartParser(); - } catch (SSLException sex) { - logger.warn("stream {} ssl error {}", streamID, sex); - sendStanza(""); - xmpp.removeConnectionIn(this); - closeConnection(); + xmpp.onStanzaReceived(xml); } + } else if (!isSecured() && tag.equals("starttls")) { + listener.starttls(this); } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { sendOpenStream(null, true); } else if (tag.equals("error")) { @@ -202,16 +169,16 @@ public class ConnectionIn extends Connection implements Runnable { } void updateTsRemoteData() { - tsRemoteData = System.currentTimeMillis(); + received = Instant.now(); } void sendOpenStream(String from, boolean xmppversionnew) throws IOException { String openStream = ""; + xmpp.getJid().toEscapedString() + "' id='" + streamID + "' version='1.0'>"; if (xmppversionnew) { openStream += ""; - if (sc != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) { + if (listener != null && !isSecured() && !Arrays.asList(xmpp.brokenSSLhosts).contains(from)) { openStream += ""; } openStream += ""; @@ -219,8 +186,8 @@ public class ConnectionIn extends Connection implements Runnable { sendStanza(openStream); } - public void sendDialbackResult(String sfrom, String type) { - sendStanza(""); + public void sendDialbackResult(Jid sfrom, String type) { + sendStanza(""); if (type.equals("valid")) { from.add(sfrom); logger.info("stream from {} {} ready", sfrom, streamID); @@ -232,11 +199,10 @@ public class ConnectionIn extends Connection implements Runnable { String cto = parser.getAttributeValue(null, "to"); if (StringUtils.isNotEmpty(cfrom) && StringUtils.isNotEmpty(cto)) { Jid jidto = Jid.of(cto); - if (jidto.getDomain().equals(xmpp.HOSTNAME)) { + if (jidto.getDomain().equals(xmpp.getJid().toEscapedString())) { Jid jidfrom = Jid.of(cfrom); - int size = from.size(); - for (int i = 0; i < size; i++) { - if (from.get(i).equals(jidfrom.getDomain())) { + for (Jid aFrom : from) { + if (aFrom.equals(Jid.of(jidfrom.getDomain()))) { return true; } } @@ -244,4 +210,7 @@ public class ConnectionIn extends Connection implements Runnable { } return false; } + public void setListener(ConnectionListener listener) { + this.listener = listener; + } } -- cgit v1.2.3