1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
import { simpleParser } from 'mailparser';
import { isPM, isReply, isService } from '../common/MessageUtils';
import { sendTelegramNotification } from '../durov';
import { subscribers } from '../http';
import { sendNotification, buildNotification } from '../sender';
import debug from 'debug';
var log = debug('event');
import config from 'config';
import EventSource from 'eventsource';
// Subscribe to the service's server-sent event stream. The endpoint is the
// configured `service.baseURL` followed by `/api/events`.
const es = new EventSource(`${config.get('service.baseURL')}/api/events`);

// 'msg' events carry a JSON-encoded message in `data`.
es.addEventListener('msg', (msg) => {
  log(msg.data);
  try {
    processMessageEvent(JSON.parse(msg.data));
  } catch (err) {
    // A malformed payload (JSON.parse) or a synchronous failure while
    // processing would otherwise propagate out of this listener into the
    // EventSource implementation; log it and keep the subscription alive.
    log(err);
  }
});

// 'read' events are only logged.
es.addEventListener('read', (msg) => {
  log(msg);
});

es.addEventListener('open', () => {
  log('online');
});

es.onerror = (evt) => {
  log(`err: ${JSON.stringify(evt)}`);
};
/**
 * User ids whose push tokens are passed to `sendNotification` as sandbox
 * tokens instead of production tokens (matched against `user.uid` in
 * `processMessageEvent`). Declared empty; nothing in the visible part of
 * this file adds ids to it.
 *
 * @type {number[]}
 */
const allSandboxIds = [];
/**
 * Fan a message event out to its subscribers.
 *
 * Builds the subscriber query (`uid`, plus `mid`/`rid` depending on the kind
 * of message), looks the subscribers up and, for each one, sends a push
 * notification followed by a Telegram notification. A rejected lookup or an
 * error thrown while notifying is passed to `log`.
 *
 * @param {import('../../client').Message} msg message
 */
function processMessageEvent(msg) {
  // Private messages are keyed by the recipient, everything else by the author.
  const query = { uid: isPM(msg) ? msg.to.uid : msg.user.uid };
  if (isReply(msg)) {
    query.mid = msg.mid;
    query.rid = msg.rid;
  } else if (!(isPM(msg) || isService(msg))) {
    query.mid = msg.mid;
  }

  // Sends both kinds of notification to a single subscriber.
  const notifySubscriber = (subscriber) => {
    log(`${subscriber.uname}: ${subscriber.unreadCount}`);

    const pushTokens = (subscriber.tokens || [])
      .filter(({ type }) => ['mpns', 'apns', 'fcm'].includes(type))
      .map(({ token }) => token);
    // All push tokens of a subscriber go to exactly one of the two lists.
    const useSandbox = allSandboxIds.includes(subscriber.uid);
    const sandboxTokens = useSandbox ? pushTokens : [];
    const productionTokens = useSandbox ? [] : pushTokens;
    sendNotification(productionTokens, sandboxTokens, buildNotification(subscriber, msg));

    const telegramIds = (subscriber.tokens || [])
      .filter(({ type }) => type === 'durov')
      .map(({ token }) => token);
    sendTelegramNotification(msg, telegramIds);
  };

  // The JSON round trip drops keys whose value is undefined, so they do not
  // end up in the query string.
  const search = new URLSearchParams(JSON.parse(JSON.stringify(query)));
  subscribers(search)
    .then((users) => {
      users.forEach((user) => notifySubscriber(user));
    })
    .catch(log);
}
/**
 * Handle new events
 *
 * Parses the request body with `simpleParser` and reads the text part as
 * JSON. When the `x-event-version` header is `1.0` the JSON is a system
 * event and only events of type `message` that carry a message are
 * processed; otherwise the JSON is the message itself. Responds with an
 * empty body on success and with `400 Invalid request` on any failure,
 * including a body that `simpleParser` refuses synchronously.
 *
 * @type {import('express').RequestParamHandler}
 */
export default async function handleMessage(req, res) {
  try {
    // Awaited inside the try so that a synchronous throw from simpleParser is
    // answered with 400 as well, not only a rejected promise.
    const parsed = await simpleParser(req.body, {});
    // String() keeps the coercing behavior of a loose comparison for
    // non-string header values while using strict equality.
    const isNewVersion = String(parsed.headers.get('x-event-version')) === '1.0';
    log(`New event: ${parsed.text}, new version: ${isNewVersion}`);
    if (isNewVersion) {
      /** @type {import('../../client').SystemEvent} */
      const event = JSON.parse(parsed.text || '');
      if (event.type === 'message' && event.message) {
        if (event.message.service) {
          // TODO: remove
          // Service messages are processed as a copy whose `user` is replaced
          // by the event's `from`, when present.
          const msg = { ...event.message };
          if (event.from) {
            msg.user = event.from;
          }
          processMessageEvent(msg);
        } else {
          processMessageEvent(event.message);
        }
      }
    } else {
      /** @type {import('../../client').Message} */
      const msg = JSON.parse(parsed.text || '');
      processMessageEvent(msg);
    }
    res.end();
  } catch (err) {
    log(err);
    res.status(400).send('Invalid request');
  }
}
|