aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-12-15 13:34:58 +0100
committerChocobozzz <me@florianbigard.com>2020-12-15 13:34:58 +0100
commit99afa081bc6ae7f34b2105075bd43e3625434fa8 (patch)
tree0caea8591b3d3688a133cf33edcad616bf276375 /server/lib/activitypub
parent48586fe070c2a59e9febb62a7f41ebb384e1d20e (diff)
downloadPeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.tar.gz
PeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.tar.zst
PeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.zip
Add AP stats
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r--server/lib/activitypub/inbox-manager.ts55
1 files changed, 55 insertions, 0 deletions
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
new file mode 100644
index 000000000..19e112f91
--- /dev/null
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -0,0 +1,55 @@
1import { AsyncQueue, queue } from 'async'
2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models'
6import { processActivities } from './process'
7import { StatsManager } from '../stat-manager'
8
9type QueueParam = {
10 activities: Activity[]
11 signatureActor?: MActorSignature
12 inboxActor?: MActorDefault
13}
14
15class InboxManager {
16
17 private static instance: InboxManager
18
19 private readonly inboxQueue: AsyncQueue<QueueParam>
20
21 private messagesProcessed = 0
22
23 private constructor () {
24 this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
25 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
26
27 this.messagesProcessed++
28
29 processActivities(task.activities, options)
30 .then(() => cb())
31 .catch(err => {
32 logger.error('Error in process activities.', { err })
33 cb()
34 })
35 })
36
37 setInterval(() => {
38 StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length())
39 }, SCHEDULER_INTERVALS_MS.updateInboxStats)
40 }
41
42 addInboxMessage (options: QueueParam) {
43 this.inboxQueue.push(options)
44 }
45
46 static get Instance () {
47 return this.instance || (this.instance = new this())
48 }
49}
50
51// ---------------------------------------------------------------------------
52
53export {
54 InboxManager
55}