diff options
Diffstat (limited to 'juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java')
-rw-r--r-- | juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java new file mode 100644 index 00000000..6edecf05 --- /dev/null +++ b/juick-server/src/main/java/com/juick/server/xmpp/router/XMPPRouter.java @@ -0,0 +1,189 @@ +package com.juick.server.xmpp.router; + +import com.juick.server.XMPPServer; +import com.juick.server.xmpp.s2s.BasicXmppSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; +import rocks.xmpp.core.stanza.model.IQ; +import rocks.xmpp.core.stanza.model.Message; +import rocks.xmpp.core.stanza.model.Presence; +import rocks.xmpp.core.stanza.model.Stanza; +import rocks.xmpp.core.stanza.model.server.ServerIQ; +import rocks.xmpp.core.stanza.model.server.ServerMessage; +import rocks.xmpp.core.stanza.model.server.ServerPresence; +import rocks.xmpp.util.XmppUtils; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; + +@Component +public class XMPPRouter implements StreamHandler { + private static final Logger logger = LoggerFactory.getLogger(XMPPRouter.class); + + @Inject + private ExecutorService service; + + private final List<StreamComponentServer> connections = Collections.synchronizedList(new ArrayList<>()); + + private ServerSocket listener; + + @Inject + private BasicXmppSession session; + + @Value("${router_port:5347}") + private int routerPort; + + @Inject + private XMPPServer xmppServer; + + @PostConstruct + public void init() { + logger.info("component router initialized"); + service.submit(() -> { + try { + listener = new ServerSocket(routerPort); + logger.info("component router listening on {}", routerPort); + while (!listener.isClosed()) { + if (Thread.currentThread().isInterrupted()) break; + Socket socket = listener.accept(); + service.submit(() -> { + try { + StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret"); + addConnectionIn(client); + client.setHandler(this); + client.connect(); + } catch (IOException e) { + logger.error("component error", e); + } catch (XmlPullParserException e) { + e.printStackTrace(); + } + }); + } + } catch (SocketException e) { + // shutdown + } catch (IOException e) { + logger.warn("io exception", e); + } + }); + } + + @PreDestroy + public void close() throws Exception { + if (!listener.isClosed()) { + listener.close(); + } + synchronized (getConnections()) { + for (Iterator<StreamComponentServer> i = getConnections().iterator(); i.hasNext(); ) { + StreamComponentServer c = i.next(); + c.logoff(); + i.remove(); + } + } + service.shutdown(); + logger.info("XMPP router destroyed"); + } + + private void addConnectionIn(StreamComponentServer c) { + synchronized (getConnections()) { + getConnections().add(c); + } + } + + private void sendOut(Stanza s) { + try { + StringWriter stanzaWriter = new StringWriter(); + XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( + session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); + session.createMarshaller().marshal(s, xmppStreamWriter); + xmppStreamWriter.flush(); + xmppStreamWriter.close(); + String xml = stanzaWriter.toString(); + logger.info("XMPPRouter (out): {}", xml); + sendOut(s.getTo().getDomain(), xml); + } catch (XMLStreamException | JAXBException e1) { + logger.info("jaxb exception", e1); + } + } + + private void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + StreamComponentServer connOut = null; + synchronized (getConnections()) { + for (StreamComponentServer c : getConnections()) { + if (c.to != null && c.to.getDomain().equals(hostname)) { + if (c.isLoggedIn()) { + connOut = c; + break; + } + } + } + } + if (connOut != null) { + connOut.send(xml); + return; + } + xmppServer.sendOut(Jid.of(hostname), xml); + + } + + public List<StreamComponentServer> getConnections() { + return connections; + } + + private Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + @Override + public void stanzaReceived(String stanza) { + Stanza input = parse(stanza); + if (input instanceof Message) { + sendOut(ServerMessage.from((Message)input)); + } else if (input instanceof IQ) { + sendOut(ServerIQ.from((IQ)input)); + } else { + sendOut(ServerPresence.from((Presence) input)); + } + } + + @Override + public void ready() { + + } + + @Override + public void fail(Exception e) { + + } + + @Override + public boolean filter(Jid jid, Jid jid1) { + return false; + } +}
\ No newline at end of file |