diff options
5 files changed, 232 insertions, 33 deletions
diff --git a/server/lib/opentelemetry/metric-helpers/index.ts b/server/lib/opentelemetry/metric-helpers/index.ts index cabb27326..ff0aff9fd 100644 --- a/server/lib/opentelemetry/metric-helpers/index.ts +++ b/server/lib/opentelemetry/metric-helpers/index.ts | |||
@@ -1 +1,3 @@ | |||
1 | export * from './job-queue-observers-builder' | ||
2 | export * from './nodejs-observers-builder' | ||
1 | export * from './stats-observers-builder' | 3 | export * from './stats-observers-builder' |
diff --git a/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts new file mode 100644 index 000000000..c81959616 --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts | |||
@@ -0,0 +1,24 @@ | |||
1 | import { Meter } from '@opentelemetry/api-metrics' | ||
2 | import { JobQueue } from '@server/lib/job-queue' | ||
3 | |||
4 | export class JobQueueObserversBuilder { | ||
5 | |||
6 | constructor (private readonly meter: Meter) { | ||
7 | |||
8 | } | ||
9 | |||
10 | buildObservers () { | ||
11 | this.meter.createObservableGauge('peertube_job_queue_total', { | ||
12 | description: 'Total jobs in the PeerTube job queue' | ||
13 | }).addCallback(async observableResult => { | ||
14 | const stats = await JobQueue.Instance.getStats() | ||
15 | |||
16 | for (const { jobType, counts } of stats) { | ||
17 | for (const state of Object.keys(counts)) { | ||
18 | observableResult.observe(counts[state], { jobType, state }) | ||
19 | } | ||
20 | } | ||
21 | }) | ||
22 | } | ||
23 | |||
24 | } | ||
diff --git a/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts new file mode 100644 index 000000000..c51a50ebb --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts | |||
@@ -0,0 +1,198 @@ | |||
1 | import { readdir } from 'fs-extra' | ||
2 | import { constants, PerformanceObserver } from 'perf_hooks' | ||
3 | import * as process from 'process' | ||
4 | import { Meter, ObservableResult } from '@opentelemetry/api-metrics' | ||
5 | import { ExplicitBucketHistogramAggregation, MeterProvider } from '@opentelemetry/sdk-metrics-base' | ||
6 | import { View } from '@opentelemetry/sdk-metrics-base/build/src/view/View' | ||
7 | import { logger } from '@server/helpers/logger' | ||
8 | |||
9 | // Thanks to https://github.com/siimon/prom-client | ||
10 | // We took their logic and adapter it for opentelemetry | ||
11 | // Try to keep consistency with their metric name/description so it's easier to process (grafana dashboard template etc) | ||
12 | |||
13 | export class NodeJSObserversBuilder { | ||
14 | |||
15 | constructor (private readonly meter: Meter, private readonly meterProvider: MeterProvider) { | ||
16 | } | ||
17 | |||
18 | buildObservers () { | ||
19 | this.buildCPUObserver() | ||
20 | this.buildMemoryObserver() | ||
21 | |||
22 | this.buildHandlesObserver() | ||
23 | this.buildFileDescriptorsObserver() | ||
24 | |||
25 | this.buildGCObserver() | ||
26 | this.buildEventLoopLagObserver() | ||
27 | |||
28 | this.buildLibUVActiveRequestsObserver() | ||
29 | this.buildActiveResourcesObserver() | ||
30 | } | ||
31 | |||
32 | private buildCPUObserver () { | ||
33 | const cpuTotal = this.meter.createObservableCounter('process_cpu_seconds_total', { | ||
34 | description: 'Total user and system CPU time spent in seconds.' | ||
35 | }) | ||
36 | const cpuUser = this.meter.createObservableCounter('process_cpu_user_seconds_total', { | ||
37 | description: 'Total user CPU time spent in seconds.' | ||
38 | }) | ||
39 | const cpuSystem = this.meter.createObservableCounter('process_cpu_system_seconds_total', { | ||
40 | description: 'Total system CPU time spent in seconds.' | ||
41 | }) | ||
42 | |||
43 | let lastCpuUsage = process.cpuUsage() | ||
44 | |||
45 | this.meter.addBatchObservableCallback(observableResult => { | ||
46 | const cpuUsage = process.cpuUsage() | ||
47 | |||
48 | const userUsageMicros = cpuUsage.user - lastCpuUsage.user | ||
49 | const systemUsageMicros = cpuUsage.system - lastCpuUsage.system | ||
50 | |||
51 | lastCpuUsage = cpuUsage | ||
52 | |||
53 | observableResult.observe(cpuTotal, (userUsageMicros + systemUsageMicros) / 1e6) | ||
54 | observableResult.observe(cpuUser, userUsageMicros / 1e6) | ||
55 | observableResult.observe(cpuSystem, systemUsageMicros / 1e6) | ||
56 | |||
57 | }, [ cpuTotal, cpuUser, cpuSystem ]) | ||
58 | } | ||
59 | |||
60 | private buildMemoryObserver () { | ||
61 | this.meter.createObservableGauge('nodejs_memory_usage_bytes', { | ||
62 | description: 'Memory' | ||
63 | }).addCallback(observableResult => { | ||
64 | const current = process.memoryUsage() | ||
65 | |||
66 | observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' }) | ||
67 | observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' }) | ||
68 | observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' }) | ||
69 | observableResult.observe(current.external, { memoryType: 'external' }) | ||
70 | observableResult.observe(current.rss, { memoryType: 'rss' }) | ||
71 | }) | ||
72 | } | ||
73 | |||
74 | private buildHandlesObserver () { | ||
75 | if (typeof (process as any)._getActiveHandles !== 'function') return | ||
76 | |||
77 | this.meter.createObservableGauge('nodejs_active_handles_total', { | ||
78 | description: 'Total number of active handles.' | ||
79 | }).addCallback(observableResult => { | ||
80 | const handles = (process as any)._getActiveHandles() | ||
81 | |||
82 | observableResult.observe(handles.length) | ||
83 | }) | ||
84 | } | ||
85 | |||
86 | private buildGCObserver () { | ||
87 | const kinds = { | ||
88 | [constants.NODE_PERFORMANCE_GC_MAJOR]: 'major', | ||
89 | [constants.NODE_PERFORMANCE_GC_MINOR]: 'minor', | ||
90 | [constants.NODE_PERFORMANCE_GC_INCREMENTAL]: 'incremental', | ||
91 | [constants.NODE_PERFORMANCE_GC_WEAKCB]: 'weakcb' | ||
92 | } | ||
93 | |||
94 | this.meterProvider.addView( | ||
95 | new View({ aggregation: new ExplicitBucketHistogramAggregation([ 0.001, 0.01, 0.1, 1, 2, 5 ]) }), | ||
96 | { instrument: { name: 'nodejs_gc_duration_seconds' } } | ||
97 | ) | ||
98 | |||
99 | const histogram = this.meter.createHistogram('nodejs_gc_duration_seconds', { | ||
100 | description: 'Garbage collection duration by kind, one of major, minor, incremental or weakcb' | ||
101 | }) | ||
102 | |||
103 | const obs = new PerformanceObserver(list => { | ||
104 | const entry = list.getEntries()[0] | ||
105 | |||
106 | // Node < 16 uses entry.kind | ||
107 | // Node >= 16 uses entry.detail.kind | ||
108 | // See: https://nodejs.org/docs/latest-v16.x/api/deprecations.html#deprecations_dep0152_extension_performanceentry_properties | ||
109 | const kind = (entry as any).detail | ||
110 | ? kinds[(entry as any).detail.kind] | ||
111 | : kinds[entry.kind] | ||
112 | |||
113 | // Convert duration from milliseconds to seconds | ||
114 | histogram.record(entry.duration / 1000, { | ||
115 | kind | ||
116 | }) | ||
117 | }) | ||
118 | |||
119 | obs.observe({ entryTypes: [ 'gc' ] }) | ||
120 | } | ||
121 | |||
122 | private buildEventLoopLagObserver () { | ||
123 | const reportEventloopLag = (start: [ number, number ], observableResult: ObservableResult, res: () => void) => { | ||
124 | const delta = process.hrtime(start) | ||
125 | const nanosec = delta[0] * 1e9 + delta[1] | ||
126 | const seconds = nanosec / 1e9 | ||
127 | |||
128 | observableResult.observe(seconds) | ||
129 | |||
130 | res() | ||
131 | } | ||
132 | |||
133 | this.meter.createObservableGauge('nodejs_eventloop_lag_seconds', { | ||
134 | description: 'Lag of event loop in seconds.' | ||
135 | }).addCallback(observableResult => { | ||
136 | return new Promise(res => { | ||
137 | const start = process.hrtime() | ||
138 | |||
139 | setImmediate(reportEventloopLag, start, observableResult, res) | ||
140 | }) | ||
141 | }) | ||
142 | } | ||
143 | |||
144 | private buildFileDescriptorsObserver () { | ||
145 | this.meter.createObservableGauge('process_open_fds', { | ||
146 | description: 'Number of open file descriptors.' | ||
147 | }).addCallback(async observableResult => { | ||
148 | try { | ||
149 | const fds = await readdir('/proc/self/fd') | ||
150 | observableResult.observe(fds.length - 1) | ||
151 | } catch (err) { | ||
152 | logger.debug('Cannot list file descriptors of current process for OpenTelemetry.', { err }) | ||
153 | } | ||
154 | }) | ||
155 | } | ||
156 | |||
157 | private buildLibUVActiveRequestsObserver () { | ||
158 | if (typeof (process as any)._getActiveRequests !== 'function') return | ||
159 | |||
160 | this.meter.createObservableGauge('nodejs_active_requests_total', { | ||
161 | description: 'Total number of active libuv requests.' | ||
162 | }).addCallback(async observableResult => { | ||
163 | const requests = (process as any)._getActiveRequests() | ||
164 | |||
165 | observableResult.observe(requests.length) | ||
166 | }) | ||
167 | } | ||
168 | |||
169 | private buildActiveResourcesObserver () { | ||
170 | if (typeof (process as any).getActiveResourcesInfo !== 'function') return | ||
171 | |||
172 | const grouped = this.meter.createObservableCounter('nodejs_active_resources', { | ||
173 | description: 'Number of active resources that are currently keeping the event loop alive, grouped by async resource type.' | ||
174 | }) | ||
175 | const total = this.meter.createObservableCounter('nodejs_active_resources_total', { | ||
176 | description: 'Total number of active resources.' | ||
177 | }) | ||
178 | |||
179 | this.meter.addBatchObservableCallback(observableResult => { | ||
180 | const resources = (process as any).getActiveResourcesInfo() | ||
181 | |||
182 | const data = {} | ||
183 | |||
184 | for (let i = 0; i < resources.length; i++) { | ||
185 | const resource = resources[i] | ||
186 | |||
187 | if (data[resource] === undefined) data[resource] = 0 | ||
188 | data[resource] += 1 | ||
189 | } | ||
190 | |||
191 | for (const type of Object.keys(data)) { | ||
192 | observableResult.observe(grouped, data[type], { type }) | ||
193 | } | ||
194 | |||
195 | observableResult.observe(total, resources.length) | ||
196 | }, [ grouped, total ]) | ||
197 | } | ||
198 | } | ||
diff --git a/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts index 90b58f33d..3d28ffdd8 100644 --- a/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts +++ b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts | |||
@@ -4,7 +4,7 @@ import { MEMOIZE_TTL } from '@server/initializers/constants' | |||
4 | import { buildAvailableActivities } from '@server/lib/activitypub/activity' | 4 | import { buildAvailableActivities } from '@server/lib/activitypub/activity' |
5 | import { StatsManager } from '@server/lib/stat-manager' | 5 | import { StatsManager } from '@server/lib/stat-manager' |
6 | 6 | ||
7 | export class StatsObserverBuilder { | 7 | export class StatsObserversBuilder { |
8 | 8 | ||
9 | private readonly getInstanceStats = memoizee(() => { | 9 | private readonly getInstanceStats = memoizee(() => { |
10 | return StatsManager.Instance.getStats() | 10 | return StatsManager.Instance.getStats() |
diff --git a/server/lib/opentelemetry/metrics.ts b/server/lib/opentelemetry/metrics.ts index ca0aae8e7..149f421be 100644 --- a/server/lib/opentelemetry/metrics.ts +++ b/server/lib/opentelemetry/metrics.ts | |||
@@ -4,8 +4,7 @@ import { PrometheusExporter } from '@opentelemetry/exporter-prometheus' | |||
4 | import { MeterProvider } from '@opentelemetry/sdk-metrics-base' | 4 | import { MeterProvider } from '@opentelemetry/sdk-metrics-base' |
5 | import { logger } from '@server/helpers/logger' | 5 | import { logger } from '@server/helpers/logger' |
6 | import { CONFIG } from '@server/initializers/config' | 6 | import { CONFIG } from '@server/initializers/config' |
7 | import { JobQueue } from '../job-queue' | 7 | import { JobQueueObserversBuilder, NodeJSObserversBuilder, StatsObserversBuilder } from './metric-helpers' |
8 | import { StatsObserverBuilder } from './metric-helpers' | ||
9 | 8 | ||
10 | class OpenTelemetryMetrics { | 9 | class OpenTelemetryMetrics { |
11 | 10 | ||
@@ -44,40 +43,16 @@ class OpenTelemetryMetrics { | |||
44 | 43 | ||
45 | this.meter = metrics.getMeter('default') | 44 | this.meter = metrics.getMeter('default') |
46 | 45 | ||
47 | this.buildMemoryObserver() | ||
48 | this.buildRequestObserver() | 46 | this.buildRequestObserver() |
49 | this.buildJobQueueObserver() | ||
50 | 47 | ||
51 | const statsObserverBuilder = new StatsObserverBuilder(this.meter) | 48 | const nodeJSObserversBuilder = new NodeJSObserversBuilder(this.meter, provider) |
52 | statsObserverBuilder.buildObservers() | 49 | nodeJSObserversBuilder.buildObservers() |
53 | } | ||
54 | 50 | ||
55 | private buildMemoryObserver () { | 51 | const jobQueueObserversBuilder = new JobQueueObserversBuilder(this.meter) |
56 | this.meter.createObservableGauge('nodejs_memory_usage_bytes', { | 52 | jobQueueObserversBuilder.buildObservers() |
57 | description: 'Memory' | ||
58 | }).addCallback(observableResult => { | ||
59 | const current = process.memoryUsage() | ||
60 | |||
61 | observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' }) | ||
62 | observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' }) | ||
63 | observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' }) | ||
64 | observableResult.observe(current.external, { memoryType: 'external' }) | ||
65 | observableResult.observe(current.rss, { memoryType: 'rss' }) | ||
66 | }) | ||
67 | } | ||
68 | 53 | ||
69 | private buildJobQueueObserver () { | 54 | const statsObserversBuilder = new StatsObserversBuilder(this.meter) |
70 | this.meter.createObservableGauge('peertube_job_queue_total', { | 55 | statsObserversBuilder.buildObservers() |
71 | description: 'Total jobs in the PeerTube job queue' | ||
72 | }).addCallback(async observableResult => { | ||
73 | const stats = await JobQueue.Instance.getStats() | ||
74 | |||
75 | for (const { jobType, counts } of stats) { | ||
76 | for (const state of Object.keys(counts)) { | ||
77 | observableResult.observe(counts[state], { jobType, state }) | ||
78 | } | ||
79 | } | ||
80 | }) | ||
81 | } | 56 | } |
82 | 57 | ||
83 | private buildRequestObserver () { | 58 | private buildRequestObserver () { |