diff options
Diffstat (limited to 'src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java')
-rw-r--r-- | src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java | 189 |
1 files changed, 81 insertions, 108 deletions
diff --git a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java index 61ae89e0..443bfa82 100644 --- a/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java +++ b/src/main/java/com/juick/xmpp/s2s/ConnectionRouter.java @@ -2,127 +2,49 @@ package com.juick.xmpp.s2s; import com.juick.server.MessagesQueries; import com.juick.server.SubscriptionsQueries; -import com.juick.xmpp.JID; -import com.juick.xmpp.Message; +import com.juick.xmpp.*; import com.juick.xmpp.extensions.JuickMessage; import com.juick.xmpp.extensions.Nickname; import com.juick.xmpp.extensions.XOOB; -import com.juick.xmpp.utils.XmlUtils; -import org.apache.commons.codec.digest.DigestUtils; -import org.xmlpull.v1.XmlPullParser; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.net.Socket; import java.util.List; import java.util.logging.Level; +import java.util.logging.Logger; /** * @author ugnich */ -public class ConnectionRouter extends Connection implements Runnable { +public class ConnectionRouter implements Stream.StreamListener, + Message.MessageListener, Iq.IqListener, Presence.PresenceListener { + private static final Logger logger = Logger.getLogger(ConnectionRouter.class.getName()); private String componentName; + Stream router; + Socket socket; - ConnectionRouter(String componentName) throws Exception { + ConnectionRouter(String componentName, String password) { this.componentName = componentName; - } - - @Override - public void run() { - LOGGER.info("STREAM ROUTER START"); - + logger.info("STREAM ROUTER START"); try { socket = new Socket("localhost", 5347); - parser.setInput(new InputStreamReader(socket.getInputStream())); - - parser.setFeature(XmlPullParser.FEATURE_PROCESS_NAMESPACES, true); - writer = new OutputStreamWriter(socket.getOutputStream()); - - String msg = "<stream:stream xmlns='jabber:component:accept' xmlns:stream='http://etherx.jabber.org/streams' to='s2s'>"; - writer.write(msg); - writer.flush(); - - parser.next(); // stream:stream - streamID = parser.getAttributeValue(null, "id"); - if (streamID == null || streamID.isEmpty()) { - throw new Exception("FAIL ON FIRST PACKET"); - } - - msg = "<handshake>" + DigestUtils.sha1Hex(streamID + "secret") + "</handshake>"; - writer.write(msg); - writer.flush(); - - parser.next(); - if (!parser.getName().equals("handshake")) { - throw new Exception("NO HANDSHAKE"); - } - XmlUtils.skip(parser); - LOGGER.info("STREAM ROUTER OPEN"); - - while (parser.next() != XmlPullParser.END_DOCUMENT) { - if (parser.getEventType() != XmlPullParser.START_TAG) { - continue; - } - - String tag = parser.getName(); - String to = parser.getAttributeValue(null, "to"); - if (to != null && (tag.equals("message") || tag.equals("presence") || tag.equals("iq"))) { - JID jid = new JID(to); - if (jid.Host != null) { - if (jid.Host.equals(componentName)) { - if (tag.equals("message")) { - Message xmsg = Message.parse(parser, XMPPComponent.childParsers); - LOGGER.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); - JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); - if (jmsg != null) { - if (jid.Username != null && jid.Username.equals("recomm")) { - sendJuickRecommendation(jmsg); - } else { - if (jmsg.getRID() > 0) { - sendJuickComment(jmsg); - } else if (jmsg.getMID() > 0) { - sendJuickMessage(jmsg); - } - } - } - } - } else if (jid.Host.endsWith(XMPPComponent.HOSTNAME) && (jid.Host.equals(XMPPComponent.HOSTNAME) || jid.Host.endsWith("." + XMPPComponent.HOSTNAME))) { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER: " + xml); - } else { - String xml = XmlUtils.parseToString(parser, true); - LOGGER.info("STREAM ROUTER (OUT): " + xml); - XMPPComponent.sendOut(jid.Host, xml); - } - } else { - LOGGER.info("STREAM ROUTER (NO TO): " + XmlUtils.parseToString(parser, true)); - } - } else { - LOGGER.info("STREAM ROUTER: " + XmlUtils.parseToString(parser, true)); - } - } - - LOGGER.warning("STREAM ROUTER FINISHED"); - } catch (Exception e) { - LOGGER.log(Level.SEVERE, "xmpp router exception", e); - } - } - - @Override - synchronized public void sendStanza(String xml) { - try { - writer.write(xml); - writer.flush(); + router = new StreamComponent(new JID(componentName), socket.getInputStream(), socket.getOutputStream(), password); + router.addChildParser(new JuickMessage()); + router.addListener((Stream.StreamListener) this); + router.addListener((Message.MessageListener) this); + router.addListener((Iq.IqListener) this); + router.startParsing(); } catch (IOException e) { - LOGGER.warning("STREAM ROUTER ERROR: " + xml); - LOGGER.warning("STREAM ROUTER ERROR: " + e.toString()); - System.exit(0); + logger.log(Level.SEVERE, "router failed", e); } } + public void closeConnection() throws IOException { + router.logoff(); + socket.close(); + } - public void sendJuickMessage(JuickMessage jmsg) throws Exception { + public void sendJuickMessage(JuickMessage jmsg) { List<String> jids; synchronized (XMPPComponent.sqlSync) { @@ -157,16 +79,16 @@ public class ConnectionRouter extends Connection implements Runnable { msg.addChild(oob); } - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } - public void sendJuickComment(JuickMessage jmsg) throws Exception { + public void sendJuickComment(JuickMessage jmsg) { List<String> jids; String replyQuote; - String replyTo; + String replyTo; synchronized (XMPPComponent.sqlSync) { jids = SubscriptionsQueries.getJIDSubscribedToComments(XMPPComponent.sql, jmsg.getMID(), jmsg.getUser().getUID()); @@ -188,8 +110,8 @@ public class ConnectionRouter extends Connection implements Runnable { msg.body = txt; msg.type = Message.Type.chat; msg.addChild(jmsg); - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } @@ -204,7 +126,7 @@ public class ConnectionRouter extends Connection implements Runnable { return quote; } - public void sendJuickRecommendation(JuickMessage recomm) throws Exception { + public void sendJuickRecommendation(JuickMessage recomm) { List<String> jids; JuickMessage jmsg; synchronized (XMPPComponent.sqlSync) { @@ -246,9 +168,60 @@ public class ConnectionRouter extends Connection implements Runnable { msg.addChild(oob); } - for (int i = 0; i < jids.size(); i++) { - msg.to = new JID(jids.get(i)); + for (String jid : jids) { + msg.to = new JID(jid); XMPPComponent.sendOut(msg); } } + + @Override + public boolean onIq(Iq iq) { + JID jid = iq.to; + if (!jid.Host.equals(componentName)) { + logger.info("STREAM ROUTER (IQ): " + iq.toString()); + XMPPComponent.sendOut(iq); + } + return false; + } + + @Override + public void onMessage(Message xmsg) { + logger.info("STREAM ROUTER (PROCESS): " + xmsg.toString()); + JuickMessage jmsg = (JuickMessage) xmsg.getChild(JuickMessage.XMLNS); + JID jid = xmsg.to; + if (jid.Host.equals(componentName)) { + if (jmsg != null) { + if (jid.Username != null && jid.Username.equals("recomm")) { + sendJuickRecommendation(jmsg); + } else { + if (jmsg.getRID() > 0) { + sendJuickComment(jmsg); + } else if (jmsg.getMID() > 0) { + sendJuickMessage(jmsg); + } + } + } + } else { + XMPPComponent.sendOut(xmsg); + } + } + + @Override + public void onPresence(Presence presence) { + JID jid = presence.to; + if (!jid.Host.equals(componentName)) { + logger.info("STREAM ROUTER (PRESENCE): " + presence.toString()); + XMPPComponent.sendOut(presence); + } + } + + @Override + public void onStreamReady() { + logger.info("STREAM ROUTER (READY)"); + } + + @Override + public void onStreamFail(Exception ex) { + logger.log(Level.SEVERE, "STREAM ROUTER (FAIL)", ex); + } } |