/*
* Copyright (C) 2008-2017, Juick
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
package com.juick.components;
import com.juick.components.s2s.BasicXmppSession;
import com.juick.xmpp.Message;
import com.juick.xmpp.StreamComponentServer;
import com.juick.xmpp.StreamListener;
import com.juick.xmpp.extensions.JuickMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.xmlpull.v1.XmlPullParserException;
import rocks.xmpp.addr.Jid;
import rocks.xmpp.core.stanza.model.Stanza;
import rocks.xmpp.util.XmppUtils;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* @author ugnich
*/
@Component
public class XMPPRouter implements Message.MessageListener, AutoCloseable, StreamListener {
private static final Logger logger = LoggerFactory.getLogger(XMPPRouter.class);
@Inject
private ExecutorService service;
private final List connections = Collections.synchronizedList(new ArrayList<>());
private ServerSocket listener;
@Inject
private BasicXmppSession session;
@Value("${router_port:5347}")
private int routerPort;
@PostConstruct
public void init() {
logger.info("component router initialized");
service.submit(() -> {
try {
listener = new ServerSocket(routerPort);
logger.info("component router listening on {}", routerPort);
while (!listener.isClosed()) {
if (Thread.currentThread().isInterrupted()) break;
Socket socket = listener.accept();
service.submit(() -> {
try {
StreamComponentServer client = new StreamComponentServer(socket.getInputStream(), socket.getOutputStream(), "secret");
addConnectionIn(client);
client.addChildParser(new JuickMessage());
client.addListener((Message.MessageListener) this);
client.addListener((StreamListener) this);
client.connect();
} catch (IOException e) {
logger.error("component error", e);
} catch (XmlPullParserException e) {
e.printStackTrace();
}
});
}
} catch (SocketException e) {
// shutdown
} catch (IOException e) {
logger.warn("io exception", e);
}
});
}
@Override
public void close() throws Exception {
if (!listener.isClosed()) {
listener.close();
}
synchronized (getConnections()) {
for (Iterator i = getConnections().iterator(); i.hasNext(); ) {
StreamComponentServer c = i.next();
c.logoff();
i.remove();
}
}
service.shutdown();
logger.info("XMPP router destroyed");
}
private void addConnectionIn(StreamComponentServer c) {
synchronized (getConnections()) {
getConnections().add(c);
}
}
private void sendOut(Stanza s) {
try {
StringWriter stanzaWriter = new StringWriter();
XMLStreamWriter xmppStreamWriter = XmppUtils.createXmppStreamWriter(
session.getConfiguration().getXmlOutputFactory().createXMLStreamWriter(stanzaWriter));
session.createMarshaller().marshal(s, xmppStreamWriter);
xmppStreamWriter.flush();
xmppStreamWriter.close();
String xml = stanzaWriter.toString();
logger.info("XMPPRouter (out): {}", xml);
sendOut(s.getTo().getDomain(), xml);
} catch (XMLStreamException | JAXBException e1) {
logger.info("jaxb exception", e1);
}
}
private void sendOut(String hostname, String xml) {
boolean haveAnyConn = false;
StreamComponentServer connOut = null;
synchronized (getConnections()) {
for (StreamComponentServer c : getConnections()) {
if (c.to != null && c.to.getDomain().equals(hostname)) {
if (c.isLoggedIn()) {
connOut = c;
break;
}
}
}
}
if (connOut != null) {
connOut.send(xml);
return;
}
logger.error("component unavailable: {}", hostname);
}
private List getConnections() {
return connections;
}
private Stanza parse(String xml) {
try {
Unmarshaller unmarshaller = session.createUnmarshaller();
return (Stanza)unmarshaller.unmarshal(new StringReader(xml));
} catch (JAXBException e) {
logger.error("JAXB exception", e);
}
return null;
}
@Override
public void onMessage(Message message) {
sendOut(parse(message.toString()));
}
@Override
public void ready() {
}
@Override
public void fail(Exception e) {
}
@Override
public boolean filter(Jid jid, Jid jid1) {
return false;
}
}