1 import { queue, QueueObject } from 'async'
2 import { logger } from '@server/helpers/logger'
3 import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4 import { MActorDefault, MActorSignature } from '@server/types/models'
5 import { Activity } from '@shared/models'
6 import { StatsManager } from '../stat-manager'
7 import { processActivities } from './process'
// NOTE(review): the declaration header for these fields is not visible in this excerpt.
// Presumably they belong to QueueParam, since the queue worker reads these same three
// properties from its task — confirm.
// Activities passed as the first argument to processActivities for one queue task.
10 activities: Activity[]
// Optional; forwarded to processActivities as options.signatureActor.
11 signatureActor?: MActorSignature
// Optional; forwarded to processActivities as options.inboxActor.
12 inboxActor?: MActorDefault
// Cached instance, created on first access by the static Instance getter.
17 private static instance: InboxManager
// Queue of inbox tasks; assigned in the constructor and never reassigned (readonly).
19 private readonly inboxQueue: QueueObject<QueueParam>
/**
 * Creates the inbox queue and sets up reporting of the queue backlog to StatsManager.
 *
 * The constructor is private; instances are obtained through the static Instance getter.
 *
 * NOTE(review): several lines of this constructor are not visible in this excerpt
 * (what follows the processActivities call, and the call that receives the stats
 * callback). The comments below only describe what is shown.
 */
21 private constructor () {
// Queue worker: invoked with one task and a completion callback (cb).
22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
// Only the two actor fields are sent as options; the activities go as a separate argument.
23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
25 processActivities(task.activities, options)
// Logs an error raised while processing activities.
// NOTE(review): whether cb is still invoked after this log is not visible here — confirm.
28 logger.error('Error in process activities.', { err })
// Publishes the current backlog (queued + running tasks) to the stats manager.
34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
// SCHEDULER_INTERVALS_MS.updateInboxStats is the second argument of the enclosing call;
// presumably a timer period in milliseconds — confirm against the elided line.
35 }, SCHEDULER_INTERVALS_MS.updateInboxStats)
/**
 * Pushes a task onto the inbox queue.
 *
 * Nothing is returned in the visible lines. If the push rejects, the rejection is caught
 * and logged (together with the submitted options) instead of being propagated to the caller.
 *
 * @param options - Task to enqueue.
 */
38 addInboxMessage (options: QueueParam) {
39 this.inboxQueue.push(options)
// Log-and-continue: a failed push is reported through the logger only.
40 .catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
/**
 * Reports how many inbox tasks are outstanding.
 *
 * @returns The sum of the queue's length() and running() counters.
 */
43 getActivityPubMessagesWaiting () {
44 return this.inboxQueue.length() + this.inboxQueue.running()
/**
 * Accessor for the cached instance.
 *
 * @returns The stored instance if it is truthy; otherwise a newly constructed one,
 * which is stored in the static `instance` field before being returned.
 */
47 static get Instance () {
// `this` is the class the getter is invoked on, so `new this()` constructs that class.
48 return this.instance || (this.instance = new this())
52 // ---------------------------------------------------------------------------