diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/activity.ts | 20 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 6 | ||||
-rw-r--r-- | server/lib/opentelemetry/metric-helpers/index.ts | 1 | ||||
-rw-r--r-- | server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts | 186 | ||||
-rw-r--r-- | server/lib/opentelemetry/metrics.ts | 111 | ||||
-rw-r--r-- | server/lib/opentelemetry/tracing.ts | 81 |
6 files changed, 405 insertions, 0 deletions
diff --git a/server/lib/activitypub/activity.ts b/server/lib/activitypub/activity.ts index e6cec1ba7..ba2967ce9 100644 --- a/server/lib/activitypub/activity.ts +++ b/server/lib/activitypub/activity.ts | |||
@@ -1,3 +1,5 @@ | |||
1 | import { ActivityType } from "@shared/models" | ||
2 | |||
1 | function getAPId (object: string | { id: string }) { | 3 | function getAPId (object: string | { id: string }) { |
2 | if (typeof object === 'string') return object | 4 | if (typeof object === 'string') return object |
3 | 5 | ||
@@ -13,8 +15,26 @@ function getDurationFromActivityStream (duration: string) { | |||
13 | return parseInt(duration.replace(/[^\d]+/, '')) | 15 | return parseInt(duration.replace(/[^\d]+/, '')) |
14 | } | 16 | } |
15 | 17 | ||
18 | function buildAvailableActivities (): ActivityType[] { | ||
19 | return [ | ||
20 | 'Create', | ||
21 | 'Update', | ||
22 | 'Delete', | ||
23 | 'Follow', | ||
24 | 'Accept', | ||
25 | 'Announce', | ||
26 | 'Undo', | ||
27 | 'Like', | ||
28 | 'Reject', | ||
29 | 'View', | ||
30 | 'Dislike', | ||
31 | 'Flag' | ||
32 | ] | ||
33 | } | ||
34 | |||
16 | export { | 35 | export { |
17 | getAPId, | 36 | getAPId, |
18 | getActivityStreamDuration, | 37 | getActivityStreamDuration, |
38 | buildAvailableActivities, | ||
19 | getDurationFromActivityStream | 39 | getDurationFromActivityStream |
20 | } | 40 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ce24763f1..e55d2e7c2 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -285,6 +285,12 @@ class JobQueue { | |||
285 | return total | 285 | return total |
286 | } | 286 | } |
287 | 287 | ||
288 | async getStats () { | ||
289 | const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) | ||
290 | |||
291 | return Promise.all(promises) | ||
292 | } | ||
293 | |||
288 | async removeOldJobs () { | 294 | async removeOldJobs () { |
289 | for (const key of Object.keys(this.queues)) { | 295 | for (const key of Object.keys(this.queues)) { |
290 | const queue = this.queues[key] | 296 | const queue = this.queues[key] |
diff --git a/server/lib/opentelemetry/metric-helpers/index.ts b/server/lib/opentelemetry/metric-helpers/index.ts new file mode 100644 index 000000000..cabb27326 --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './stats-observers-builder' | |||
diff --git a/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts new file mode 100644 index 000000000..90b58f33d --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts | |||
@@ -0,0 +1,186 @@ | |||
1 | import memoizee from 'memoizee' | ||
2 | import { Meter } from '@opentelemetry/api-metrics' | ||
3 | import { MEMOIZE_TTL } from '@server/initializers/constants' | ||
4 | import { buildAvailableActivities } from '@server/lib/activitypub/activity' | ||
5 | import { StatsManager } from '@server/lib/stat-manager' | ||
6 | |||
7 | export class StatsObserverBuilder { | ||
8 | |||
9 | private readonly getInstanceStats = memoizee(() => { | ||
10 | return StatsManager.Instance.getStats() | ||
11 | }, { maxAge: MEMOIZE_TTL.GET_STATS_FOR_OPEN_TELEMETRY_METRICS }) | ||
12 | |||
13 | constructor (private readonly meter: Meter) { | ||
14 | |||
15 | } | ||
16 | |||
17 | buildObservers () { | ||
18 | this.buildUserStatsObserver() | ||
19 | this.buildVideoStatsObserver() | ||
20 | this.buildCommentStatsObserver() | ||
21 | this.buildPlaylistStatsObserver() | ||
22 | this.buildChannelStatsObserver() | ||
23 | this.buildInstanceFollowsStatsObserver() | ||
24 | this.buildRedundancyStatsObserver() | ||
25 | this.buildActivityPubStatsObserver() | ||
26 | } | ||
27 | |||
28 | private buildUserStatsObserver () { | ||
29 | this.meter.createObservableGauge('peertube_users_total', { | ||
30 | description: 'Total users on the instance' | ||
31 | }).addCallback(async observableResult => { | ||
32 | const stats = await this.getInstanceStats() | ||
33 | |||
34 | observableResult.observe(stats.totalUsers) | ||
35 | }) | ||
36 | |||
37 | this.meter.createObservableGauge('peertube_active_users_total', { | ||
38 | description: 'Total active users on the instance' | ||
39 | }).addCallback(async observableResult => { | ||
40 | const stats = await this.getInstanceStats() | ||
41 | |||
42 | observableResult.observe(stats.totalDailyActiveUsers, { activeInterval: 'daily' }) | ||
43 | observableResult.observe(stats.totalWeeklyActiveUsers, { activeInterval: 'weekly' }) | ||
44 | observableResult.observe(stats.totalMonthlyActiveUsers, { activeInterval: 'monthly' }) | ||
45 | }) | ||
46 | } | ||
47 | |||
48 | private buildChannelStatsObserver () { | ||
49 | this.meter.createObservableGauge('peertube_channels_total', { | ||
50 | description: 'Total channels on the instance' | ||
51 | }).addCallback(async observableResult => { | ||
52 | const stats = await this.getInstanceStats() | ||
53 | |||
54 | observableResult.observe(stats.totalLocalVideoChannels, { channelOrigin: 'local' }) | ||
55 | }) | ||
56 | |||
57 | this.meter.createObservableGauge('peertube_active_channels_total', { | ||
58 | description: 'Total active channels on the instance' | ||
59 | }).addCallback(async observableResult => { | ||
60 | const stats = await this.getInstanceStats() | ||
61 | |||
62 | observableResult.observe(stats.totalLocalDailyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'daily' }) | ||
63 | observableResult.observe(stats.totalLocalWeeklyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'weekly' }) | ||
64 | observableResult.observe(stats.totalLocalMonthlyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'monthly' }) | ||
65 | }) | ||
66 | } | ||
67 | |||
68 | private buildVideoStatsObserver () { | ||
69 | this.meter.createObservableGauge('peertube_videos_total', { | ||
70 | description: 'Total videos on the instance' | ||
71 | }).addCallback(async observableResult => { | ||
72 | const stats = await this.getInstanceStats() | ||
73 | |||
74 | observableResult.observe(stats.totalLocalVideos, { videoOrigin: 'local' }) | ||
75 | observableResult.observe(stats.totalVideos - stats.totalLocalVideos, { videoOrigin: 'remote' }) | ||
76 | }) | ||
77 | |||
78 | this.meter.createObservableGauge('peertube_video_views_total', { | ||
79 | description: 'Total video views made on the instance' | ||
80 | }).addCallback(async observableResult => { | ||
81 | const stats = await this.getInstanceStats() | ||
82 | |||
83 | observableResult.observe(stats.totalLocalVideoViews, { viewOrigin: 'local' }) | ||
84 | }) | ||
85 | |||
86 | this.meter.createObservableGauge('peertube_video_bytes_total', { | ||
87 | description: 'Total bytes of videos' | ||
88 | }).addCallback(async observableResult => { | ||
89 | const stats = await this.getInstanceStats() | ||
90 | |||
91 | observableResult.observe(stats.totalLocalVideoFilesSize, { videoOrigin: 'local' }) | ||
92 | }) | ||
93 | } | ||
94 | |||
95 | private buildCommentStatsObserver () { | ||
96 | this.meter.createObservableGauge('peertube_comments_total', { | ||
97 | description: 'Total comments on the instance' | ||
98 | }).addCallback(async observableResult => { | ||
99 | const stats = await this.getInstanceStats() | ||
100 | |||
101 | observableResult.observe(stats.totalLocalVideoComments, { accountOrigin: 'local' }) | ||
102 | }) | ||
103 | } | ||
104 | |||
105 | private buildPlaylistStatsObserver () { | ||
106 | this.meter.createObservableGauge('peertube_playlists_total', { | ||
107 | description: 'Total playlists on the instance' | ||
108 | }).addCallback(async observableResult => { | ||
109 | const stats = await this.getInstanceStats() | ||
110 | |||
111 | observableResult.observe(stats.totalLocalPlaylists, { playlistOrigin: 'local' }) | ||
112 | }) | ||
113 | } | ||
114 | |||
115 | private buildInstanceFollowsStatsObserver () { | ||
116 | this.meter.createObservableGauge('peertube_instance_followers_total', { | ||
117 | description: 'Total followers of the instance' | ||
118 | }).addCallback(async observableResult => { | ||
119 | const stats = await this.getInstanceStats() | ||
120 | |||
121 | observableResult.observe(stats.totalInstanceFollowers) | ||
122 | }) | ||
123 | |||
124 | this.meter.createObservableGauge('peertube_instance_following_total', { | ||
125 | description: 'Total following of the instance' | ||
126 | }).addCallback(async observableResult => { | ||
127 | const stats = await this.getInstanceStats() | ||
128 | |||
129 | observableResult.observe(stats.totalInstanceFollowing) | ||
130 | }) | ||
131 | } | ||
132 | |||
133 | private buildRedundancyStatsObserver () { | ||
134 | this.meter.createObservableGauge('peertube_redundancy_used_bytes_total', { | ||
135 | description: 'Total redundancy used of the instance' | ||
136 | }).addCallback(async observableResult => { | ||
137 | const stats = await this.getInstanceStats() | ||
138 | |||
139 | for (const r of stats.videosRedundancy) { | ||
140 | observableResult.observe(r.totalUsed, { strategy: r.strategy }) | ||
141 | } | ||
142 | }) | ||
143 | |||
144 | this.meter.createObservableGauge('peertube_redundancy_available_bytes_total', { | ||
145 | description: 'Total redundancy available of the instance' | ||
146 | }).addCallback(async observableResult => { | ||
147 | const stats = await this.getInstanceStats() | ||
148 | |||
149 | for (const r of stats.videosRedundancy) { | ||
150 | observableResult.observe(r.totalSize, { strategy: r.strategy }) | ||
151 | } | ||
152 | }) | ||
153 | } | ||
154 | |||
155 | private buildActivityPubStatsObserver () { | ||
156 | const availableActivities = buildAvailableActivities() | ||
157 | |||
158 | this.meter.createObservableGauge('peertube_ap_inbox_success_total', { | ||
159 | description: 'Total inbox messages processed with success' | ||
160 | }).addCallback(async observableResult => { | ||
161 | const stats = await this.getInstanceStats() | ||
162 | |||
163 | for (const type of availableActivities) { | ||
164 | observableResult.observe(stats[`totalActivityPub${type}MessagesSuccesses`], { activityType: type }) | ||
165 | } | ||
166 | }) | ||
167 | |||
168 | this.meter.createObservableGauge('peertube_ap_inbox_error_total', { | ||
169 | description: 'Total inbox messages processed with error' | ||
170 | }).addCallback(async observableResult => { | ||
171 | const stats = await this.getInstanceStats() | ||
172 | |||
173 | for (const type of availableActivities) { | ||
174 | observableResult.observe(stats[`totalActivityPub${type}MessagesErrors`], { activityType: type }) | ||
175 | } | ||
176 | }) | ||
177 | |||
178 | this.meter.createObservableGauge('peertube_ap_inbox_waiting_total', { | ||
179 | description: 'Total inbox messages waiting for being processed' | ||
180 | }).addCallback(async observableResult => { | ||
181 | const stats = await this.getInstanceStats() | ||
182 | |||
183 | observableResult.observe(stats.totalActivityPubMessagesWaiting) | ||
184 | }) | ||
185 | } | ||
186 | } | ||
diff --git a/server/lib/opentelemetry/metrics.ts b/server/lib/opentelemetry/metrics.ts new file mode 100644 index 000000000..ca0aae8e7 --- /dev/null +++ b/server/lib/opentelemetry/metrics.ts | |||
@@ -0,0 +1,111 @@ | |||
1 | import { Application, Request, Response } from 'express' | ||
2 | import { Meter, metrics } from '@opentelemetry/api-metrics' | ||
3 | import { PrometheusExporter } from '@opentelemetry/exporter-prometheus' | ||
4 | import { MeterProvider } from '@opentelemetry/sdk-metrics-base' | ||
5 | import { logger } from '@server/helpers/logger' | ||
6 | import { CONFIG } from '@server/initializers/config' | ||
7 | import { JobQueue } from '../job-queue' | ||
8 | import { StatsObserverBuilder } from './metric-helpers' | ||
9 | |||
10 | class OpenTelemetryMetrics { | ||
11 | |||
12 | private static instance: OpenTelemetryMetrics | ||
13 | |||
14 | private meter: Meter | ||
15 | |||
16 | private onRequestDuration: (req: Request, res: Response) => void | ||
17 | |||
18 | private constructor () {} | ||
19 | |||
20 | init (app: Application) { | ||
21 | if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return | ||
22 | |||
23 | app.use((req, res, next) => { | ||
24 | res.once('finish', () => { | ||
25 | if (!this.onRequestDuration) return | ||
26 | |||
27 | this.onRequestDuration(req as Request, res as Response) | ||
28 | }) | ||
29 | |||
30 | next() | ||
31 | }) | ||
32 | } | ||
33 | |||
34 | registerMetrics () { | ||
35 | if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return | ||
36 | |||
37 | logger.info('Registering Open Telemetry metrics') | ||
38 | |||
39 | const provider = new MeterProvider() | ||
40 | |||
41 | provider.addMetricReader(new PrometheusExporter({ port: CONFIG.OPEN_TELEMETRY.METRICS.PROMETHEUS_EXPORTER.PORT })) | ||
42 | |||
43 | metrics.setGlobalMeterProvider(provider) | ||
44 | |||
45 | this.meter = metrics.getMeter('default') | ||
46 | |||
47 | this.buildMemoryObserver() | ||
48 | this.buildRequestObserver() | ||
49 | this.buildJobQueueObserver() | ||
50 | |||
51 | const statsObserverBuilder = new StatsObserverBuilder(this.meter) | ||
52 | statsObserverBuilder.buildObservers() | ||
53 | } | ||
54 | |||
55 | private buildMemoryObserver () { | ||
56 | this.meter.createObservableGauge('nodejs_memory_usage_bytes', { | ||
57 | description: 'Memory' | ||
58 | }).addCallback(observableResult => { | ||
59 | const current = process.memoryUsage() | ||
60 | |||
61 | observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' }) | ||
62 | observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' }) | ||
63 | observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' }) | ||
64 | observableResult.observe(current.external, { memoryType: 'external' }) | ||
65 | observableResult.observe(current.rss, { memoryType: 'rss' }) | ||
66 | }) | ||
67 | } | ||
68 | |||
69 | private buildJobQueueObserver () { | ||
70 | this.meter.createObservableGauge('peertube_job_queue_total', { | ||
71 | description: 'Total jobs in the PeerTube job queue' | ||
72 | }).addCallback(async observableResult => { | ||
73 | const stats = await JobQueue.Instance.getStats() | ||
74 | |||
75 | for (const { jobType, counts } of stats) { | ||
76 | for (const state of Object.keys(counts)) { | ||
77 | observableResult.observe(counts[state], { jobType, state }) | ||
78 | } | ||
79 | } | ||
80 | }) | ||
81 | } | ||
82 | |||
83 | private buildRequestObserver () { | ||
84 | const requestDuration = this.meter.createHistogram('http_request_duration_ms', { | ||
85 | unit: 'milliseconds', | ||
86 | description: 'Duration of HTTP requests in ms' | ||
87 | }) | ||
88 | |||
89 | this.onRequestDuration = (req: Request, res: Response) => { | ||
90 | const duration = Date.now() - res.locals.requestStart | ||
91 | |||
92 | requestDuration.record(duration, { | ||
93 | path: this.buildRequestPath(req.originalUrl), | ||
94 | method: req.method, | ||
95 | statusCode: res.statusCode + '' | ||
96 | }) | ||
97 | } | ||
98 | } | ||
99 | |||
100 | private buildRequestPath (path: string) { | ||
101 | return path.split('?')[0] | ||
102 | } | ||
103 | |||
104 | static get Instance () { | ||
105 | return this.instance || (this.instance = new this()) | ||
106 | } | ||
107 | } | ||
108 | |||
109 | export { | ||
110 | OpenTelemetryMetrics | ||
111 | } | ||
diff --git a/server/lib/opentelemetry/tracing.ts b/server/lib/opentelemetry/tracing.ts new file mode 100644 index 000000000..5358d04de --- /dev/null +++ b/server/lib/opentelemetry/tracing.ts | |||
@@ -0,0 +1,81 @@ | |||
1 | import { diag, DiagLogLevel, trace } from '@opentelemetry/api' | ||
2 | import { JaegerExporter } from '@opentelemetry/exporter-jaeger' | ||
3 | import { registerInstrumentations } from '@opentelemetry/instrumentation' | ||
4 | import { DnsInstrumentation } from '@opentelemetry/instrumentation-dns' | ||
5 | import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express' | ||
6 | import FsInstrumentation from '@opentelemetry/instrumentation-fs' | ||
7 | import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' | ||
8 | import { PgInstrumentation } from '@opentelemetry/instrumentation-pg' | ||
9 | import { RedisInstrumentation } from '@opentelemetry/instrumentation-redis-4' | ||
10 | import { Resource } from '@opentelemetry/resources' | ||
11 | import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base' | ||
12 | import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node' | ||
13 | import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions' | ||
14 | import { logger } from '@server/helpers/logger' | ||
15 | import { CONFIG } from '@server/initializers/config' | ||
16 | |||
17 | function registerOpentelemetryTracing () { | ||
18 | if (CONFIG.OPEN_TELEMETRY.TRACING.ENABLED !== true) return | ||
19 | |||
20 | logger.info('Registering Open Telemetry tracing') | ||
21 | |||
22 | const customLogger = (level: string) => { | ||
23 | return (message: string, ...args: unknown[]) => { | ||
24 | let fullMessage = message | ||
25 | |||
26 | for (const arg of args) { | ||
27 | if (typeof arg === 'string') fullMessage += arg | ||
28 | else break | ||
29 | } | ||
30 | |||
31 | logger[level](fullMessage) | ||
32 | } | ||
33 | } | ||
34 | |||
35 | diag.setLogger({ | ||
36 | error: customLogger('error'), | ||
37 | warn: customLogger('warn'), | ||
38 | info: customLogger('info'), | ||
39 | debug: customLogger('debug'), | ||
40 | verbose: customLogger('verbose') | ||
41 | }, DiagLogLevel.INFO) | ||
42 | |||
43 | const tracerProvider = new NodeTracerProvider({ | ||
44 | resource: new Resource({ | ||
45 | [SemanticResourceAttributes.SERVICE_NAME]: 'peertube' | ||
46 | }) | ||
47 | }) | ||
48 | |||
49 | registerInstrumentations({ | ||
50 | tracerProvider: tracerProvider, | ||
51 | instrumentations: [ | ||
52 | new PgInstrumentation({ | ||
53 | enhancedDatabaseReporting: true | ||
54 | }), | ||
55 | new DnsInstrumentation(), | ||
56 | new HttpInstrumentation(), | ||
57 | new ExpressInstrumentation(), | ||
58 | new RedisInstrumentation({ | ||
59 | dbStatementSerializer: function (cmdName, cmdArgs) { | ||
60 | return [ cmdName, ...cmdArgs ].join(' ') | ||
61 | } | ||
62 | }), | ||
63 | new FsInstrumentation() | ||
64 | ] | ||
65 | }) | ||
66 | |||
67 | tracerProvider.addSpanProcessor( | ||
68 | new BatchSpanProcessor( | ||
69 | new JaegerExporter({ endpoint: CONFIG.OPEN_TELEMETRY.TRACING.JAEGER_EXPORTER.ENDPOINT }) | ||
70 | ) | ||
71 | ) | ||
72 | |||
73 | tracerProvider.register() | ||
74 | } | ||
75 | |||
76 | const tracer = trace.getTracer('peertube') | ||
77 | |||
78 | export { | ||
79 | registerOpentelemetryTracing, | ||
80 | tracer | ||
81 | } | ||