]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blob - server/lib/activitypub/inbox-manager.ts
Try to speed up server tests
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / inbox-manager.ts
1 import { AsyncQueue, queue } from 'async'
2 import { logger } from '@server/helpers/logger'
3 import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4 import { MActorDefault, MActorSignature } from '@server/types/models'
5 import { Activity } from '@shared/models'
6 import { processActivities } from './process'
7 import { StatsManager } from '../stat-manager'
8
/**
 * One unit of work for the inbox queue: a batch of activities plus the
 * optional actors that are forwarded to `processActivities`.
 */
type QueueParam = {
  /** Activities handed to `processActivities` as a single batch. */
  activities: Activity[]

  /** Forwarded as the `signatureActor` option; may be omitted. */
  signatureActor?: MActorSignature

  /** Forwarded as the `inboxActor` option; may be omitted. */
  inboxActor?: MActorDefault
}
14
/**
 * Process-wide singleton that funnels incoming inbox messages through an
 * `async` queue and periodically reports queue statistics to `StatsManager`.
 *
 * Obtain it through `InboxManager.Instance`; the constructor is private.
 */
class InboxManager {

  private static instance: InboxManager

  // No concurrency argument is given to `queue()`, so the library default applies
  private readonly inboxQueue: AsyncQueue<QueueParam>

  // Incremented once per queued task when its processing starts (not per activity)
  private messagesProcessed = 0

  private constructor () {
    this.inboxQueue = queue<QueueParam, Error>((task, done) => {
      const { signatureActor, inboxActor } = task
      const processOptions = { signatureActor, inboxActor }

      this.messagesProcessed += 1

      const processing = processActivities(task.activities, processOptions)

      processing
        .then(() => done())
        .catch(err => {
          // Best effort: the failure is logged and the queue callback is still
          // invoked without an error
          logger.error('Error in process activities.', { err })
          done()
        })
    })

    // The timer handle is not kept: this interval is never cleared by this class
    setInterval(() => {
      const statsManager = StatsManager.Instance
      const processed = this.messagesProcessed
      const waiting = this.getActivityPubMessagesWaiting()

      statsManager.updateInboxStats(processed, waiting)
    }, SCHEDULER_INTERVALS_MS.updateInboxStats)
  }

  /**
   * Enqueues a batch of activities. Nothing is returned: a rejection of the
   * promise returned by the queue's `push` is only logged.
   *
   * @param options - activities to process and the optional related actors
   */
  addInboxMessage (options: QueueParam) {
    const pushed = this.inboxQueue.push(options)

    pushed.catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
  }

  /**
   * @returns the sum of the queue's `length()` and `running()` counters
   */
  getActivityPubMessagesWaiting () {
    const queued = this.inboxQueue.length()
    const running = this.inboxQueue.running()

    return queued + running
  }

  /** Lazily creates the shared instance on first access, then reuses it. */
  static get Instance () {
    if (!this.instance) {
      this.instance = new this()
    }

    return this.instance
  }
}
55
56 // ---------------------------------------------------------------------------
57
export { InboxManager }