aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-07-05 15:43:21 +0200
committerChocobozzz <me@florianbigard.com>2022-07-06 15:13:55 +0200
commit630d0a1bf5897fff203cb07e426223f55dcc882d (patch)
tree5e6fa9d26f3f21178a538bd1ac38fa0a3f4f228c /server/lib
parent15b43b214eb37b05aa65aa8ef61fd0e6aa0b62d2 (diff)
downloadPeerTube-630d0a1bf5897fff203cb07e426223f55dcc882d.tar.gz
PeerTube-630d0a1bf5897fff203cb07e426223f55dcc882d.tar.zst
PeerTube-630d0a1bf5897fff203cb07e426223f55dcc882d.zip
Introduce experimental telemetry
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/activity.ts20
-rw-r--r--server/lib/job-queue/job-queue.ts6
-rw-r--r--server/lib/opentelemetry/metric-helpers/index.ts1
-rw-r--r--server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts186
-rw-r--r--server/lib/opentelemetry/metrics.ts111
-rw-r--r--server/lib/opentelemetry/tracing.ts81
6 files changed, 405 insertions, 0 deletions
diff --git a/server/lib/activitypub/activity.ts b/server/lib/activitypub/activity.ts
index e6cec1ba7..ba2967ce9 100644
--- a/server/lib/activitypub/activity.ts
+++ b/server/lib/activitypub/activity.ts
@@ -1,3 +1,5 @@
1import { ActivityType } from "@shared/models"
2
1function getAPId (object: string | { id: string }) { 3function getAPId (object: string | { id: string }) {
2 if (typeof object === 'string') return object 4 if (typeof object === 'string') return object
3 5
@@ -13,8 +15,26 @@ function getDurationFromActivityStream (duration: string) {
13 return parseInt(duration.replace(/[^\d]+/, '')) 15 return parseInt(duration.replace(/[^\d]+/, ''))
14} 16}
15 17
18function buildAvailableActivities (): ActivityType[] {
19 return [
20 'Create',
21 'Update',
22 'Delete',
23 'Follow',
24 'Accept',
25 'Announce',
26 'Undo',
27 'Like',
28 'Reject',
29 'View',
30 'Dislike',
31 'Flag'
32 ]
33}
34
16export { 35export {
17 getAPId, 36 getAPId,
18 getActivityStreamDuration, 37 getActivityStreamDuration,
38 buildAvailableActivities,
19 getDurationFromActivityStream 39 getDurationFromActivityStream
20} 40}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index ce24763f1..e55d2e7c2 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -285,6 +285,12 @@ class JobQueue {
285 return total 285 return total
286 } 286 }
287 287
288 async getStats () {
289 const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
290
291 return Promise.all(promises)
292 }
293
288 async removeOldJobs () { 294 async removeOldJobs () {
289 for (const key of Object.keys(this.queues)) { 295 for (const key of Object.keys(this.queues)) {
290 const queue = this.queues[key] 296 const queue = this.queues[key]
diff --git a/server/lib/opentelemetry/metric-helpers/index.ts b/server/lib/opentelemetry/metric-helpers/index.ts
new file mode 100644
index 000000000..cabb27326
--- /dev/null
+++ b/server/lib/opentelemetry/metric-helpers/index.ts
@@ -0,0 +1 @@
export * from './stats-observers-builder'
diff --git a/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts
new file mode 100644
index 000000000..90b58f33d
--- /dev/null
+++ b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts
@@ -0,0 +1,186 @@
1import memoizee from 'memoizee'
2import { Meter } from '@opentelemetry/api-metrics'
3import { MEMOIZE_TTL } from '@server/initializers/constants'
4import { buildAvailableActivities } from '@server/lib/activitypub/activity'
5import { StatsManager } from '@server/lib/stat-manager'
6
7export class StatsObserverBuilder {
8
9 private readonly getInstanceStats = memoizee(() => {
10 return StatsManager.Instance.getStats()
11 }, { maxAge: MEMOIZE_TTL.GET_STATS_FOR_OPEN_TELEMETRY_METRICS })
12
13 constructor (private readonly meter: Meter) {
14
15 }
16
17 buildObservers () {
18 this.buildUserStatsObserver()
19 this.buildVideoStatsObserver()
20 this.buildCommentStatsObserver()
21 this.buildPlaylistStatsObserver()
22 this.buildChannelStatsObserver()
23 this.buildInstanceFollowsStatsObserver()
24 this.buildRedundancyStatsObserver()
25 this.buildActivityPubStatsObserver()
26 }
27
28 private buildUserStatsObserver () {
29 this.meter.createObservableGauge('peertube_users_total', {
30 description: 'Total users on the instance'
31 }).addCallback(async observableResult => {
32 const stats = await this.getInstanceStats()
33
34 observableResult.observe(stats.totalUsers)
35 })
36
37 this.meter.createObservableGauge('peertube_active_users_total', {
38 description: 'Total active users on the instance'
39 }).addCallback(async observableResult => {
40 const stats = await this.getInstanceStats()
41
42 observableResult.observe(stats.totalDailyActiveUsers, { activeInterval: 'daily' })
43 observableResult.observe(stats.totalWeeklyActiveUsers, { activeInterval: 'weekly' })
44 observableResult.observe(stats.totalMonthlyActiveUsers, { activeInterval: 'monthly' })
45 })
46 }
47
48 private buildChannelStatsObserver () {
49 this.meter.createObservableGauge('peertube_channels_total', {
50 description: 'Total channels on the instance'
51 }).addCallback(async observableResult => {
52 const stats = await this.getInstanceStats()
53
54 observableResult.observe(stats.totalLocalVideoChannels, { channelOrigin: 'local' })
55 })
56
57 this.meter.createObservableGauge('peertube_active_channels_total', {
58 description: 'Total active channels on the instance'
59 }).addCallback(async observableResult => {
60 const stats = await this.getInstanceStats()
61
62 observableResult.observe(stats.totalLocalDailyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'daily' })
63 observableResult.observe(stats.totalLocalWeeklyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'weekly' })
64 observableResult.observe(stats.totalLocalMonthlyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'monthly' })
65 })
66 }
67
68 private buildVideoStatsObserver () {
69 this.meter.createObservableGauge('peertube_videos_total', {
70 description: 'Total videos on the instance'
71 }).addCallback(async observableResult => {
72 const stats = await this.getInstanceStats()
73
74 observableResult.observe(stats.totalLocalVideos, { videoOrigin: 'local' })
75 observableResult.observe(stats.totalVideos - stats.totalLocalVideos, { videoOrigin: 'remote' })
76 })
77
78 this.meter.createObservableGauge('peertube_video_views_total', {
79 description: 'Total video views made on the instance'
80 }).addCallback(async observableResult => {
81 const stats = await this.getInstanceStats()
82
83 observableResult.observe(stats.totalLocalVideoViews, { viewOrigin: 'local' })
84 })
85
86 this.meter.createObservableGauge('peertube_video_bytes_total', {
87 description: 'Total bytes of videos'
88 }).addCallback(async observableResult => {
89 const stats = await this.getInstanceStats()
90
91 observableResult.observe(stats.totalLocalVideoFilesSize, { videoOrigin: 'local' })
92 })
93 }
94
95 private buildCommentStatsObserver () {
96 this.meter.createObservableGauge('peertube_comments_total', {
97 description: 'Total comments on the instance'
98 }).addCallback(async observableResult => {
99 const stats = await this.getInstanceStats()
100
101 observableResult.observe(stats.totalLocalVideoComments, { accountOrigin: 'local' })
102 })
103 }
104
105 private buildPlaylistStatsObserver () {
106 this.meter.createObservableGauge('peertube_playlists_total', {
107 description: 'Total playlists on the instance'
108 }).addCallback(async observableResult => {
109 const stats = await this.getInstanceStats()
110
111 observableResult.observe(stats.totalLocalPlaylists, { playlistOrigin: 'local' })
112 })
113 }
114
115 private buildInstanceFollowsStatsObserver () {
116 this.meter.createObservableGauge('peertube_instance_followers_total', {
117 description: 'Total followers of the instance'
118 }).addCallback(async observableResult => {
119 const stats = await this.getInstanceStats()
120
121 observableResult.observe(stats.totalInstanceFollowers)
122 })
123
124 this.meter.createObservableGauge('peertube_instance_following_total', {
125 description: 'Total following of the instance'
126 }).addCallback(async observableResult => {
127 const stats = await this.getInstanceStats()
128
129 observableResult.observe(stats.totalInstanceFollowing)
130 })
131 }
132
133 private buildRedundancyStatsObserver () {
134 this.meter.createObservableGauge('peertube_redundancy_used_bytes_total', {
135 description: 'Total redundancy used of the instance'
136 }).addCallback(async observableResult => {
137 const stats = await this.getInstanceStats()
138
139 for (const r of stats.videosRedundancy) {
140 observableResult.observe(r.totalUsed, { strategy: r.strategy })
141 }
142 })
143
144 this.meter.createObservableGauge('peertube_redundancy_available_bytes_total', {
145 description: 'Total redundancy available of the instance'
146 }).addCallback(async observableResult => {
147 const stats = await this.getInstanceStats()
148
149 for (const r of stats.videosRedundancy) {
150 observableResult.observe(r.totalSize, { strategy: r.strategy })
151 }
152 })
153 }
154
155 private buildActivityPubStatsObserver () {
156 const availableActivities = buildAvailableActivities()
157
158 this.meter.createObservableGauge('peertube_ap_inbox_success_total', {
159 description: 'Total inbox messages processed with success'
160 }).addCallback(async observableResult => {
161 const stats = await this.getInstanceStats()
162
163 for (const type of availableActivities) {
164 observableResult.observe(stats[`totalActivityPub${type}MessagesSuccesses`], { activityType: type })
165 }
166 })
167
168 this.meter.createObservableGauge('peertube_ap_inbox_error_total', {
169 description: 'Total inbox messages processed with error'
170 }).addCallback(async observableResult => {
171 const stats = await this.getInstanceStats()
172
173 for (const type of availableActivities) {
174 observableResult.observe(stats[`totalActivityPub${type}MessagesErrors`], { activityType: type })
175 }
176 })
177
178 this.meter.createObservableGauge('peertube_ap_inbox_waiting_total', {
179 description: 'Total inbox messages waiting for being processed'
180 }).addCallback(async observableResult => {
181 const stats = await this.getInstanceStats()
182
183 observableResult.observe(stats.totalActivityPubMessagesWaiting)
184 })
185 }
186}
diff --git a/server/lib/opentelemetry/metrics.ts b/server/lib/opentelemetry/metrics.ts
new file mode 100644
index 000000000..ca0aae8e7
--- /dev/null
+++ b/server/lib/opentelemetry/metrics.ts
@@ -0,0 +1,111 @@
1import { Application, Request, Response } from 'express'
2import { Meter, metrics } from '@opentelemetry/api-metrics'
3import { PrometheusExporter } from '@opentelemetry/exporter-prometheus'
4import { MeterProvider } from '@opentelemetry/sdk-metrics-base'
5import { logger } from '@server/helpers/logger'
6import { CONFIG } from '@server/initializers/config'
7import { JobQueue } from '../job-queue'
8import { StatsObserverBuilder } from './metric-helpers'
9
10class OpenTelemetryMetrics {
11
12 private static instance: OpenTelemetryMetrics
13
14 private meter: Meter
15
16 private onRequestDuration: (req: Request, res: Response) => void
17
18 private constructor () {}
19
20 init (app: Application) {
21 if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return
22
23 app.use((req, res, next) => {
24 res.once('finish', () => {
25 if (!this.onRequestDuration) return
26
27 this.onRequestDuration(req as Request, res as Response)
28 })
29
30 next()
31 })
32 }
33
34 registerMetrics () {
35 if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return
36
37 logger.info('Registering Open Telemetry metrics')
38
39 const provider = new MeterProvider()
40
41 provider.addMetricReader(new PrometheusExporter({ port: CONFIG.OPEN_TELEMETRY.METRICS.PROMETHEUS_EXPORTER.PORT }))
42
43 metrics.setGlobalMeterProvider(provider)
44
45 this.meter = metrics.getMeter('default')
46
47 this.buildMemoryObserver()
48 this.buildRequestObserver()
49 this.buildJobQueueObserver()
50
51 const statsObserverBuilder = new StatsObserverBuilder(this.meter)
52 statsObserverBuilder.buildObservers()
53 }
54
55 private buildMemoryObserver () {
56 this.meter.createObservableGauge('nodejs_memory_usage_bytes', {
57 description: 'Memory'
58 }).addCallback(observableResult => {
59 const current = process.memoryUsage()
60
61 observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' })
62 observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' })
63 observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' })
64 observableResult.observe(current.external, { memoryType: 'external' })
65 observableResult.observe(current.rss, { memoryType: 'rss' })
66 })
67 }
68
69 private buildJobQueueObserver () {
70 this.meter.createObservableGauge('peertube_job_queue_total', {
71 description: 'Total jobs in the PeerTube job queue'
72 }).addCallback(async observableResult => {
73 const stats = await JobQueue.Instance.getStats()
74
75 for (const { jobType, counts } of stats) {
76 for (const state of Object.keys(counts)) {
77 observableResult.observe(counts[state], { jobType, state })
78 }
79 }
80 })
81 }
82
83 private buildRequestObserver () {
84 const requestDuration = this.meter.createHistogram('http_request_duration_ms', {
85 unit: 'milliseconds',
86 description: 'Duration of HTTP requests in ms'
87 })
88
89 this.onRequestDuration = (req: Request, res: Response) => {
90 const duration = Date.now() - res.locals.requestStart
91
92 requestDuration.record(duration, {
93 path: this.buildRequestPath(req.originalUrl),
94 method: req.method,
95 statusCode: res.statusCode + ''
96 })
97 }
98 }
99
100 private buildRequestPath (path: string) {
101 return path.split('?')[0]
102 }
103
104 static get Instance () {
105 return this.instance || (this.instance = new this())
106 }
107}
108
109export {
110 OpenTelemetryMetrics
111}
diff --git a/server/lib/opentelemetry/tracing.ts b/server/lib/opentelemetry/tracing.ts
new file mode 100644
index 000000000..5358d04de
--- /dev/null
+++ b/server/lib/opentelemetry/tracing.ts
@@ -0,0 +1,81 @@
1import { diag, DiagLogLevel, trace } from '@opentelemetry/api'
2import { JaegerExporter } from '@opentelemetry/exporter-jaeger'
3import { registerInstrumentations } from '@opentelemetry/instrumentation'
4import { DnsInstrumentation } from '@opentelemetry/instrumentation-dns'
5import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express'
6import FsInstrumentation from '@opentelemetry/instrumentation-fs'
7import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
8import { PgInstrumentation } from '@opentelemetry/instrumentation-pg'
9import { RedisInstrumentation } from '@opentelemetry/instrumentation-redis-4'
10import { Resource } from '@opentelemetry/resources'
11import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base'
12import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'
13import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions'
14import { logger } from '@server/helpers/logger'
15import { CONFIG } from '@server/initializers/config'
16
17function registerOpentelemetryTracing () {
18 if (CONFIG.OPEN_TELEMETRY.TRACING.ENABLED !== true) return
19
20 logger.info('Registering Open Telemetry tracing')
21
22 const customLogger = (level: string) => {
23 return (message: string, ...args: unknown[]) => {
24 let fullMessage = message
25
26 for (const arg of args) {
27 if (typeof arg === 'string') fullMessage += arg
28 else break
29 }
30
31 logger[level](fullMessage)
32 }
33 }
34
35 diag.setLogger({
36 error: customLogger('error'),
37 warn: customLogger('warn'),
38 info: customLogger('info'),
39 debug: customLogger('debug'),
40 verbose: customLogger('verbose')
41 }, DiagLogLevel.INFO)
42
43 const tracerProvider = new NodeTracerProvider({
44 resource: new Resource({
45 [SemanticResourceAttributes.SERVICE_NAME]: 'peertube'
46 })
47 })
48
49 registerInstrumentations({
50 tracerProvider: tracerProvider,
51 instrumentations: [
52 new PgInstrumentation({
53 enhancedDatabaseReporting: true
54 }),
55 new DnsInstrumentation(),
56 new HttpInstrumentation(),
57 new ExpressInstrumentation(),
58 new RedisInstrumentation({
59 dbStatementSerializer: function (cmdName, cmdArgs) {
60 return [ cmdName, ...cmdArgs ].join(' ')
61 }
62 }),
63 new FsInstrumentation()
64 ]
65 })
66
67 tracerProvider.addSpanProcessor(
68 new BatchSpanProcessor(
69 new JaegerExporter({ endpoint: CONFIG.OPEN_TELEMETRY.TRACING.JAEGER_EXPORTER.ENDPOINT })
70 )
71 )
72
73 tracerProvider.register()
74}
75
76const tracer = trace.getTracer('peertube')
77
78export {
79 registerOpentelemetryTracing,
80 tracer
81}