aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r--server/lib/activitypub/inbox-manager.ts12
-rw-r--r--server/lib/activitypub/process/process.ts5
2 files changed, 9 insertions, 8 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
index 18ae49532..282e7ce66 100644
--- a/server/lib/activitypub/inbox-manager.ts
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -1,10 +1,10 @@
1import { AsyncQueue, queue } from 'async' 1import { queue, QueueObject } from 'async'
2import { logger } from '@server/helpers/logger' 2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants' 3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models' 4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models' 5import { Activity } from '@shared/models'
6import { processActivities } from './process'
7import { StatsManager } from '../stat-manager' 6import { StatsManager } from '../stat-manager'
7import { processActivities } from './process'
8 8
9type QueueParam = { 9type QueueParam = {
10 activities: Activity[] 10 activities: Activity[]
@@ -16,16 +16,12 @@ class InboxManager {
16 16
17 private static instance: InboxManager 17 private static instance: InboxManager
18 18
19 private readonly inboxQueue: AsyncQueue<QueueParam> 19 private readonly inboxQueue: QueueObject<QueueParam>
20
21 private messagesProcessed = 0
22 20
23 private constructor () { 21 private constructor () {
24 this.inboxQueue = queue<QueueParam, Error>((task, cb) => { 22 this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
25 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor } 23 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
26 24
27 this.messagesProcessed++
28
29 processActivities(task.activities, options) 25 processActivities(task.activities, options)
30 .then(() => cb()) 26 .then(() => cb())
31 .catch(err => { 27 .catch(err => {
@@ -35,7 +31,7 @@ class InboxManager {
35 }) 31 })
36 32
37 setInterval(() => { 33 setInterval(() => {
38 StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.getActivityPubMessagesWaiting()) 34 StatsManager.Instance.updateInboxWaiting(this.getActivityPubMessagesWaiting())
39 }, SCHEDULER_INTERVALS_MS.updateInboxStats) 35 }, SCHEDULER_INTERVALS_MS.updateInboxStats)
40 } 36 }
41 37
diff --git a/server/lib/activitypub/process/process.ts b/server/lib/activitypub/process/process.ts
index e60dd2a5b..5cef75665 100644
--- a/server/lib/activitypub/process/process.ts
+++ b/server/lib/activitypub/process/process.ts
@@ -16,6 +16,7 @@ import { processFlagActivity } from './process-flag'
16import { processViewActivity } from './process-view' 16import { processViewActivity } from './process-view'
17import { APProcessorOptions } from '../../../types/activitypub-processor.model' 17import { APProcessorOptions } from '../../../types/activitypub-processor.model'
18import { MActorDefault, MActorSignature } from '../../../types/models' 18import { MActorDefault, MActorSignature } from '../../../types/models'
19import { StatsManager } from '@server/lib/stat-manager'
19 20
20const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = { 21const processActivity: { [ P in ActivityType ]: (options: APProcessorOptions<Activity>) => Promise<any> } = {
21 Create: processCreateActivity, 22 Create: processCreateActivity,
@@ -75,8 +76,12 @@ async function processActivities (
75 76
76 try { 77 try {
77 await activityProcessor({ activity, byActor, inboxActor, fromFetch }) 78 await activityProcessor({ activity, byActor, inboxActor, fromFetch })
79
80 StatsManager.Instance.addInboxProcessedSuccess(activity.type)
78 } catch (err) { 81 } catch (err) {
79 logger.warn('Cannot process activity %s.', activity.type, { err }) 82 logger.warn('Cannot process activity %s.', activity.type, { err })
83
84 StatsManager.Instance.addInboxProcessedError(activity.type)
80 } 85 }
81 } 86 }
82} 87}