import { simpleParser } from 'mailparser'; import { isPM, isReply, isService } from '../common/MessageUtils'; import { sendTelegramNotification } from '../durov'; import { subscribers } from '../http'; import { sendNotification, buildNotification } from '../sender'; import debug from 'debug'; var log = debug('event'); import config from 'config'; import EventSource from 'eventsource'; const es = new EventSource(config.get('service.baseURL')+ '/api/events'); es.addEventListener('msg', (msg) => { log(msg.data); processMessageEvent(JSON.parse(msg.data)); }); es.addEventListener('read', (msg) => { log(msg); }); es.addEventListener('open', () => { log('online'); }); es.onerror = (evt) => { log(`err: ${JSON.stringify(evt)}`); }; /** @type {number[]} */ const allSandboxIds = []; /** * handle message event * * @param {import('../../client').Message} msg message */ function processMessageEvent(msg) { let params = {}; params.uid = isPM(msg) ? msg.to.uid : msg.user.uid; if (isReply(msg)) { params.mid = msg.mid; params.rid = msg.rid; } else if (!isPM(msg) && !isService(msg)) { params.mid = msg.mid; } subscribers(new URLSearchParams(JSON.parse(JSON.stringify(params)))).then(users => { users.forEach(user => { log(`${user.uname}: ${user.unreadCount}`); let [sandboxTokens, productionTokens] = (user.tokens || []) .filter(t => ['mpns', 'apns', 'gcm'].includes(t.type)) .map(t => t.token) .reduce((result, element, i) => { allSandboxIds.includes(user.uid) ? result[0].push(element) : result[1].push(element); return result; }, [[], []]); sendNotification(productionTokens, sandboxTokens, buildNotification(user, msg)); let durovIds = (user.tokens || []) .filter(t => ['durov'].includes(t.type)) .map(t => t.token); sendTelegramNotification(msg, durovIds); }); }).catch(log); } /** * Handle new events * * @type {import('express').RequestParamHandler} */ export default function handleMessage(req, res) { return simpleParser(req.body, {}) .then(parsed => { const new_version = parsed.headers.get('x-event-version') == '1.0'; log(`New event: ${parsed.text}, new version: ${new_version}`); if (new_version) { /** @type {import('../../client').SystemEvent} */ const event = JSON.parse(parsed.text || ''); if (event.type === 'message' && event.message) { if (event.message.service) { // TODO: remove let msg = { ...event.message }; if (event.from) { msg.user = event.from; } processMessageEvent(msg); } else { processMessageEvent(event.message); } } } else { /** @type {import('../../client').Message} */ const msg = JSON.parse(parsed.text || ''); processMessageEvent(msg); } res.end(); }) .catch(err => { log(err); res.status(400).send('Invalid request'); }); }