package com.juick.server.xmpp.router; import com.juick.server.XMPPServer; import com.juick.server.xmpp.s2s.BasicXmppSession; import com.juick.server.xmpp.s2s.CacheEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.xmlpull.v1.XmlPullParserException; import rocks.xmpp.addr.Jid; import rocks.xmpp.core.stanza.model.IQ; import rocks.xmpp.core.stanza.model.Message; import rocks.xmpp.core.stanza.model.Presence; import rocks.xmpp.core.stanza.model.Stanza; import rocks.xmpp.core.stanza.model.server.ServerIQ; import rocks.xmpp.core.stanza.model.server.ServerMessage; import rocks.xmpp.core.stanza.model.server.ServerPresence; import rocks.xmpp.util.XmppUtils; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import javax.inject.Inject; import javax.xml.bind.JAXBException; import javax.xml.bind.Unmarshaller; import javax.xml.stream.XMLStreamException; import javax.xml.stream.XMLStreamWriter; import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; @Component public class XMPPRouter implements StreamHandler { private static final Logger logger = LoggerFactory.getLogger("com.juick.server.xmpp"); @Inject private ExecutorService service; private final List connections = Collections.synchronizedList(new ArrayList<>()); private final List outCache = new CopyOnWriteArrayList<>(); private ServerSocket listener; @Inject private BasicXmppSession session; @Value("${router_port:5347}") private int routerPort; @Inject private XMPPServer xmppServer; @PostConstruct public void init() { logger.info("component router initialized"); service.submit(() -> { try { listener = new ServerSocket(routerPort); logger.info("component router listening on {}", routerPort); while (!listener.isClosed()) { if (Thread.currentThread().isInterrupted()) break; Socket socket = listener.accept(); service.submit(() -> { try { StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret"); addConnectionIn(client); client.setHandler(this); client.connect(); } catch (IOException e) { logger.error("component error", e); } catch (XmlPullParserException e) { e.printStackTrace(); } }); } } catch (SocketException e) { // shutdown } catch (IOException e) { logger.warn("io exception", e); } }); } @PreDestroy public void close() throws Exception { if (!listener.isClosed()) { listener.close(); } synchronized (getConnections()) { for (Iterator i = getConnections().iterator(); i.hasNext(); ) { StreamComponentServer c = i.next(); c.logoff(); i.remove(); } } service.shutdown(); logger.info("XMPP router destroyed"); } private void addConnectionIn(StreamComponentServer c) { synchronized (getConnections()) { getConnections().add(c); } } private void sendOut(Stanza s) { try { StringWriter stanzaWriter = new StringWriter(); XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); session.createMarshaller().marshal(s, xmppStreamWriter); xmppStreamWriter.flush(); xmppStreamWriter.close(); String xml = stanzaWriter.toString(); logger.info("XMPPRouter (out): {}", xml); sendOut(s.getTo().getDomain(), xml); } catch (XMLStreamException | JAXBException e1) { logger.info("jaxb exception", e1); } } private void sendOut(String hostname, String xml) { boolean haveAnyConn = false; StreamComponentServer connOut = null; synchronized (getConnections()) { for (StreamComponentServer c : getConnections()) { if (c.to != null && c.to.getDomain().equals(hostname)) { if (c.isLoggedIn()) { connOut = c; break; } else { logger.info("bouncing stanza to {} component until it will be ready", hostname); boolean haveCache = false; for (CacheEntry entry : outCache) { if (entry.hostname != null && entry.hostname.equals(hostname)) { entry.xml += xml; entry.updated = Instant.now(); haveCache = true; break; } } if (!haveCache) { outCache.add(new CacheEntry(Jid.of(hostname), xml)); } } } } } if (connOut != null) { connOut.send(xml); return; } xmppServer.sendOut(Jid.of(hostname), xml); } public List getConnections() { return connections; } private Stanza parse(String xml) { try { Unmarshaller unmarshaller = session.createUnmarshaller(); return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); } catch (JAXBException e) { logger.error("JAXB exception", e); } return null; } @Override public void stanzaReceived(String stanza) { Stanza input = parse(stanza); if (input instanceof Message) { sendOut(ServerMessage.from((Message)input)); } else if (input instanceof IQ) { sendOut(ServerIQ.from((IQ)input)); } else { sendOut(ServerPresence.from((Presence) input)); } } public String getFromCache(Jid to) { final String[] cache = new String[1]; outCache.stream().filter(c -> c.hostname != null && c.hostname.equals(to)).findFirst().ifPresent(c -> { cache[0] = c.xml; outCache.remove(c); }); return cache[0]; } @Override public void ready(StreamComponentServer componentServer) { logger.info("component {} ready", componentServer.to); String cache = getFromCache(componentServer.to); if (cache != null) { logger.debug("sending cache to {}", componentServer.to); componentServer.send(cache); } } @Override public void fail(Exception e) { } @Override public boolean filter(Jid jid, Jid jid1) { return false; } }