X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Factivitypub%2Finbox-manager.ts;h=f2785d6ce4df27742fdfeb26bcd9368b13e0c092;hb=082d32eb8873190e48329b61b91f87d71f3cf812;hp=19e112f919743b9d9892f052e847bd953138f7f6;hpb=99afa081bc6ae7f34b2105075bd43e3625434fa8;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index 19e112f91..f2785d6ce 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts @@ -1,10 +1,10 @@ -import { AsyncQueue, queue } from 'async' +import { queue, QueueObject } from 'async' import { logger } from '@server/helpers/logger' import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' import { MActorDefault, MActorSignature } from '@server/types/models' import { Activity } from '@shared/models' -import { processActivities } from './process' import { StatsManager } from '../stat-manager' +import { processActivities } from './process' type QueueParam = { activities: Activity[] @@ -16,16 +16,12 @@ class InboxManager { private static instance: InboxManager - private readonly inboxQueue: AsyncQueue - - private messagesProcessed = 0 + private readonly inboxQueue: QueueObject private constructor () { this.inboxQueue = queue((task, cb) => { const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } - this.messagesProcessed++ - processActivities(task.activities, options) .then(() => cb()) .catch(err => { @@ -35,12 +31,17 @@ class InboxManager { }) setInterval(() => { - StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length()) - }, SCHEDULER_INTERVALS_MS.updateInboxStats) + StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) + }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) } addInboxMessage (options: QueueParam) { this.inboxQueue.push(options) + .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) + } + + getActivityPubMessagesWaiting () { + return this.inboxQueue.length() + this.inboxQueue.running() } static get Instance () {