]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/activitypub/inbox-manager.ts
Prevent invalid watch sections
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / inbox-manager.ts
CommitLineData
543442a3 1import { queue, QueueObject } from 'async'
99afa081
C
2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models'
99afa081 6import { StatsManager } from '../stat-manager'
543442a3 7import { processActivities } from './process'
99afa081
C
8
/**
 * Payload of a single task pushed onto the inbox queue.
 */
type QueueParam = {
  /** Activities handed to `processActivities` when the task runs. */
  activities: Activity[]

  /** Optional actor forwarded to `processActivities` as `signatureActor`. */
  signatureActor?: MActorSignature

  /** Optional actor forwarded to `processActivities` as `inboxActor`. */
  inboxActor?: MActorDefault
}
14
/**
 * Lazily-created singleton that funnels incoming ActivityPub activities
 * through an `async` queue and periodically reports how many tasks are
 * still pending to the `StatsManager`.
 *
 * Obtain the shared instance through `InboxManager.Instance`; the constructor
 * is private.
 */
class InboxManager {

  private static instance: InboxManager

  private readonly inboxQueue: QueueObject<QueueParam>

  private constructor () {
    this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
      const { activities, signatureActor, inboxActor } = task
      const processing = processActivities(activities, { signatureActor, inboxActor })

      processing
        .then(() => cb())
        .catch(err => {
          logger.error('Error in process activities.', { err })

          // Signal completion without an error so the queue moves on to the next task
          cb()
        })
    })

    // Periodically publish the number of messages still waiting in the inbox
    const publishWaitingCount = () => {
      const waiting = this.getActivityPubMessagesWaiting()

      StatsManager.Instance.updateInboxWaiting(waiting)
    }

    setInterval(publishWaitingCount, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
  }

  /**
   * Enqueue a batch of activities for processing.
   *
   * A failure to enqueue is logged and not propagated to the caller.
   *
   * @param options activities to process, with the optional signature and inbox actors
   */
  addInboxMessage (options: QueueParam) {
    const pushed = this.inboxQueue.push(options)

    pushed.catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
  }

  /**
   * @returns the number of queued tasks plus the number of tasks currently running
   */
  getActivityPubMessagesWaiting () {
    const queued = this.inboxQueue.length()
    const running = this.inboxQueue.running()

    return queued + running
  }

  /** Shared instance, created on first access. */
  static get Instance () {
    if (!this.instance) {
      this.instance = new this()
    }

    return this.instance
  }
}
51
52// ---------------------------------------------------------------------------
53
export { InboxManager }