diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/inbox-manager.ts | 37 |
1 files changed, 14 insertions, 23 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index f2785d6ce..27778cc9d 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { queue, QueueObject } from 'async' | 1 | import PQueue from 'p-queue' |
2 | import { logger } from '@server/helpers/logger' | 2 | import { logger } from '@server/helpers/logger' |
3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' | 3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' |
4 | import { MActorDefault, MActorSignature } from '@server/types/models' | 4 | import { MActorDefault, MActorSignature } from '@server/types/models' |
@@ -6,42 +6,33 @@ import { Activity } from '@shared/models' | |||
6 | import { StatsManager } from '../stat-manager' | 6 | import { StatsManager } from '../stat-manager' |
7 | import { processActivities } from './process' | 7 | import { processActivities } from './process' |
8 | 8 | ||
9 | type QueueParam = { | ||
10 | activities: Activity[] | ||
11 | signatureActor?: MActorSignature | ||
12 | inboxActor?: MActorDefault | ||
13 | } | ||
14 | |||
15 | class InboxManager { | 9 | class InboxManager { |
16 | 10 | ||
17 | private static instance: InboxManager | 11 | private static instance: InboxManager |
18 | 12 | private readonly inboxQueue: PQueue | |
19 | private readonly inboxQueue: QueueObject<QueueParam> | ||
20 | 13 | ||
21 | private constructor () { | 14 | private constructor () { |
22 | this.inboxQueue = queue<QueueParam, Error>((task, cb) => { | 15 | this.inboxQueue = new PQueue({ concurrency: 1 }) |
23 | const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } | ||
24 | |||
25 | processActivities(task.activities, options) | ||
26 | .then(() => cb()) | ||
27 | .catch(err => { | ||
28 | logger.error('Error in process activities.', { err }) | ||
29 | cb() | ||
30 | }) | ||
31 | }) | ||
32 | 16 | ||
33 | setInterval(() => { | 17 | setInterval(() => { |
34 | StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) | 18 | StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) |
35 | }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) | 19 | }, SCHEDULER_INTERVALS_MS.UPDATE_INBOX_STATS) |
36 | } | 20 | } |
37 | 21 | ||
38 | addInboxMessage (options: QueueParam) { | 22 | addInboxMessage (param: { |
39 | this.inboxQueue.push(options) | 23 | activities: Activity[] |
40 | .catch(err => logger.error('Cannot add options in inbox queue.', { options, err })) | 24 | signatureActor?: MActorSignature |
25 | inboxActor?: MActorDefault | ||
26 | }) { | ||
27 | this.inboxQueue.add(() => { | ||
28 | const options = { signatureActor: param.signatureActor, inboxActor: param.inboxActor } | ||
29 | |||
30 | return processActivities(param.activities, options) | ||
31 | }).catch(err => logger.error('Error with inbox queue.', { err })) | ||
41 | } | 32 | } |
42 | 33 | ||
43 | getActivityPubMessagesWaiting () { | 34 | getActivityPubMessagesWaiting () { |
44 | return this.inboxQueue.length() + this.inboxQueue.running() | 35 | return this.inboxQueue.size + this.inboxQueue.pending |
45 | } | 36 | } |
46 | 37 | ||
47 | static get Instance () { | 38 | static get Instance () { |