]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/activitypub/inbox-manager.ts
Refactor playlist creation for lives
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / inbox-manager.ts
CommitLineData
7a9e420a 1import PQueue from 'p-queue'
99afa081
C
2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models'
99afa081 6import { StatsManager } from '../stat-manager'
543442a3 7import { processActivities } from './process'
99afa081 8
99afa081
C
9class InboxManager {
10
11 private static instance: InboxManager
7a9e420a 12 private readonly inboxQueue: PQueue
99afa081
C
13
14 private constructor () {
7a9e420a 15 this.inboxQueue = new PQueue({ concurrency: 1 })
99afa081
C
16
17 setInterval(() => {
543442a3 18 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
61953742 19 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
99afa081
C
20 }
21
7a9e420a
C
22 addInboxMessage (param: {
23 activities: Activity[]
24 signatureActor?: MActorSignature
25 inboxActor?: MActorDefault
26 }) {
27 this.inboxQueue.add(() => {
28 const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
29
30 return processActivities(param.activities, options)
31 }).catch(err => logger.error('Error with inbox queue.', { err }))
99afa081
C
32 }
33
fae6e4da 34 getActivityPubMessagesWaiting () {
7a9e420a 35 return this.inboxQueue.size + this.inboxQueue.pending
fae6e4da
C
36 }
37
99afa081
C
38 static get Instance () {
39 return this.instance || (this.instance = new this())
40 }
41}
42
43// ---------------------------------------------------------------------------
44
45export {
46 InboxManager
47}