From b2a65a6b63e8691b0101f4652705fab79dc26d99 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Wed, 7 Dec 2016 12:37:43 +0300 Subject: juick-xmpp: refactoring --- .../main/java/com/juick/components/XMPPServer.java | 12 ++++- .../java/com/juick/components/s2s/Connection.java | 9 ++-- .../com/juick/components/s2s/ConnectionIn.java | 51 +++++++++------------- .../com/juick/components/s2s/ConnectionOut.java | 26 +++++------ .../com/juick/components/s2s/ConnectionRouter.java | 24 +++++----- 5 files changed, 59 insertions(+), 63 deletions(-) (limited to 'juick-xmpp') diff --git a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java index c51810a5..caa4c200 100644 --- a/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java +++ b/juick-xmpp/src/main/java/com/juick/components/XMPPServer.java @@ -209,7 +209,7 @@ public class XMPPServer implements AutoCloseable { try { connOut.sendStanza(xml); } catch (IOException e) { - logger.warn("STREAM TO {} {} ERROR: {}", + logger.warn("stream to {} {} error: {}", connOut.to, connOut.streamID, e); } return; @@ -256,4 +256,14 @@ public class XMPPServer implements AutoCloseable { return outCache; } + public void startDialback(String from, String streamId, String dbKey) throws Exception { + ConnectionOut c = getConnectionOut(from, false); + if (c != null) { + c.sendDialbackVerify(streamId, dbKey); + } else { + c = new ConnectionOut(this, from, streamId, dbKey); + service.submit(c); + } + } + } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java index 67953b76..b6052fc8 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/Connection.java @@ -1,9 +1,6 @@ package com.juick.components.s2s; import com.juick.components.XMPPServer; -import org.apache.commons.codec.digest.DigestUtils; -import org.apache.commons.codec.digest.HmacUtils; -import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xmlpull.mxp1.MXParser; @@ -16,7 +13,7 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.*; import java.net.Socket; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.security.*; import java.security.cert.CertificateException; import java.util.UUID; @@ -99,7 +96,7 @@ public class Connection { public void closeConnection() { if (streamID != null) { - logger.info("CLOSING STREAM {}", streamID); + logger.info("closing stream {}", streamID); } try { @@ -130,7 +127,7 @@ public class Connection { parser = new MXParser(); parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); parser.setInput(new InputStreamReader(socket.getInputStream())); - writer = new OutputStreamWriter(socket.getOutputStream(), Charset.forName("UTF-8")); + writer = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8); streamID = UUID.randomUUID().toString(); } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java index 344cc7c1..a6d0549c 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionIn.java @@ -6,8 +6,6 @@ import com.juick.xmpp.JID; import com.juick.xmpp.Message; import com.juick.xmpp.Presence; import com.juick.xmpp.utils.XmlUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.xmlpull.v1.XmlPullParser; import javax.net.ssl.SSLException; @@ -25,8 +23,6 @@ import java.util.UUID; */ public class ConnectionIn extends Connection implements Runnable { - private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionIn.class); - final public List from = new ArrayList<>(); public long tsRemoteData = 0; public long packetsRemote = 0; @@ -42,7 +38,7 @@ public class ConnectionIn extends Connection implements Runnable { @Override public void run() { - LOGGER.info("STREAM FROM ? {} START", streamID); + logger.info("stream from ? {} start", streamID); try { parser.next(); // stream:stream updateTsRemoteData(); @@ -50,7 +46,7 @@ public class ConnectionIn extends Connection implements Runnable { || !parser.getNamespace("stream").equals(NS_STREAM)) { // || !parser.getAttributeValue(null, "version").equals("1.0") // || !parser.getAttributeValue(null, "to").equals(Main.HOSTNAME)) { - throw new Exception("STREAM FROM ? " + streamID + " INVALID FIRST PACKET"); + throw new Exception("stream from ? " + streamID + " invalid first packet"); } boolean xmppversionnew = parser.getAttributeValue(null, "version") != null; String from = parser.getAttributeValue(null, "from"); @@ -74,23 +70,16 @@ public class ConnectionIn extends Connection implements Runnable { if (tag.equals("result") && parser.getNamespace().equals(NS_DB)) { String dfrom = parser.getAttributeValue(null, "from"); String to = parser.getAttributeValue(null, "to"); - LOGGER.info("STREAM FROM {} TO {} {} ASKING FOR DIALBACK", dfrom, to, streamID); + logger.info("stream from {} to {} {} asking for dialback", dfrom, to, streamID); if (dfrom.endsWith(xmpp.HOSTNAME) && (dfrom.equals(xmpp.HOSTNAME) || dfrom.endsWith("." + xmpp.HOSTNAME))) { break; } if (to != null && to.equals(xmpp.HOSTNAME)) { String dbKey = XmlUtils.getTagText(parser); updateTsRemoteData(); - - ConnectionOut c = xmpp.getConnectionOut(dfrom, false); - if (c != null) { - c.sendDialbackVerify(streamID, dbKey); - } else { - c = new ConnectionOut(xmpp, dfrom, streamID, dbKey); - xmpp.service.submit(c); - } + xmpp.startDialback(dfrom, streamID, dbKey); } else { - throw new HostUnknownException("STREAM FROM " + dfrom + " " + streamID + " INVALID TO " + to); + throw new HostUnknownException("stream from " + dfrom + " " + streamID + " invalid to " + to); } } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { String vfrom = parser.getAttributeValue(null, "from"); @@ -105,10 +94,10 @@ public class ConnectionIn extends Connection implements Runnable { } if (valid) { sendStanza(""); - LOGGER.info("STREAM FROM {} {} DIALBACK VERIFY VALID", vfrom, streamID); + logger.info("stream from {} {} dialback verify valid", vfrom, streamID); } else { sendStanza(""); - LOGGER.warn("STREAM FROM {} {} DIALBACK VERIFY INVALID", vfrom, streamID); + logger.warn("stream from {} {} dialback verify invalid", vfrom, streamID); } } else if (tag.equals("presence") && checkFromTo(parser)) { Presence p = Presence.parse(parser, null); @@ -119,7 +108,7 @@ public class ConnectionIn extends Connection implements Runnable { updateTsRemoteData(); Message msg = Message.parse(parser, xmpp.childParsers); if (msg != null && (msg.type == null || !msg.type.equals(Message.Type.error))) { - LOGGER.info("STREAM {}: {}", streamID, msg); + logger.info("stream {}: {}", streamID, msg); if (!bot.incomingMessage(msg)) { xmpp.getRouter().sendStanza(msg.toString()); } @@ -129,11 +118,11 @@ public class ConnectionIn extends Connection implements Runnable { String type = parser.getAttributeValue(null, "type"); String xml = XmlUtils.parseToString(parser, true); if (type == null || !type.equals(Iq.Type.error)) { - LOGGER.info("STREAM {}: {}", streamID, xml); + logger.info("stream {}: {}", streamID, xml); xmpp.getRouter().sendStanza(xml); } } else if (sc != null && !isSecured() && tag.equals("starttls")) { - LOGGER.info("STREAM {} SECURING", streamID); + logger.info("stream {} securing", streamID); sendStanza(""); try { socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(), @@ -141,10 +130,10 @@ public class ConnectionIn extends Connection implements Runnable { ((SSLSocket) socket).setUseClientMode(false); ((SSLSocket) socket).startHandshake(); setSecured(true); - LOGGER.info("STREAM {} SECURED", streamID); + logger.info("stream {} secured", streamID); restartParser(); } catch (SSLException sex) { - LOGGER.warn("STREAM {} SSL ERROR {}", streamID, sex); + logger.warn("stream {} ssl error {}", streamID, sex); sendStanza(""); xmpp.removeConnectionIn(this); closeConnection(); @@ -152,21 +141,21 @@ public class ConnectionIn extends Connection implements Runnable { } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { sendOpenStream(null, true); } else { - if (LOGGER.isInfoEnabled()) // prevent call parseToString if logger disabled - LOGGER.info("STREAM {}: {}", streamID, XmlUtils.parseToString(parser, true)); + String unhandledStanza = XmlUtils.parseToString(parser, true); + logger.warn("Unhandled stanza from {}: {}", streamID, unhandledStanza); } } - LOGGER.warn("STREAM {} FINISHED", streamID); + logger.warn("stream {} finished", streamID); xmpp.removeConnectionIn(this); closeConnection(); } catch (EOFException | SocketException ex) { - LOGGER.info("STREAM {} CLOSED (dirty)", streamID); + logger.info("stream {} closed (dirty)", streamID); xmpp.removeConnectionIn(this); closeConnection(); } catch (HostUnknownException e) { - LOGGER.warn(e.getMessage()); + logger.warn(e.getMessage()); } catch (Exception e) { - LOGGER.warn("STREAM {} ERROR {}", streamID, e); + logger.warn("stream {} error {}", streamID, e); xmpp.removeConnectionIn(this); closeConnection(); } @@ -195,10 +184,10 @@ public class ConnectionIn extends Connection implements Runnable { sendStanza(""); if (type.equals("valid")) { from.add(sfrom); - LOGGER.info("STREAM FROM {} {} READY", sfrom, streamID); + logger.info("stream from {} {} ready", sfrom, streamID); } } catch (IOException e) { - LOGGER.warn("STREAM FROM {} {} ERROR: {}", sfrom, streamID, e); + logger.warn("stream from {} {} error: {}", sfrom, streamID, e); } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java index 7a6ae122..0111601e 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionOut.java @@ -61,7 +61,7 @@ public class ConnectionOut extends Connection implements Runnable { @Override public void run() { - logger.info("STREAM TO {} START", to); + logger.info("stream to {} start", to); try { socket = new Socket(); socket.connect(DNSQueries.getServerAddress(to)); @@ -72,10 +72,10 @@ public class ConnectionOut extends Connection implements Runnable { parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); if (streamID == null || streamID.isEmpty()) { - throw new Exception("STREAM TO " + to + " INVALID FIRST PACKET"); + throw new Exception("stream to " + to + " invalid first packet"); } - logger.info("STREAM TO {} {} OPEN", to, streamID); + logger.info("stream to {} {} open", to, streamID); xmpp.addConnectionOut(ConnectionOut.this); boolean xmppversionnew = parser.getAttributeValue(null, "version") != null; if (!xmppversionnew) { @@ -93,16 +93,16 @@ public class ConnectionOut extends Connection implements Runnable { String type = parser.getAttributeValue(null, "type"); if (type != null && type.equals("valid")) { streamReady = true; - logger.info("STREAM TO {} {} READY", to, streamID); + logger.info("stream to {} {} ready", to, streamID); String cache = xmpp.getFromCache(to); if (cache != null) { - logger.info("STREAM TO {} {} SENDING CACHE", to, streamID); + logger.info("stream to {} {} sending cache", to, streamID); sendStanza(cache); } } else { - logger.info("STREAM TO {} {} DIALBACK FAIL", to, streamID); + logger.info("stream to {} {} dialback fail", to, streamID); } XmlUtils.skip(parser); } else if (tag.equals("verify") && parser.getNamespace().equals(NS_DB)) { @@ -119,7 +119,7 @@ public class ConnectionOut extends Connection implements Runnable { } else if (tag.equals("features") && parser.getNamespace().equals(NS_STREAM)) { StreamFeatures features = StreamFeatures.parse(parser); if (sc != null && !isSecured() && features.STARTTLS >= 0 && !xmpp.brokenSSLhosts.contains(to)) { - logger.info("STREAM TO {} {} SECURING", to, streamID); + logger.info("stream to {} {} securing", to, streamID); sendStanza(""); } else { processDialback(); @@ -130,7 +130,7 @@ public class ConnectionOut extends Connection implements Runnable { socket.getPort(), true); ((SSLSocket) socket).startHandshake(); setSecured(true); - logger.info("STREAM {} SECURED", streamID); + logger.info("stream {} secured", streamID); restartParser(); sendOpenStream(); } catch (SSLException sex) { @@ -142,16 +142,16 @@ public class ConnectionOut extends Connection implements Runnable { } else if (isSecured() && tag.equals("stream") && parser.getNamespace().equals(NS_STREAM)) { streamID = parser.getAttributeValue(null, "id"); } else { - if (logger.isInfoEnabled()) // prevent parseToString call if logger disabled - logger.info("STREAM TO {} {} : {}", to, streamID, XmlUtils.parseToString(parser, true)); + String unhandledStanza = XmlUtils.parseToString(parser, true); + logger.warn("Unhandled stanza from {} {} : {}", to, streamID, unhandledStanza); } } - logger.warn("STREAM TO {} {} FINISHED", to, streamID); + logger.warn("stream to {} {} finished", to, streamID); xmpp.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (EOFException | SocketException eofex) { - logger.info("STREAM {} {} CLOSED (dirty)", to, streamID); + logger.info("stream {} {} closed (dirty)", to, streamID); xmpp.removeConnectionOut(ConnectionOut.this); closeConnection(); } catch (Exception e) { @@ -165,7 +165,7 @@ public class ConnectionOut extends Connection implements Runnable { try { sendStanza("" + key + ""); } catch (IOException e) { - logger.warn("STREAM TO {} {} ERROR {}", to, streamID, e); + logger.warn("stream to {} {} error {}", to, streamID, e); } } } diff --git a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java index 7bef76ab..3517462b 100644 --- a/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java +++ b/juick-xmpp/src/main/java/com/juick/components/s2s/ConnectionRouter.java @@ -46,7 +46,7 @@ public class ConnectionRouter extends Connection implements Runnable { @Override public void run() { - logger.info("STREAM ROUTER START"); + logger.info("stream router start"); @SuppressWarnings("unchecked") RetryPolicy retryPolicy = new RetryPolicy() .withBackoff(1, 30, TimeUnit.SECONDS) .withJitter(0.1) @@ -69,7 +69,7 @@ public class ConnectionRouter extends Connection implements Runnable { parser.next(); // stream:stream streamID = parser.getAttributeValue(null, "id"); if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); + throw new Exception("fail on first packet"); } msg = "" + DigestUtils.sha1Hex(streamID + password) + ""; @@ -78,10 +78,10 @@ public class ConnectionRouter extends Connection implements Runnable { parser.next(); if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); + throw new Exception("no handshake"); } XmlUtils.skip(parser); - logger.info("STREAM ROUTER OPEN"); + logger.info("stream router open"); while (parser.next() != XmlPullParser.END_DOCUMENT) { if (parser.getEventType() != XmlPullParser.START_TAG) { @@ -96,7 +96,7 @@ public class ConnectionRouter extends Connection implements Runnable { if (jid.Host.equals(componentName)) { if (tag.equals("message")) { Message xmsg = Message.parse(parser, xmpp.childParsers); - logger.info("STREAM ROUTER (PROCESS): {}", xmsg); + logger.info("stream router (process): {}", xmsg); JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); if (jmsg != null) { if (jid.Username != null && jid.Username.equals("recomm")) { @@ -112,23 +112,23 @@ public class ConnectionRouter extends Connection implements Runnable { } } else if (jid.Host.endsWith(xmpp.HOSTNAME) && (jid.Host.equals(xmpp.HOSTNAME) || jid.Host.endsWith("." + xmpp.HOSTNAME))) { String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER: {}", xml); + logger.info("stream router: {}", xml); } else { String xml = XmlUtils.parseToString(parser, true); - logger.info("STREAM ROUTER (OUT): {}", xml); + logger.info("stream router (out): {}", xml); xmpp.sendOut(jid.Host, xml); } } else { - if (logger.isInfoEnabled()) // prevent parseToString for disabled logs - logger.info("STREAM ROUTER (NO TO): {}", XmlUtils.parseToString(parser, true)); + String invalidStanza = XmlUtils.parseToString(parser, true); + logger.info("stream router (no to): {}", invalidStanza); } } else { - if (logger.isInfoEnabled()) // prevent parseToString for disabled logs - logger.info("STREAM ROUTER: {}", XmlUtils.parseToString(parser, true)); + String unknownStanza = XmlUtils.parseToString(parser, true); + logger.info("stream router (unknown): {}", unknownStanza); } } - logger.warn("STREAM ROUTER FINISHED"); + logger.warn("stream router finished"); } catch (Exception e) { logger.warn("router error, reconnection ", e); execution.recordFailure(e); -- cgit v1.2.3