From 51bfc341be1975b7a11e0b3a59cfbb4710e78446 Mon Sep 17 00:00:00 2001 From: Vitaly Takmazov Date: Wed, 4 Oct 2017 15:31:44 +0300 Subject: juick-xmpp-wip: router component --- .../main/java/com/juick/components/XMPPRouter.java | 198 +++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100644 juick-xmpp-wip/src/main/java/com/juick/components/XMPPRouter.java (limited to 'juick-xmpp-wip/src/main/java/com/juick/components/XMPPRouter.java') diff --git a/juick-xmpp-wip/src/main/java/com/juick/components/XMPPRouter.java b/juick-xmpp-wip/src/main/java/com/juick/components/XMPPRouter.java new file mode 100644 index 00000000..a39358c5 --- /dev/null +++ b/juick-xmpp-wip/src/main/java/com/juick/components/XMPPRouter.java @@ -0,0 +1,198 @@ +/* + * Copyright (C) 2008-2017, Juick + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.juick.components; + +import com.juick.components.s2s.BasicXmppSession; +import com.juick.xmpp.Message; +import com.juick.xmpp.StreamComponentServer; +import com.juick.xmpp.StreamListener; +import com.juick.xmpp.extensions.JuickMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.xmlpull.v1.XmlPullParserException; +import rocks.xmpp.addr.Jid; +import rocks.xmpp.core.stanza.model.Stanza; +import rocks.xmpp.util.XmppUtils; + +import javax.annotation.PostConstruct; +import javax.inject.Inject; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Unmarshaller; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.IOException; +import java.io.StringReader; +import java.io.StringWriter; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * @author ugnich + */ +@Component +public class XMPPRouter implements Message.MessageListener, AutoCloseable, StreamListener { + private static final Logger logger = LoggerFactory.getLogger(XMPPRouter.class); + + @Inject + private ExecutorService service; + + private final List connections = Collections.synchronizedList(new ArrayList<>()); + + private ServerSocket listener; + + @Inject + private BasicXmppSession session; + + @Value("${router_port:5347}") + private int routerPort; + + @PostConstruct + public void init() { + + logger.info("component router initialized"); + service.submit(() -> { + try { + listener = new ServerSocket(routerPort); + logger.info("component router listening on {}", routerPort); + while (!listener.isClosed()) { + if (Thread.currentThread().isInterrupted()) break; + Socket socket = listener.accept(); + service.submit(() -> { + try { + StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret"); + addConnectionIn(client); + client.addChildParser(new JuickMessage()); + client.addListener((Message.MessageListener) this); + client.addListener((StreamListener) this); + client.connect(); + } catch (IOException e) { + logger.error("component error", e); + } catch (XmlPullParserException e) { + e.printStackTrace(); + } + }); + } + } catch (SocketException e) { + // shutdown + } catch (IOException e) { + logger.warn("io exception", e); + } + }); + } + + @Override + public void close() throws Exception { + if (!listener.isClosed()) { + listener.close(); + } + synchronized (getConnections()) { + for (Iterator i = getConnections().iterator(); i.hasNext(); ) { + StreamComponentServer c = i.next(); + c.logoff(); + i.remove(); + } + } + service.shutdown(); + logger.info("XMPP router destroyed"); + } + + private void addConnectionIn(StreamComponentServer c) { + synchronized (getConnections()) { + getConnections().add(c); + } + } + + private void sendOut(Stanza s) { + try { + StringWriter stanzaWriter = new StringWriter(); + XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter( + session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter)); + session.createMarshaller().marshal(s, xmppStreamWriter); + xmppStreamWriter.flush(); + xmppStreamWriter.close(); + String xml = stanzaWriter.toString(); + logger.info("XMPPRouter (out): {}", xml); + sendOut(s.getTo().getDomain(), xml); + } catch (XMLStreamException | JAXBException e1) { + logger.info("jaxb exception", e1); + } + } + + private void sendOut(String hostname, String xml) { + boolean haveAnyConn = false; + + StreamComponentServer connOut = null; + synchronized (getConnections()) { + for (StreamComponentServer c : getConnections()) { + if (c.to != null && c.to.getDomain().equals(hostname)) { + if (c.isLoggedIn()) { + connOut = c; + break; + } + } + } + } + if (connOut != null) { + connOut.send(xml); + return; + } + logger.error("component unavailable: {}", hostname); + + } + + private List getConnections() { + return connections; + } + + private Stanza parse(String xml) { + try { + Unmarshaller unmarshaller = session.createUnmarshaller(); + return (Stanza)unmarshaller.unmarshal(new StringReader(xml)); + } catch (JAXBException e) { + logger.error("JAXB exception", e); + } + return null; + } + @Override + public void onMessage(Message message) { + sendOut(parse(message.toString())); + } + + @Override + public void ready() { + + } + + @Override + public void fail(Exception e) { + + } + + @Override + public boolean filter(Jid jid, Jid jid1) { + return false; + } +} -- cgit v1.2.3