1 import PQueue from 'p-queue'
2 import { logger } from '@server/helpers/logger'
3 import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4 import { MActorDefault, MActorSignature } from '@server/types/models'
5 import { Activity } from '@shared/models'
6 import { StatsManager } from '../stat-manager'
7 import { processActivities } from './process'
// Cached singleton instance; it is assigned lazily by the static `Instance` getter.
11 private static instance: InboxManager
// Queue holding the inbox processing tasks; assigned in the constructor and never reassigned (readonly).
12 private readonly inboxQueue: PQueue
// Private constructor: the class can only be instantiated from its own code
// (the static `Instance` getter does so).
14 private constructor () {
// `concurrency: 1` makes p-queue run at most one queued task at a time.
15 this.inboxQueue = new PQueue({ concurrency: 1 })
// Pushes the current count of unfinished inbox tasks to the stats manager.
// NOTE(review): the line that opens this callback is not visible in this view; the
// trailing `}, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)` suggests a timer call
// (e.g. setInterval) using that constant as its delay — confirm against the full file.
18 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
19 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
// Adds a task to the inbox queue that processes the given activities.
// In the visible code the promise returned by the queue is neither awaited nor
// returned, so callers are not notified of the processing outcome.
//
// param.activities: activities handed unchanged to `processActivities`
// param.signatureActor: optional, forwarded in the options object
// param.inboxActor: optional, forwarded in the options object
22 addInboxMessage (param: {
23 activities: Activity[]
24 signatureActor?: MActorSignature
25 inboxActor?: MActorDefault
27 this.inboxQueue.add(() => {
28 const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
// Returning the result lets the queue track the task through processActivities' return value.
30 return processActivities(param.activities, options)
// A rejection from the queued task is logged here instead of being propagated.
31 }).catch(err => logger.error('Error with inbox queue.', { err }))
// Returns the number of inbox tasks that are not finished yet: the queue's
// `size` (tasks queued but not started) plus `pending` (tasks currently running).
34 getActivityPubMessagesWaiting () {
35 return this.inboxQueue.size + this.inboxQueue.pending
// Singleton accessor: returns the cached instance, creating and caching it
// on the first access.
38 static get Instance () {
39 return this.instance || (this.instance = new this())
43 // ---------------------------------------------------------------------------