aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub/inbox-manager.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/activitypub/inbox-manager.ts')
-rw-r--r--server/lib/activitypub/inbox-manager.ts12
1 files changed, 4 insertions, 8 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
index 18ae49532..282e7ce66 100644
--- a/server/lib/activitypub/inbox-manager.ts
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -1,10 +1,10 @@
1import { AsyncQueue, queue } from 'async' 1import { queue, QueueObject } from 'async'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' 3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models' 4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models' 5import { Activity } from '@shared/models'
6import { processActivities } from './process'
7import { StatsManager } from '../stat-manager' 6import { StatsManager } from '../stat-manager'
7import { processActivities } from './process'
8 8
9type QueueParam = { 9type QueueParam = {
10 activities: Activity[] 10 activities: Activity[]
@@ -16,16 +16,12 @@ class InboxManager {
16 16
17 private static instance: InboxManager 17 private static instance: InboxManager
18 18
19 private readonly inboxQueue: AsyncQueue<QueueParam> 19 private readonly inboxQueue: QueueObject<QueueParam>
20
21 private messagesProcessed = 0
22 20
23 private constructor () { 21 private constructor () {
24 this.inboxQueue = queue<QueueParam, Error>((task, cb) => { 22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
25 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } 23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
26 24
27 this.messagesProcessed++
28
29 processActivities(task.activities, options) 25 processActivities(task.activities, options)
30 .then(() => cb()) 26 .then(() => cb())
31 .catch(err => { 27 .catch(err => {
@@ -35,7 +31,7 @@ class InboxManager {
35 }) 31 })
36 32
37 setInterval(() => { 33 setInterval(() => {
38 StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting()) 34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
39 }, SCHEDULER_INTERVALS_MS.updateInboxStats) 35 }, SCHEDULER_INTERVALS_MS.updateInboxStats)
40 } 36 }
41 37