X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Factivitypub%2Finbox-manager.ts;h=f2785d6ce4df27742fdfeb26bcd9368b13e0c092;hb=c5cadb2859050449596199090231d6e38bc4a571;hp=6d9bf7cf0c99c79bbf8b61b3a2a7bcdbdf518743;hpb=ba5a8d89bbf049e4afc41543bcc072cccdb02669;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index 6d9bf7cf0..f2785d6ce 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts @@ -1,10 +1,10 @@ -import { AsyncQueue, queue } from 'async' +import { queue, QueueObject } from 'async' import { logger } from '@server/helpers/logger' import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' import { MActorDefault, MActorSignature } from '@server/types/models' import { Activity } from '@shared/models' -import { processActivities } from './process' import { StatsManager } from '../stat-manager' +import { processActivities } from './process' type QueueParam = { activities: Activity[] @@ -16,16 +16,12 @@ class InboxManager { private static instance: InboxManager - private readonly inboxQueue: AsyncQueue - - private messagesProcessed = 0 + private readonly inboxQueue: QueueObject private constructor () { this.inboxQueue = queue((task, cb) => { const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } - this.messagesProcessed++ - processActivities(task.activities, options) .then(() => cb()) .catch(err => { @@ -35,8 +31,8 @@ class InboxManager { }) setInterval(() => { - StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length()) - }, SCHEDULER_INTERVALS_MS.updateInboxStats) + StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) + }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) } addInboxMessage (options: QueueParam) { @@ -44,6 +40,10 @@ class InboxManager { .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) } + getActivityPubMessagesWaiting () { + return this.inboxQueue.length() + this.inboxQueue.running() + } + static get Instance () { return this.instance || (this.instance = new this()) }