diff options
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r-- | server/lib/activitypub/inbox-manager.ts | 12 | ||||
-rw-r--r-- | server/lib/activitypub/process/process.ts | 5 |
2 files changed, 9 insertions, 8 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts index 18ae49532..282e7ce66 100644 --- a/server/lib/activitypub/inbox-manager.ts +++ b/server/lib/activitypub/inbox-manager.ts | |||
@@ -1,10 +1,10 @@ | |||
1 | import { AsyncQueue, queue } from 'async' | 1 | import { queue, QueueObject } from 'async' |
2 | import { logger } from '@server/helpers/logger' | 2 | import { logger } from '@server/helpers/logger' |
3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' | 3 | import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' |
4 | import { MActorDefault, MActorSignature } from '@server/types/models' | 4 | import { MActorDefault, MActorSignature } from '@server/types/models' |
5 | import { Activity } from '@shared/models' | 5 | import { Activity } from '@shared/models' |
6 | import { processActivities } from './process' | ||
7 | import { StatsManager } from '../stat-manager' | 6 | import { StatsManager } from '../stat-manager' |
7 | import { processActivities } from './process' | ||
8 | 8 | ||
9 | type QueueParam = { | 9 | type QueueParam = { |
10 | activities: Activity[] | 10 | activities: Activity[] |
@@ -16,16 +16,12 @@ class InboxManager { | |||
16 | 16 | ||
17 | private static instance: InboxManager | 17 | private static instance: InboxManager |
18 | 18 | ||
19 | private readonly inboxQueue: AsyncQueue<QueueParam> | 19 | private readonly inboxQueue: QueueObject<QueueParam> |
20 | |||
21 | private messagesProcessed = 0 | ||
22 | 20 | ||
23 | private constructor () { | 21 | private constructor () { |
24 | this.inboxQueue = queue<QueueParam, Error>((task, cb) => { | 22 | this.inboxQueue = queue<QueueParam, Error>((task, cb) => { |
25 | const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } | 23 | const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } |
26 | 24 | ||
27 | this.messagesProcessed++ | ||
28 | |||
29 | processActivities(task.activities, options) | 25 | processActivities(task.activities, options) |
30 | .then(() => cb()) | 26 | .then(() => cb()) |
31 | .catch(err => { | 27 | .catch(err => { |
@@ -35,7 +31,7 @@ class InboxManager { | |||
35 | }) | 31 | }) |
36 | 32 | ||
37 | setInterval(() => { | 33 | setInterval(() => { |
38 | StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting()) | 34 | StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting()) |
39 | }, SCHEDULER_INTERVALS_MS.updateInboxStats) | 35 | }, SCHEDULER_INTERVALS_MS.updateInboxStats) |
40 | } | 36 | } |
41 | 37 | ||
diff --git a/server/lib/activitypub/process/process.ts b/server/lib/activitypub/process/process.ts index e60dd2a5b..5cef75665 100644 --- a/server/lib/activitypub/process/process.ts +++ b/server/lib/activitypub/process/process.ts | |||
@@ -16,6 +16,7 @@ import { processFlagActivity } from './process-flag' | |||
16 | import { processViewActivity } from './process-view' | 16 | import { processViewActivity } from './process-view' |
17 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' | 17 | import { APProcessorOptions } from '../../../types/activitypub-processor.model' |
18 | import { MActorDefault, MActorSignature } from '../../../types/models' | 18 | import { MActorDefault, MActorSignature } from '../../../types/models' |
19 | import { StatsManager } from '@server/lib/stat-manager' | ||
19 | 20 | ||
20 | const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = { | 21 | const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = { |
21 | Create: processCreateActivity, | 22 | Create: processCreateActivity, |
@@ -75,8 +76,12 @@ async function processActivities ( | |||
75 | 76 | ||
76 | try { | 77 | try { |
77 | await activityProcessor({ activity, byActor, inboxActor, fromFetch }) | 78 | await activityProcessor({ activity, byActor, inboxActor, fromFetch }) |
79 | |||
80 | StatsManager.Instance.addInboxProcessedSuccess(activity.type) | ||
78 | } catch (err) { | 81 | } catch (err) { |
79 | logger.warn('Cannot process activity %s.', activity.type, { err }) | 82 | logger.warn('Cannot process activity %s.', activity.type, { err }) |
83 | |||
84 | StatsManager.Instance.addInboxProcessedError(activity.type) | ||
80 | } | 85 | } |
81 | } | 86 | } |
82 | } | 87 | } |