aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub/inbox-manager.ts
blob: 27778cc9dc85ecc2983657d9b807db51b0689285 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import PQueue from 'p-queue'
import { logger } from '@server/helpers/logger'
import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
import { MActorDefault, MActorSignature } from '@server/types/models'
import { Activity } from '@shared/models'
import { StatsManager } from '../stat-manager'
import { processActivities } from './process'

class InboxManager {

  private static instance: InboxManager
  private readonly inboxQueue: PQueue

  private constructor () {
    this.inboxQueue = new PQueue({ concurrency: 1 })

    setInterval(() => {
      StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
    }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
  }

  addInboxMessage (param: {
    activities: Activity[]
    signatureActor?: MActorSignature
    inboxActor?: MActorDefault
  }) {
    this.inboxQueue.add(() => {
      const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }

      return processActivities(param.activities, options)
    }).catch(err => logger.error('Error with inbox queue.', { err }))
  }

  getActivityPubMessagesWaiting () {
    return this.inboxQueue.size + this.inboxQueue.pending
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}

// ---------------------------------------------------------------------------

export {
  InboxManager
}