aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-12-15 13:34:58 +0100
committerChocobozzz <me@florianbigard.com>2020-12-15 13:34:58 +0100
commit99afa081bc6ae7f34b2105075bd43e3625434fa8 (patch)
tree0caea8591b3d3688a133cf33edcad616bf276375 /server
parent48586fe070c2a59e9febb62a7f41ebb384e1d20e (diff)
downloadPeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.tar.gz
PeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.tar.zst
PeerTube-99afa081bc6ae7f34b2105075bd43e3625434fa8.zip
Add AP stats
Diffstat (limited to 'server')
-rw-r--r--server/controllers/activitypub/inbox.ts24
-rw-r--r--server/controllers/api/server/stats.ts56
-rw-r--r--server/initializers/constants.ts4
-rw-r--r--server/lib/activitypub/inbox-manager.ts55
-rw-r--r--server/lib/stat-manager.ts94
-rw-r--r--server/tests/api/server/stats.ts26
6 files changed, 189 insertions, 70 deletions
diff --git a/server/controllers/activitypub/inbox.ts b/server/controllers/activitypub/inbox.ts
index 67b2c0d66..14f301ab7 100644
--- a/server/controllers/activitypub/inbox.ts
+++ b/server/controllers/activitypub/inbox.ts
@@ -1,13 +1,11 @@
1import * as express from 'express' 1import * as express from 'express'
2import { InboxManager } from '@server/lib/activitypub/inbox-manager'
2import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared' 3import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared'
4import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
3import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity' 5import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity'
4import { logger } from '../../helpers/logger' 6import { logger } from '../../helpers/logger'
5import { processActivities } from '../../lib/activitypub/process/process'
6import { asyncMiddleware, checkSignature, localAccountValidator, localVideoChannelValidator, signatureValidator } from '../../middlewares' 7import { asyncMiddleware, checkSignature, localAccountValidator, localVideoChannelValidator, signatureValidator } from '../../middlewares'
7import { activityPubValidator } from '../../middlewares/validators/activitypub/activity' 8import { activityPubValidator } from '../../middlewares/validators/activitypub/activity'
8import { queue } from 'async'
9import { MActorDefault, MActorSignature } from '../../types/models'
10import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
11 9
12const inboxRouter = express.Router() 10const inboxRouter = express.Router()
13 11
@@ -41,18 +39,6 @@ export {
41 39
42// --------------------------------------------------------------------------- 40// ---------------------------------------------------------------------------
43 41
44type QueueParam = { activities: Activity[], signatureActor?: MActorSignature, inboxActor?: MActorDefault }
45const inboxQueue = queue<QueueParam, Error>((task, cb) => {
46 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
47
48 processActivities(task.activities, options)
49 .then(() => cb())
50 .catch(err => {
51 logger.error('Error in process activities.', { err })
52 cb()
53 })
54})
55
56function inboxController (req: express.Request, res: express.Response) { 42function inboxController (req: express.Request, res: express.Response) {
57 const rootActivity: RootActivity = req.body 43 const rootActivity: RootActivity = req.body
58 let activities: Activity[] 44 let activities: Activity[]
@@ -74,10 +60,12 @@ function inboxController (req: express.Request, res: express.Response) {
74 60
75 logger.info('Receiving inbox requests for %d activities by %s.', activities.length, res.locals.signature.actor.url) 61 logger.info('Receiving inbox requests for %d activities by %s.', activities.length, res.locals.signature.actor.url)
76 62
77 inboxQueue.push({ 63 InboxManager.Instance.addInboxMessage({
78 activities, 64 activities,
79 signatureActor: res.locals.signature.actor, 65 signatureActor: res.locals.signature.actor,
80 inboxActor: accountOrChannel ? accountOrChannel.Actor : undefined 66 inboxActor: accountOrChannel
67 ? accountOrChannel.Actor
68 : undefined
81 }) 69 })
82 70
83 return res.status(HttpStatusCode.NO_CONTENT_204).end() 71 return res.status(HttpStatusCode.NO_CONTENT_204).end()
diff --git a/server/controllers/api/server/stats.ts b/server/controllers/api/server/stats.ts
index f07301a04..3aea12450 100644
--- a/server/controllers/api/server/stats.ts
+++ b/server/controllers/api/server/stats.ts
@@ -1,16 +1,8 @@
1import * as express from 'express' 1import * as express from 'express'
2import { ServerStats } from '../../../../shared/models/server/server-stats.model' 2import { StatsManager } from '@server/lib/stat-manager'
3import { asyncMiddleware } from '../../../middlewares'
4import { UserModel } from '../../../models/account/user'
5import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
6import { VideoModel } from '../../../models/video/video'
7import { VideoCommentModel } from '../../../models/video/video-comment'
8import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
9import { ROUTE_CACHE_LIFETIME } from '../../../initializers/constants' 3import { ROUTE_CACHE_LIFETIME } from '../../../initializers/constants'
4import { asyncMiddleware } from '../../../middlewares'
10import { cacheRoute } from '../../../middlewares/cache' 5import { cacheRoute } from '../../../middlewares/cache'
11import { VideoFileModel } from '../../../models/video/video-file'
12import { CONFIG } from '../../../initializers/config'
13import { VideoRedundancyStrategyWithManual } from '@shared/models'
14 6
15const statsRouter = express.Router() 7const statsRouter = express.Router()
16 8
@@ -19,48 +11,10 @@ statsRouter.get('/stats',
19 asyncMiddleware(getStats) 11 asyncMiddleware(getStats)
20) 12)
21 13
22async function getStats (req: express.Request, res: express.Response) { 14async function getStats (_req: express.Request, res: express.Response) {
23 const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats() 15 const data = await StatsManager.Instance.getStats()
24 const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats()
25 const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats()
26 const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats()
27 const { totalLocalVideoFilesSize } = await VideoFileModel.getStats()
28
29 const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES
30 .map(r => ({
31 strategy: r.strategy as VideoRedundancyStrategyWithManual,
32 size: r.size
33 }))
34
35 strategies.push({ strategy: 'manual', size: null })
36
37 const videosRedundancyStats = await Promise.all(
38 strategies.map(r => {
39 return VideoRedundancyModel.getStats(r.strategy)
40 .then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size }))
41 })
42 )
43
44 const data: ServerStats = {
45 totalLocalVideos,
46 totalLocalVideoViews,
47 totalLocalVideoFilesSize,
48 totalLocalVideoComments,
49 totalVideos,
50 totalVideoComments,
51
52 totalUsers,
53 totalDailyActiveUsers,
54 totalWeeklyActiveUsers,
55 totalMonthlyActiveUsers,
56
57 totalInstanceFollowers,
58 totalInstanceFollowing,
59
60 videosRedundancy: videosRedundancyStats
61 }
62 16
63 return res.json(data).end() 17 return res.json(data)
64} 18}
65 19
66// --------------------------------------------------------------------------- 20// ---------------------------------------------------------------------------
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 9e642af95..01860b575 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -194,7 +194,8 @@ const SCHEDULER_INTERVALS_MS = {
194 checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL, 194 checkPlugins: CONFIG.PLUGINS.INDEX.CHECK_LATEST_VERSIONS_INTERVAL,
195 autoFollowIndexInstances: 60000 * 60 * 24, // 1 day 195 autoFollowIndexInstances: 60000 * 60 * 24, // 1 day
196 removeOldViews: 60000 * 60 * 24, // 1 day 196 removeOldViews: 60000 * 60 * 24, // 1 day
197 removeOldHistory: 60000 * 60 * 24 // 1 day 197 removeOldHistory: 60000 * 60 * 24, // 1 day
198 updateInboxStats: 1000 * 60 * 5 // 5 minutes
198} 199}
199 200
200// --------------------------------------------------------------------------- 201// ---------------------------------------------------------------------------
@@ -747,6 +748,7 @@ if (isTestInstance() === true) {
747 SCHEDULER_INTERVALS_MS.removeOldViews = 5000 748 SCHEDULER_INTERVALS_MS.removeOldViews = 5000
748 SCHEDULER_INTERVALS_MS.updateVideos = 5000 749 SCHEDULER_INTERVALS_MS.updateVideos = 5000
749 SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000 750 SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
751 SCHEDULER_INTERVALS_MS.updateInboxStats = 5000
750 REPEAT_JOBS['videos-views'] = { every: 5000 } 752 REPEAT_JOBS['videos-views'] = { every: 5000 }
751 753
752 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1 754 REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
diff --git a/server/lib/activitypub/inbox-manager.ts b/server/lib/activitypub/inbox-manager.ts
new file mode 100644
index 000000000..19e112f91
--- /dev/null
+++ b/server/lib/activitypub/inbox-manager.ts
@@ -0,0 +1,55 @@
1import { AsyncQueue, queue } from 'async'
2import { logger } from '@server/helpers/logger'
3import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
4import { MActorDefault, MActorSignature } from '@server/types/models'
5import { Activity } from '@shared/models'
6import { processActivities } from './process'
7import { StatsManager } from '../stat-manager'
8
9type QueueParam = {
10 activities: Activity[]
11 signatureActor?: MActorSignature
12 inboxActor?: MActorDefault
13}
14
15class InboxManager {
16
17 private static instance: InboxManager
18
19 private readonly inboxQueue: AsyncQueue<QueueParam>
20
21 private messagesProcessed = 0
22
23 private constructor () {
24 this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
25 const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
26
27 this.messagesProcessed++
28
29 processActivities(task.activities, options)
30 .then(() => cb())
31 .catch(err => {
32 logger.error('Error in process activities.', { err })
33 cb()
34 })
35 })
36
37 setInterval(() => {
38 StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length())
39 }, SCHEDULER_INTERVALS_MS.updateInboxStats)
40 }
41
42 addInboxMessage (options: QueueParam) {
43 this.inboxQueue.push(options)
44 }
45
46 static get Instance () {
47 return this.instance || (this.instance = new this())
48 }
49}
50
51// ---------------------------------------------------------------------------
52
53export {
54 InboxManager
55}
diff --git a/server/lib/stat-manager.ts b/server/lib/stat-manager.ts
new file mode 100644
index 000000000..f9d69b0dc
--- /dev/null
+++ b/server/lib/stat-manager.ts
@@ -0,0 +1,94 @@
1import { CONFIG } from '@server/initializers/config'
2import { UserModel } from '@server/models/account/user'
3import { ActorFollowModel } from '@server/models/activitypub/actor-follow'
4import { VideoRedundancyModel } from '@server/models/redundancy/video-redundancy'
5import { VideoModel } from '@server/models/video/video'
6import { VideoCommentModel } from '@server/models/video/video-comment'
7import { VideoFileModel } from '@server/models/video/video-file'
8import { ServerStats, VideoRedundancyStrategyWithManual } from '@shared/models'
9
10class StatsManager {
11
12 private static instance: StatsManager
13
14 private readonly instanceStartDate = new Date()
15
16 private inboxMessagesProcessed = 0
17 private inboxMessagesWaiting = 0
18
19 private constructor () {}
20
21 updateInboxStats (inboxMessagesProcessed: number, inboxMessagesWaiting: number) {
22 this.inboxMessagesProcessed = inboxMessagesProcessed
23 this.inboxMessagesWaiting = inboxMessagesWaiting
24 }
25
26 async getStats () {
27 const { totalLocalVideos, totalLocalVideoViews, totalVideos } = await VideoModel.getStats()
28 const { totalLocalVideoComments, totalVideoComments } = await VideoCommentModel.getStats()
29 const { totalUsers, totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await UserModel.getStats()
30 const { totalInstanceFollowers, totalInstanceFollowing } = await ActorFollowModel.getStats()
31 const { totalLocalVideoFilesSize } = await VideoFileModel.getStats()
32
33 const videosRedundancyStats = await this.buildRedundancyStats()
34
35 const data: ServerStats = {
36 totalLocalVideos,
37 totalLocalVideoViews,
38 totalLocalVideoFilesSize,
39 totalLocalVideoComments,
40 totalVideos,
41 totalVideoComments,
42
43 totalUsers,
44 totalDailyActiveUsers,
45 totalWeeklyActiveUsers,
46 totalMonthlyActiveUsers,
47
48 totalInstanceFollowers,
49 totalInstanceFollowing,
50
51 videosRedundancy: videosRedundancyStats,
52
53 totalActivityPubMessagesProcessed: this.inboxMessagesProcessed,
54 activityPubMessagesProcessedPerSecond: this.buildActivityPubMessagesProcessedPerSecond(),
55 totalActivityPubMessagesWaiting: this.inboxMessagesWaiting
56 }
57
58 return data
59 }
60
61 private buildActivityPubMessagesProcessedPerSecond () {
62 const now = new Date()
63 const startedSeconds = (now.getTime() - this.instanceStartDate.getTime()) / 1000
64
65 return this.inboxMessagesProcessed / startedSeconds
66 }
67
68 private buildRedundancyStats () {
69 const strategies = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES
70 .map(r => ({
71 strategy: r.strategy as VideoRedundancyStrategyWithManual,
72 size: r.size
73 }))
74
75 strategies.push({ strategy: 'manual', size: null })
76
77 return Promise.all(
78 strategies.map(r => {
79 return VideoRedundancyModel.getStats(r.strategy)
80 .then(stats => Object.assign(stats, { strategy: r.strategy, totalSize: r.size }))
81 })
82 )
83 }
84
85 static get Instance () {
86 return this.instance || (this.instance = new this())
87 }
88}
89
90// ---------------------------------------------------------------------------
91
92export {
93 StatsManager
94}
diff --git a/server/tests/api/server/stats.ts b/server/tests/api/server/stats.ts
index 0e77712cf..9f785a80e 100644
--- a/server/tests/api/server/stats.ts
+++ b/server/tests/api/server/stats.ts
@@ -176,6 +176,32 @@ describe('Test stats (excluding redundancy)', function () {
176 } 176 }
177 }) 177 })
178 178
179 it('Should have the correct AP stats', async function () {
180 this.timeout(60000)
181
182 for (let i = 0; i < 10; i++) {
183 await uploadVideo(servers[0].url, servers[0].accessToken, { name: 'video' })
184 }
185
186 const res1 = await getStats(servers[1].url)
187 const first = res1.body as ServerStats
188
189 await waitJobs(servers)
190
191 const res2 = await getStats(servers[1].url)
192 const second: ServerStats = res2.body
193
194 expect(second.totalActivityPubMessagesWaiting).to.equal(0)
195 expect(second.totalActivityPubMessagesProcessed).to.be.greaterThan(first.totalActivityPubMessagesProcessed)
196
197 await wait(5000)
198
199 const res3 = await getStats(servers[1].url)
200 const third: ServerStats = res3.body
201
202 expect(third.activityPubMessagesProcessedPerSecond).to.be.lessThan(second.activityPubMessagesProcessedPerSecond)
203 })
204
179 after(async function () { 205 after(async function () {
180 await cleanupTests(servers) 206 await cleanupTests(servers)
181 }) 207 })