]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/activitypub/inbox-manager.ts
Don't inject untrusted input
[github/Chocobozzz/PeerTube.git] / server / lib / activitypub / inbox-manager.ts
index 18ae495325119034c34c62109f23453579a58f66..27778cc9dc85ecc2983657d9b807db51b0689285 100644 (file)
@@ -1,51 +1,38 @@
-import { AsyncQueue, queue } from 'async'
+import PQueue from 'p-queue'
 import { logger } from '@server/helpers/logger'
 import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
 import { MActorDefault, MActorSignature } from '@server/types/models'
 import { Activity } from '@shared/models'
-import { processActivities } from './process'
 import { StatsManager } from '../stat-manager'
-
-type QueueParam = {
-  activities: Activity[]
-  signatureActor?: MActorSignature
-  inboxActor?: MActorDefault
-}
+import { processActivities } from './process'
 
 class InboxManager {
 
   private static instance: InboxManager
-
-  private readonly inboxQueue: AsyncQueue<QueueParam>
-
-  private messagesProcessed = 0
+  private readonly inboxQueue: PQueue
 
   private constructor () {
-    this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
-      const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
-
-      this.messagesProcessed++
-
-      processActivities(task.activities, options)
-        .then(() => cb())
-        .catch(err => {
-          logger.error('Error in process activities.', { err })
-          cb()
-        })
-    })
+    this.inboxQueue = new PQueue({ concurrency: 1 })
 
     setInterval(() => {
-      StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting())
-    }, SCHEDULER_INTERVALS_MS.updateInboxStats)
+      StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
+    }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
   }
 
-  addInboxMessage (options: QueueParam) {
-    this.inboxQueue.push(options)
-      .catch(err => logger.error('Cannot add options in inbox queue.', { options, err }))
+  addInboxMessage (param: {
+    activities: Activity[]
+    signatureActor?: MActorSignature
+    inboxActor?: MActorDefault
+  }) {
+    this.inboxQueue.add(() => {
+      const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
+
+      return processActivities(param.activities, options)
+    }).catch(err => logger.error('Error with inbox queue.', { err }))
   }
 
   getActivityPubMessagesWaiting () {
-    return this.inboxQueue.length() + this.inboxQueue.running()
+    return this.inboxQueue.size + this.inboxQueue.pending
   }
 
   static get Instance () {