X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Factivitypub%2Finbox-manager.ts;h=27778cc9dc85ecc2983657d9b807db51b0689285;hb=4638cd713dcdd007cd7f49b9a95fa62ac7823e7c;hp=18ae495325119034c34c62109f23453579a58f66;hpb=fae6e4da8f516a9d6c3bad9bf6f35811ccacbad8;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index 18ae49532..27778cc9d 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts @@ -1,51 +1,38 @@ -import { AsyncQueue, queue } from 'async' +import PQueue from 'p-queue' import { logger } from '@server/helpers/logger' import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' import { MActorDefault, MActorSignature } from '@server/types/models' import { Activity } from '@shared/models' -import { processActivities } from './process' import { StatsManager } from '../stat-manager' - -type QueueParam = { - activities: Activity[] - signatureActor?: MActorSignature - inboxActor?: MActorDefault -} +import { processActivities } from './process' class InboxManager { private static instance: InboxManager - - private readonly inboxQueue: AsyncQueue - - private messagesProcessed = 0 + private readonly inboxQueue: PQueue private constructor () { - this.inboxQueue = queue((task, cb) => { - const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } - - this.messagesProcessed++ - - processActivities(task.activities, options) - .then(() => cb()) - .catch(err => { - logger.error('Error in process activities.', { err }) - cb() - }) - }) + this.inboxQueue = new PQueue({ concurrency: 1 }) setInterval(() => { - StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting()) - }, SCHEDULER_INTERVALS_MS.updateInboxStats) + StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) + }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) } - addInboxMessage (options: QueueParam) { - this.inboxQueue.push(options) - .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) + addInboxMessage (param: { + activities: Activity[] + signatureActor?: MActorSignature + inboxActor?: MActorDefault + }) { + this.inboxQueue.add(() => { + const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor } + + return processActivities(param.activities, options) + }).catch(err => logger.error('Error with inbox queue.', { err })) } getActivityPubMessagesWaiting () { - return this.inboxQueue.length() + this.inboxQueue.running() + return this.inboxQueue.size + this.inboxQueue.pending } static get Instance () {