]>
Commit | Line | Data |
---|---|---|
99afa081 C |
1 | import { AsyncQueue, queue } from 'async' |
2 | import { logger } from '@server/helpers/logger' | |
3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' | |
4 | import { MActorDefault, MActorSignature } from '@server/types/models' | |
5 | import { Activity } from '@shared/models' | |
6 | import { processActivities } from './process' | |
7 | import { StatsManager } from '../stat-manager' | |
8 | ||
9 | type QueueParam = { | |
10 | activities: Activity[] | |
11 | signatureActor?: MActorSignature | |
12 | inboxActor?: MActorDefault | |
13 | } | |
14 | ||
15 | class InboxManager { | |
16 | ||
17 | private static instance: InboxManager | |
18 | ||
19 | private readonly inboxQueue: AsyncQueue<QueueParam> | |
20 | ||
21 | private messagesProcessed = 0 | |
22 | ||
23 | private constructor () { | |
24 | this.inboxQueue = queue<QueueParam, Error>((task, cb) => { | |
25 | const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } | |
26 | ||
27 | this.messagesProcessed++ | |
28 | ||
29 | processActivities(task.activities, options) | |
30 | .then(() => cb()) | |
31 | .catch(err => { | |
32 | logger.error('Error in process activities.', { err }) | |
33 | cb() | |
34 | }) | |
35 | }) | |
36 | ||
37 | setInterval(() => { | |
38 | StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length()) | |
39 | }, SCHEDULER_INTERVALS_MS.updateInboxStats) | |
40 | } | |
41 | ||
42 | addInboxMessage (options: QueueParam) { | |
43 | this.inboxQueue.push(options) | |
44 | } | |
45 | ||
46 | static get Instance () { | |
47 | return this.instance || (this.instance = new this()) | |
48 | } | |
49 | } | |
50 | ||
51 | // --------------------------------------------------------------------------- | |
52 | ||
53 | export { | |
54 | InboxManager | |
55 | } |