package com.juick.server; import com.juick.server.xmpp.s2s.BasicXmppSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @Component public class XMPPRouter implements StreamHandler { private static final Logger logger = LoggerFactory.getLogger(XMPPRouter.class); @Inject private ExecutorService service; private final List connections = Collections.synchronizedList(new ArrayList<>()); private ServerSocket listener; @Inject private BasicXmppSession session; @Value("${router_port:5347}") private int routerPort; @PostConstruct public void init() { logger.info("component router initialized"); service.submit(() -> { try { listener = new ServerSocket(routerPort); logger.info("component router listening on {}", routerPort); while (!listener.isClosed()) { if (Thread.currentThread().isInterrupted()) break; Socket socket = listener.accept(); service.submit(() -> { try { StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret"); addConnectionIn(client); client.setHandler(this); client.connect(); } catch (IOException e) { logger.error("component error", e); } catch (XmlPullParserException e) { e.printStackTrace(); } }); } } catch (SocketException e) { // shutdown } catch (IOException e) { logger.warn("io exception", e); } }); } @PreDestroy public void close() throws Exception { if (!listener.isClosed()) { listener.close(); } synchronized (getConnections()) { for (Iterator i = getConnections().iterator(); i.hasNext(); ) { StreamComponentServer c = i.next(); c.logoff(); i.remove(); } } service.shutdown(); logger.info("XMPP router destroyed"); } private void addConnectionIn(StreamComponentServer c) { synchronized (getConnections()) { getConnections().add(c); } } private void sendOut(Stanza s) { try { StringWriter stanzaWriter = new StringWriter(); XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); session.createMarshaller().marshal(s, xmppStreamWriter); xmppStreamWriter.flush(); xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("XMPPRouter (out): {}", xml); sendOut(s.getTo().getDomain(), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } private void sendOut(String hostname, String xml) { boolean haveAnyConn = false; StreamComponentServer connOut = null; synchronized (getConnections()) { for (StreamComponentServer c : getConnections()) { if (c.to != null && c.to.getDomain().equals(hostname)) { if (c.isLoggedIn()) { connOut = c; break; } } } } if (connOut != null) { connOut.send(xml); return; } logger.error("component unavailable: {}", hostname); } public List getConnections() { return connections; } private Stanza parse(String xml) { try { Unmarshaller unmarshaller = session.createUnmarshaller(); return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); } catch (JAXBException e) { logger.error("JAXB exception", e); } return null; } @Override public void stanzaReceived(String stanza) { sendOut(parse(stanza)); } @Override public void ready() { } @Override public void fail(Exception e) { } @Override public boolean filter(Jid jid, Jid jid1) { return false; } }