aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/inbox-manager.ts37
1 files changed, 14 insertions, 23 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
index f2785d6ce..27778cc9d 100644
--- a/server/lib/activitypub/inbox-manager.ts
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -1,4 +1,4 @@
1import { queue, QueueObject } from 'async' 1import PQueue from 'p-queue'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' 3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models' 4import { MActorDefault, MActorSignature } from '@server/types/models'
@@ -6,42 +6,33 @@ import { Activity } from '@shared/models'
6import { StatsManager } from '../stat-manager' 6import { StatsManager } from '../stat-manager'
7import { processActivities } from './process' 7import { processActivities } from './process'
8 8
9type QueueParam = {
10 activities: Activity[]
11 signatureActor?: MActorSignature
12 inboxActor?: MActorDefault
13}
14
15class InboxManager { 9class InboxManager {
16 10
17 private static instance: InboxManager 11 private static instance: InboxManager
18 12 private readonly inboxQueue: PQueue
19 private readonly inboxQueue: QueueObject<QueueParam>
20 13
21 private constructor () { 14 private constructor () {
22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => { 15 this.inboxQueue = new PQueue({ concurrency: 1 })
23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
24
25 processActivities(task.activities, options)
26 .then(() => cb())
27 .catch(err => {
28 logger.error('Error in process activities.', { err })
29 cb()
30 })
31 })
32 16
33 setInterval(() => { 17 setInterval(() => {
34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) 18 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
35 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) 19 }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS)
36 } 20 }
37 21
38 addInboxMessage (options: QueueParam) { 22 addInboxMessage (param: {
39 this.inboxQueue.push(options) 23 activities: Activity[]
40 .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) 24 signatureActor?: MActorSignature
25 inboxActor?: MActorDefault
26 }) {
27 this.inboxQueue.add(() => {
28 const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor }
29
30 return processActivities(param.activities, options)
31 }).catch(err => logger.error('Error with inbox queue.', { err }))
41 } 32 }
42 33
43 getActivityPubMessagesWaiting () { 34 getActivityPubMessagesWaiting () {
44 return this.inboxQueue.length() + this.inboxQueue.running() 35 return this.inboxQueue.size + this.inboxQueue.pending
45 } 36 }
46 37
47 static get Instance () { 38 static get Instance () {