/*
* Copyright (C) 2008-2017, Juick
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.juick.components.s2s;
import com.juick.components.XMPPServer;
import com.juick.xmpp.extensions.StreamError;
import com.juick.xmpp.utils.XmlUtils;
import org.apache.commons.lang3.StringUtils;
import org.xmlpull.v1.XmlPullParser;
import rocks.xmpp.addr.Jid;
import rocks.xmpp.core.session.XmppSessionConfiguration;
import rocks.xmpp.core.stanza.model.Stanza;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSocket;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import java.io.EOFException;
import java.io.IOException;
import java.io.StringReader;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
 * Incoming server-to-server (s2s) XMPP stream.
 *
 * <p>{@link #run()} reads the peer's stream header from the accepted socket, answers with our
 * own header, and then dispatches every top-level start tag: dialback {@code result} and
 * {@code verify} elements, {@code presence}/{@code message}/{@code iq} stanzas, STARTTLS
 * negotiation, stream restarts and stream errors.
 *
 * <p>NOTE(review): in the reviewed revision every XML literal passed to {@code sendStanza} was an
 * empty string. The stanza text below was rebuilt from RFC 6120 and XEP-0220; confirm it against
 * the project's history before merging.
 *
 * @author ugnich
 */
public class ConnectionIn extends Connection implements Runnable {

    /** Namespace of the STARTTLS feature and its proceed/failure elements (RFC 6120, section 5). */
    private static final String NS_XMPP_TLS = "urn:ietf:params:xml:ns:xmpp-tls";

    /**
     * Domains accepted on this stream: {@link #sendDialbackResult} adds an entry when the result
     * is {@code "valid"}, and {@link #checkFromTo} only accepts stanzas whose sender domain is
     * listed here.
     */
    final public List<String> from = new ArrayList<>();

    /** {@link System#currentTimeMillis()} at the last call to {@link #updateTsRemoteData()}. */
    public long tsRemoteData = 0;

    /** Number of start tags seen by the dispatch loop in {@link #run()}. */
    public long packetsRemote = 0;

    /** Not read or written anywhere in this class. */
    XmppSessionConfiguration configuration;

    /**
     * Wraps an already accepted socket and prepares the XML parser for it.
     *
     * @param xmpp   owning server component
     * @param socket accepted connection from the remote server
     * @throws Exception if the parser cannot be (re)started on the socket
     */
    public ConnectionIn(XMPPServer xmpp, Socket socket) throws Exception {
        super(xmpp);
        this.socket = socket;
        restartParser();
    }

    /**
     * Unmarshals one serialized stanza.
     *
     * @param xml stanza as an XML string
     * @return the stanza, or {@code null} when JAXB rejects the input (the error is logged)
     */
    protected Stanza parse(String xml) {
        try {
            Unmarshaller unmarshaller = xmpp.getSession().createUnmarshaller();
            return (Stanza) unmarshaller.unmarshal(new StringReader(xml));
        } catch (JAXBException e) {
            logger.error("JAXB exception", e);
        }
        return null;
    }

    /**
     * Serves the stream until it ends, the peer is rejected, or an error occurs. Whatever the
     * exit path, the connection is unregistered from the server and closed exactly once.
     */
    @Override
    public void run() {
        try {
            parser.next(); // stream:stream
            updateTsRemoteData();
            // Constant-first comparisons: getName() is null unless the event is a tag, and
            // getNamespace(prefix) is null when the prefix is unbound. Either case must be
            // reported as an invalid stream rather than surface as a NullPointerException.
            // The "version" and "to" attributes of the header are not validated here.
            if (!"stream".equals(parser.getName())
                    || !NS_STREAM.equals(parser.getNamespace("stream"))) {
                throw new Exception(String.format("stream from %s invalid", socket.getRemoteSocketAddress()));
            }
            streamID = parser.getAttributeValue(null, "id");
            if (streamID == null) {
                streamID = UUID.randomUUID().toString();
            }
            boolean xmppversionnew = parser.getAttributeValue(null, "version") != null;
            String from = parser.getAttributeValue(null, "from");
            if (xmpp.bannedHosts.contains(from)) {
                return; // cleanup happens in finally
            }
            sendOpenStream(from, xmppversionnew);

            while (parser.next() != XmlPullParser.END_DOCUMENT) {
                updateTsRemoteData();
                if (parser.getEventType() != XmlPullParser.START_TAG) {
                    continue;
                }
                logParser();
                packetsRemote++;
                String tag = parser.getName();
                if (tag.equals("result") && NS_DB.equals(parser.getNamespace())) {
                    if (!acceptDialbackResult()) {
                        break;
                    }
                } else if (tag.equals("verify") && NS_DB.equals(parser.getNamespace())) {
                    answerDialbackVerify();
                } else if (tag.equals("presence") && checkFromTo(parser)) {
                    String xml = XmlUtils.parseToString(parser, false);
                    logger.info("stream {} presence: {}", streamID, xml);
                    Stanza stanza = parse(xml);
                    if (stanza != null) { // parse() already logged the failure
                        xmpp.onStanzaReceived(stanza);
                    }
                } else if (tag.equals("message") && checkFromTo(parser)) {
                    updateTsRemoteData();
                    String xml = XmlUtils.parseToString(parser, false);
                    logger.info("stream {} message: {}", streamID, xml);
                    Stanza stanza = parse(xml);
                    if (stanza != null) {
                        xmpp.onStanzaReceived(stanza);
                    }
                } else if (tag.equals("iq") && checkFromTo(parser)) {
                    updateTsRemoteData();
                    // Read the attribute first: parseToString() consumes the element.
                    String type = parser.getAttributeValue(null, "type");
                    String xml = XmlUtils.parseToString(parser, false);
                    if (!"error".equals(type)) {
                        logger.info("stream {} iq: {}", streamID, xml);
                        Stanza stanza = parse(xml);
                        if (stanza != null) {
                            xmpp.getRouter().sendStanza(stanza);
                        }
                    }
                } else if (sc != null && !isSecured() && tag.equals("starttls")) {
                    if (!negotiateInboundTls()) {
                        return; // handshake failed; do not keep reading from this socket
                    }
                } else if (isSecured() && tag.equals("stream") && NS_STREAM.equals(parser.getNamespace())) {
                    // Stream restart after TLS: answer with a fresh header and features.
                    sendOpenStream(null, true);
                } else if (tag.equals("error")) {
                    StreamError streamError = StreamError.parse(parser);
                    logger.warn("Stream error from {}: {}", streamID, streamError.getCondition());
                    return; // the peer is closing the stream; stop reading
                } else {
                    String unhandledStanza = XmlUtils.parseToString(parser, true);
                    logger.warn("Unhandled stanza from {}: {}", streamID, unhandledStanza);
                }
            }
            logger.warn("stream {} finished", streamID);
        } catch (EOFException | SocketException ex) {
            logger.info("stream {} closed (dirty)", streamID);
        } catch (Exception e) {
            logger.warn("stream {} error {}", streamID, e);
        } finally {
            // Single cleanup point for every exit path, including the banned-host return.
            xmpp.removeConnectionIn(this);
            closeConnection();
        }
    }

    /**
     * Handles a {@code db:result} element: checks the claimed sender and the addressed host and,
     * if both are acceptable, starts the dialback check with the key carried in the element.
     *
     * @return {@code true} to keep serving the stream, {@code false} when the request is
     *         unacceptable and the stream must be ended
     * @throws Exception if reading the element text fails
     */
    private boolean acceptDialbackResult() throws Exception {
        String dfrom = parser.getAttributeValue(null, "from");
        String to = parser.getAttributeValue(null, "to");
        logger.info("stream from {} to {} {} asking for dialback", dfrom, to, streamID);
        // Reject a missing sender, and any peer claiming to be this host or one of its subdomains.
        if (dfrom == null || dfrom.equals(xmpp.HOSTNAME) || dfrom.endsWith("." + xmpp.HOSTNAME)) {
            logger.warn("stream from {} is invalid", dfrom);
            return false;
        }
        if (to == null || !to.equals(xmpp.HOSTNAME)) {
            logger.warn("stream from {} {} invalid to {}", dfrom, streamID, to);
            return false;
        }
        String dbKey = XmlUtils.getTagText(parser);
        updateTsRemoteData();
        xmpp.startDialback(dfrom, streamID, dbKey);
        return true;
    }

    /**
     * Handles a {@code db:verify} request: compares the presented key with the key of our
     * outgoing connection to the requesting domain and replies with {@code valid} or
     * {@code invalid}.
     *
     * @throws Exception if reading the element text fails
     */
    private void answerDialbackVerify() throws Exception {
        String vfrom = parser.getAttributeValue(null, "from");
        String vto = parser.getAttributeValue(null, "to");
        String vid = parser.getAttributeValue(null, "id");
        String vkey = XmlUtils.getTagText(parser);
        updateTsRemoteData();
        boolean valid = false;
        if (vfrom != null && vto != null && vid != null && vkey != null) {
            ConnectionOut c = xmpp.getConnectionOut(vfrom, false);
            if (c == null) {
                logger.warn("outgoing connection to {} not found", vfrom);
            } else {
                valid = vkey.equals(c.dbKey);
            }
        }
        // NOTE(review): reply layout follows XEP-0220 (addresses swapped, id echoed) - confirm.
        sendStanza("<db:verify from='" + escapeXmlAttribute(vto)
                + "' to='" + escapeXmlAttribute(vfrom)
                + "' id='" + escapeXmlAttribute(vid)
                + "' type='" + (valid ? "valid" : "invalid") + "'/>");
        if (valid) {
            logger.info("stream from {} {} dialback verify valid", vfrom, streamID);
        } else {
            logger.warn("stream from {} {} dialback verify invalid", vfrom, streamID);
        }
    }

    /**
     * Answers a {@code starttls} request: tells the peer to proceed, wraps the socket in TLS in
     * server mode, performs the handshake and restarts the parser on the secured socket.
     *
     * @return {@code true} when the stream is now secured, {@code false} when the handshake
     *         failed and the caller must stop serving the stream
     * @throws Exception on non-SSL I/O failures or if the parser cannot be restarted
     */
    private boolean negotiateInboundTls() throws Exception {
        logger.info("stream {} securing", streamID);
        // NOTE(review): element text rebuilt from RFC 6120 section 5 - confirm.
        sendStanza("<proceed xmlns='" + NS_XMPP_TLS + "'/>");
        try {
            socket = sc.getSocketFactory().createSocket(socket, socket.getInetAddress().getHostAddress(),
                    socket.getPort(), true);
            SSLSocket tlsSocket = (SSLSocket) socket;
            tlsSocket.setUseClientMode(false);
            tlsSocket.startHandshake();
            setSecured(true);
            logger.info("stream {} secured", streamID);
            restartParser();
            return true;
        } catch (SSLException sex) {
            logger.warn("stream {} ssl error {}", streamID, sex);
            sendStanza("<failure xmlns='" + NS_XMPP_TLS + "'/>");
            return false;
        }
    }

    /** Records the current time as the moment of the last activity from the remote side. */
    void updateTsRemoteData() {
        tsRemoteData = System.currentTimeMillis();
    }

    /**
     * Sends our stream header and, for peers that announced a version, the stream features.
     * STARTTLS is offered only when an SSL context is configured, the stream is not yet secured
     * and the peer is not listed in {@code brokenSSLhosts}.
     *
     * <p>NOTE(review): header text rebuilt from RFC 6120 section 4.7 - confirm.
     *
     * @param from           domain announced by the peer (may be {@code null}); only used for the
     *                       {@code brokenSSLhosts} lookup
     * @param xmppversionnew whether the peer sent a {@code version} attribute
     * @throws IOException declared for callers; not thrown by the code visible here
     */
    void sendOpenStream(String from, boolean xmppversionnew) throws IOException {
        StringBuilder openStream = new StringBuilder("<?xml version='1.0'?>");
        openStream.append("<stream:stream xmlns='jabber:server'")
                .append(" xmlns:stream='").append(NS_STREAM).append('\'')
                .append(" xmlns:db='").append(NS_DB).append('\'')
                .append(" from='").append(escapeXmlAttribute(xmpp.HOSTNAME)).append('\'')
                .append(" id='").append(escapeXmlAttribute(streamID)).append('\'');
        if (xmppversionnew) {
            // RFC 6120 4.7.5: omit 'version' when the peer's header carried none.
            openStream.append(" version='1.0'");
        }
        openStream.append('>');
        if (xmppversionnew) {
            openStream.append("<stream:features>");
            if (sc != null && !isSecured() && !xmpp.brokenSSLhosts.contains(from)) {
                openStream.append("<starttls xmlns='").append(NS_XMPP_TLS).append("'/>");
            }
            openStream.append("</stream:features>");
        }
        sendStanza(openStream.toString());
    }

    /**
     * Reports the outcome of a dialback check to the peer and, when it is {@code "valid"},
     * authorizes the domain for stanzas on this stream.
     *
     * @param sfrom domain the dialback was performed for
     * @param type  dialback result type, e.g. {@code "valid"}
     */
    public void sendDialbackResult(String sfrom, String type) {
        // NOTE(review): element text rebuilt from XEP-0220 - confirm.
        sendStanza("<db:result from='" + escapeXmlAttribute(xmpp.HOSTNAME)
                + "' to='" + escapeXmlAttribute(sfrom)
                + "' type='" + escapeXmlAttribute(type) + "'/>");
        if ("valid".equals(type)) {
            from.add(sfrom);
            logger.info("stream from {} {} ready", sfrom, streamID);
        }
    }

    /**
     * Checks the addressing of the element the parser is positioned on.
     *
     * @param parser parser positioned on a start tag
     * @return {@code true} only when both {@code from} and {@code to} are present, {@code to} is
     *         addressed to our host and the domain of {@code from} is listed in {@link #from}
     * @throws Exception declared for callers; {@code Jid.of} may reject a malformed address
     */
    boolean checkFromTo(XmlPullParser parser) throws Exception {
        String cfrom = parser.getAttributeValue(null, "from");
        String cto = parser.getAttributeValue(null, "to");
        if (StringUtils.isEmpty(cfrom) || StringUtils.isEmpty(cto)) {
            return false;
        }
        if (!Jid.of(cto).getDomain().equals(xmpp.HOSTNAME)) {
            return false;
        }
        return from.contains(Jid.of(cfrom).getDomain());
    }

    /**
     * Escapes a value for use inside a quoted XML attribute. Values echoed back to the peer
     * originate from the network and must not be able to break out of the attribute.
     *
     * @param value raw attribute value, may be {@code null}
     * @return the escaped value, or an empty string for {@code null}
     */
    private static String escapeXmlAttribute(String value) {
        if (value == null) {
            return "";
        }
        StringBuilder escaped = new StringBuilder(value.length());
        for (int i = 0; i < value.length(); i++) {
            char ch = value.charAt(i);
            switch (ch) {
                case '&':
                    escaped.append("&amp;");
                    break;
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '\'':
                    escaped.append("&apos;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                default:
                    escaped.append(ch);
            }
        }
        return escaped.toString();
    }
}