aboutsummaryrefslogtreecommitdiff
path: root/vnext/server/middleware/event.js
blob: 1267d1c42cddec280ff72b5357608478ef373445 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import { simpleParser } from 'mailparser';
import { isPM, isReply, isService } from '../common/MessageUtils';
import { sendTelegramNotification } from '../durov';
import { subscribers } from '../http';
import { sendNotification, buildNotification } from '../sender';
import debug from 'debug';
var log = debug('event');
import config from 'config';
import EventSource from 'eventsource';

const es = new EventSource(config.get('service.baseURL')+ '/api/events');
es.addEventListener('msg', (msg) => {
    log(msg.data);
    processMessageEvent(JSON.parse(msg.data));
});
es.addEventListener('read', (msg) => {
    log(msg);
});
es.addEventListener('open', () => {
    log('online');
});
es.onerror = (evt) => {
    log(`err: ${JSON.stringify(evt)}`);
};

/** @type {number[]} */
const allSandboxIds = [];

/**
 * handle message event
 *
 * @param {import('../../client').Message} msg message
 */
function processMessageEvent(msg) {
    let params = {};
    params.uid = isPM(msg) ? msg.to.uid : msg.user.uid;
    if (isReply(msg)) {
        params.mid = msg.mid;
        params.rid = msg.rid;
    } else if (!isPM(msg) && !isService(msg)) {
        params.mid = msg.mid;
    }
    subscribers(new URLSearchParams(JSON.parse(JSON.stringify(params)))).then(users => {
        users.forEach(user => {
            log(`${user.uname}: ${user.unreadCount}`);
            let [sandboxTokens, productionTokens] = (user.tokens || [])
                .filter(t => ['mpns', 'apns', 'gcm'].includes(t.type))
                .map(t => t.token)
                .reduce((result, element, i) => {
                    allSandboxIds.includes(user.uid)
                        ? result[0].push(element)
                        : result[1].push(element);
                    return result;
                }, [[], []]);
            sendNotification(productionTokens, sandboxTokens, buildNotification(user, msg));
            let durovIds = (user.tokens || [])
                .filter(t => ['durov'].includes(t.type))
                .map(t => t.token);
            sendTelegramNotification(msg, durovIds);
        });
    }).catch(log);
}

/**
 * Handle new events
 *
 * @type {import('express').RequestParamHandler}
 */
export default function handleMessage(req, res) {
    return simpleParser(req.body, {})
        .then(parsed => {
            const new_version = parsed.headers.get('x-event-version') == '1.0';
            log(`New event: ${parsed.text}, new version: ${new_version}`);
            if (new_version) {
                /** @type {import('../../client').SystemEvent} */
                const event = JSON.parse(parsed.text || '');
                if (event.type === 'message' && event.message) {
                    if (event.message.service) {
                        // TODO: remove
                        let msg = { ...event.message };
                        if (event.from) {
                            msg.user = event.from;
                        }
                        processMessageEvent(msg);
                    } else {
                        processMessageEvent(event.message);
                    }

                }
            } else {
                /** @type {import('../../client').Message} */
                const msg = JSON.parse(parsed.text || '');
                processMessageEvent(msg);
            }
            res.end();
        })
        .catch(err => { log(err); res.status(400).send('Invalid request'); });
}