diff options
Diffstat (limited to 'server/server/lib/opentelemetry')
10 files changed, 863 insertions, 0 deletions
diff --git a/server/server/lib/opentelemetry/metric-helpers/bittorrent-tracker-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/bittorrent-tracker-observers-builder.ts new file mode 100644 index 000000000..ef40c0fa9 --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/bittorrent-tracker-observers-builder.ts | |||
@@ -0,0 +1,51 @@ | |||
1 | import { Meter } from '@opentelemetry/api' | ||
2 | |||
/**
 * Registers OpenTelemetry gauges describing the state of the PeerTube BitTorrent tracker:
 * number of active/inactive infohashes and number of known peers.
 */
export class BittorrentTrackerObserversBuilder {

  /**
   * @param meter - meter used to create the observable gauges
   * @param trackerServer - tracker server object. Only its `torrents` dictionary is read here:
   *   each entry must expose `peers.keys` (iterable of peer ids) and `peers.peek(peerId)`
   */
  constructor (private readonly meter: Meter, private readonly trackerServer: any) {

  }

  /**
   * Creates the three tracker gauges and registers a single batch callback that
   * recomputes all of them each time the metrics are collected.
   */
  buildObservers () {
    const activeInfohashes = this.meter.createObservableGauge('peertube_bittorrent_tracker_active_infohashes_total', {
      description: 'Total active infohashes in the PeerTube BitTorrent Tracker'
    })
    const inactiveInfohashes = this.meter.createObservableGauge('peertube_bittorrent_tracker_inactive_infohashes_total', {
      description: 'Total inactive infohashes in the PeerTube BitTorrent Tracker'
    })
    const peers = this.meter.createObservableGauge('peertube_bittorrent_tracker_peers_total', {
      description: 'Total peers in the PeerTube BitTorrent Tracker'
    })

    this.meter.addBatchObservableCallback(observableResult => {
      let activeInfohashesCount = 0
      let inactiveInfohashesCount = 0
      let peersCount = 0

      for (const infohash of Object.keys(this.trackerServer.torrents)) {
        const swarmPeers = this.trackerServer.torrents[infohash].peers

        // An infohash is "active" as soon as it references at least one peer id
        if (swarmPeers.keys.length !== 0) activeInfohashesCount++
        else inactiveInfohashesCount++

        for (const peerId of swarmPeers.keys) {
          // peek() may return nothing for a listed id: skip only this peer.
          // Returning here would abort the whole callback and no gauge would be reported.
          if (swarmPeers.peek(peerId) == null) continue

          peersCount++
        }
      }

      observableResult.observe(activeInfohashes, activeInfohashesCount)
      observableResult.observe(inactiveInfohashes, inactiveInfohashesCount)
      observableResult.observe(peers, peersCount)
    }, [ activeInfohashes, inactiveInfohashes, peers ])
  }

}
diff --git a/server/server/lib/opentelemetry/metric-helpers/index.ts b/server/server/lib/opentelemetry/metric-helpers/index.ts new file mode 100644 index 000000000..ec9c35408 --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/index.ts | |||
@@ -0,0 +1,7 @@ | |||
1 | export * from './bittorrent-tracker-observers-builder.js' | ||
2 | export * from './lives-observers-builder.js' | ||
3 | export * from './job-queue-observers-builder.js' | ||
4 | export * from './nodejs-observers-builder.js' | ||
5 | export * from './playback-metrics.js' | ||
6 | export * from './stats-observers-builder.js' | ||
7 | export * from './viewers-observers-builder.js' | ||
diff --git a/server/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts new file mode 100644 index 000000000..c0bcb517e --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/job-queue-observers-builder.ts | |||
@@ -0,0 +1,24 @@ | |||
1 | import { Meter } from '@opentelemetry/api' | ||
2 | import { JobQueue } from '@server/lib/job-queue/index.js' | ||
3 | |||
/**
 * Exposes the content of the PeerTube job queue as an OpenTelemetry gauge.
 */
export class JobQueueObserversBuilder {

  constructor (private readonly meter: Meter) {

  }

  /**
   * Registers the `peertube_job_queue_total` gauge.
   * On each collection, one value is reported per (job type, job state) pair
   * found in the statistics returned by the job queue.
   */
  buildObservers () {
    const jobsGauge = this.meter.createObservableGauge('peertube_job_queue_total', {
      description: 'Total jobs in the PeerTube job queue'
    })

    jobsGauge.addCallback(async result => {
      const queueStats = await JobQueue.Instance.getStats()

      for (const entry of queueStats) {
        const { jobType, counts } = entry

        Object.keys(counts).forEach(state => {
          result.observe(counts[state], { jobType, state })
        })
      }
    })
  }

}
diff --git a/server/server/lib/opentelemetry/metric-helpers/lives-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/lives-observers-builder.ts new file mode 100644 index 000000000..45676ed2b --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/lives-observers-builder.ts | |||
@@ -0,0 +1,21 @@ | |||
1 | import { Meter } from '@opentelemetry/api' | ||
2 | import { VideoModel } from '@server/models/video/video.js' | ||
3 | |||
/**
 * Exposes the number of running lives as an OpenTelemetry gauge.
 */
export class LivesObserversBuilder {

  constructor (private readonly meter: Meter) {

  }

  /**
   * Registers the `peertube_running_lives_total` gauge.
   * On each collection, two values are reported, distinguished by the `liveOrigin`
   * attribute (`local` or `remote`), both counted with `mode: 'published'`.
   */
  buildObservers () {
    this.meter.createObservableGauge('peertube_running_lives_total', {
      description: 'Total running lives on the instance'
    }).addCallback(async observableResult => {
      // The two counts are independent: start both queries at once instead of one after the other
      const [ local, remote ] = await Promise.all([
        VideoModel.countLives({ remote: false, mode: 'published' }),
        VideoModel.countLives({ remote: true, mode: 'published' })
      ])

      observableResult.observe(local, { liveOrigin: 'local' })
      observableResult.observe(remote, { liveOrigin: 'remote' })
    })
  }
}
diff --git a/server/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts new file mode 100644 index 000000000..40df12b8d --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/nodejs-observers-builder.ts | |||
@@ -0,0 +1,202 @@ | |||
1 | import { readdir } from 'fs/promises' | ||
2 | import { constants, NodeGCPerformanceDetail, PerformanceObserver } from 'perf_hooks' | ||
3 | import * as process from 'process' | ||
4 | import { Meter, ObservableResult } from '@opentelemetry/api' | ||
5 | import { ExplicitBucketHistogramAggregation } from '@opentelemetry/sdk-metrics' | ||
6 | import { View } from '@opentelemetry/sdk-metrics/build/src/view/View.js' | ||
7 | import { logger } from '@server/helpers/logger.js' | ||
8 | |||
9 | // Thanks to https://github.com/siimon/prom-client | ||
10 | // We took their logic and adapted it for opentelemetry | ||
11 | // Try to keep consistency with their metric name/description so it's easier to process (grafana dashboard template etc) | ||
12 | |||
/**
 * Registers OpenTelemetry instruments describing the Node.js process itself:
 * CPU, memory, handles, file descriptors, garbage collection, event loop lag,
 * libuv requests and active resources.
 */
export class NodeJSObserversBuilder {

  constructor (private readonly meter: Meter) {
  }

  /**
   * Views to register on the meter provider.
   * Defines explicit bucket boundaries for the `nodejs_gc_duration_seconds` histogram.
   */
  static getViews () {
    return [
      new View({
        aggregation: new ExplicitBucketHistogramAggregation([ 0.001, 0.01, 0.1, 1, 2, 5 ]),
        instrumentName: 'nodejs_gc_duration_seconds'
      })
    ]
  }

  /**
   * Registers every Node.js observer of this class on the meter.
   */
  buildObservers () {
    this.buildCPUObserver()
    this.buildMemoryObserver()

    this.buildHandlesObserver()
    this.buildFileDescriptorsObserver()

    this.buildGCObserver()
    this.buildEventLoopLagObserver()

    this.buildLibUVActiveRequestsObserver()
    this.buildActiveResourcesObserver()
  }

  /**
   * Reports the CPU time consumed by the process, in seconds (total, user and system).
   */
  private buildCPUObserver () {
    const cpuTotal = this.meter.createObservableCounter('process_cpu_seconds_total', {
      description: 'Total user and system CPU time spent in seconds.'
    })
    const cpuUser = this.meter.createObservableCounter('process_cpu_user_seconds_total', {
      description: 'Total user CPU time spent in seconds.'
    })
    const cpuSystem = this.meter.createObservableCounter('process_cpu_system_seconds_total', {
      description: 'Total system CPU time spent in seconds.'
    })

    this.meter.addBatchObservableCallback(observableResult => {
      // An observable counter callback must report the cumulative value, not the increment
      // since the previous collection. process.cpuUsage() without argument is used as that
      // cumulative value (microseconds), so it only needs to be converted to seconds.
      const { user, system } = process.cpuUsage()

      observableResult.observe(cpuTotal, (user + system) / 1e6)
      observableResult.observe(cpuUser, user / 1e6)
      observableResult.observe(cpuSystem, system / 1e6)
    }, [ cpuTotal, cpuUser, cpuSystem ])
  }

  /**
   * Reports the fields of `process.memoryUsage()`, one value per `memoryType` attribute.
   */
  private buildMemoryObserver () {
    this.meter.createObservableGauge('nodejs_memory_usage_bytes', {
      description: 'Memory'
    }).addCallback(observableResult => {
      const current = process.memoryUsage()

      observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' })
      observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' })
      observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' })
      observableResult.observe(current.external, { memoryType: 'external' })
      observableResult.observe(current.rss, { memoryType: 'rss' })
    })
  }

  /**
   * Reports the number of active handles, when the runtime exposes `process._getActiveHandles`.
   */
  private buildHandlesObserver () {
    if (typeof (process as any)._getActiveHandles !== 'function') return

    this.meter.createObservableGauge('nodejs_active_handles_total', {
      description: 'Total number of active handles.'
    }).addCallback(observableResult => {
      const handles = (process as any)._getActiveHandles()

      observableResult.observe(handles.length)
    })
  }

  /**
   * Records the duration of garbage collections in a histogram, labelled by GC kind.
   */
  private buildGCObserver () {
    const kinds = {
      [constants.NODE_PERFORMANCE_GC_MAJOR]: 'major',
      [constants.NODE_PERFORMANCE_GC_MINOR]: 'minor',
      [constants.NODE_PERFORMANCE_GC_INCREMENTAL]: 'incremental',
      [constants.NODE_PERFORMANCE_GC_WEAKCB]: 'weakcb'
    }

    const histogram = this.meter.createHistogram('nodejs_gc_duration_seconds', {
      description: 'Garbage collection duration by kind, one of major, minor, incremental or weakcb'
    })

    const obs = new PerformanceObserver(list => {
      // A single notification can carry several entries: record all of them, not only the first one
      for (const entry of list.getEntries()) {
        // Node < 16 uses entry.kind
        // Node >= 16 uses entry.detail.kind
        // See: https://nodejs.org/docs/latest-v16.x/api/deprecations.html#deprecations_dep0152_extension_performanceentry_properties
        const kind = entry.detail
          ? kinds[(entry.detail as NodeGCPerformanceDetail).kind]
          : kinds[(entry as any).kind]

        // Convert duration from milliseconds to seconds
        histogram.record(entry.duration / 1000, {
          kind
        })
      }
    })

    obs.observe({ entryTypes: [ 'gc' ] })
  }

  /**
   * Reports the event loop lag: the time elapsed between scheduling a `setImmediate`
   * callback and its execution, in seconds.
   */
  private buildEventLoopLagObserver () {
    const reportEventloopLag = (start: [ number, number ], observableResult: ObservableResult, res: () => void) => {
      const delta = process.hrtime(start)
      const nanosec = delta[0] * 1e9 + delta[1]
      const seconds = nanosec / 1e9

      observableResult.observe(seconds)

      // Resolve the promise returned by the gauge callback: the measure is done
      res()
    }

    this.meter.createObservableGauge('nodejs_eventloop_lag_seconds', {
      description: 'Lag of event loop in seconds.'
    }).addCallback(observableResult => {
      return new Promise(res => {
        const start = process.hrtime()

        setImmediate(reportEventloopLag, start, observableResult, res)
      })
    })
  }

  /**
   * Reports the number of open file descriptors by listing `/proc/self/fd`.
   * Nothing is reported when this directory cannot be read.
   */
  private buildFileDescriptorsObserver () {
    this.meter.createObservableGauge('process_open_fds', {
      description: 'Number of open file descriptors.'
    }).addCallback(async observableResult => {
      try {
        const fds = await readdir('/proc/self/fd')
        // NOTE(review): minus one presumably discounts the descriptor opened by readdir itself - confirm
        observableResult.observe(fds.length - 1)
      } catch (err) {
        logger.debug('Cannot list file descriptors of current process for OpenTelemetry.', { err })
      }
    })
  }

  /**
   * Reports the number of active libuv requests, when the runtime exposes `process._getActiveRequests`.
   */
  private buildLibUVActiveRequestsObserver () {
    if (typeof (process as any)._getActiveRequests !== 'function') return

    this.meter.createObservableGauge('nodejs_active_requests_total', {
      description: 'Total number of active libuv requests.'
    }).addCallback(observableResult => {
      const requests = (process as any)._getActiveRequests()

      observableResult.observe(requests.length)
    })
  }

  /**
   * Reports the active resources returned by `process.getActiveResourcesInfo()`
   * (when available): one value per resource type and the overall total.
   */
  private buildActiveResourcesObserver () {
    if (typeof (process as any).getActiveResourcesInfo !== 'function') return

    // These values go up and down between two collections, so they are gauges and not monotonic counters
    const grouped = this.meter.createObservableGauge('nodejs_active_resources', {
      description: 'Number of active resources that are currently keeping the event loop alive, grouped by async resource type.'
    })
    const total = this.meter.createObservableGauge('nodejs_active_resources_total', {
      description: 'Total number of active resources.'
    })

    this.meter.addBatchObservableCallback(observableResult => {
      const resources: string[] = (process as any).getActiveResourcesInfo()

      // Map instead of a plain object so a resource type can never collide with Object.prototype keys
      const countByType = new Map<string, number>()

      for (const type of resources) {
        countByType.set(type, (countByType.get(type) ?? 0) + 1)
      }

      for (const [ type, count ] of countByType) {
        observableResult.observe(grouped, count, { type })
      }

      observableResult.observe(total, resources.length)
    }, [ grouped, total ])
  }
}
diff --git a/server/server/lib/opentelemetry/metric-helpers/playback-metrics.ts b/server/server/lib/opentelemetry/metric-helpers/playback-metrics.ts new file mode 100644 index 000000000..ec139f331 --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/playback-metrics.ts | |||
@@ -0,0 +1,85 @@ | |||
1 | import { Counter, Meter } from '@opentelemetry/api' | ||
2 | import { MVideoImmutable } from '@server/types/models/index.js' | ||
3 | import { PlaybackMetricCreate } from '@peertube/peertube-models' | ||
4 | |||
/**
 * Collects the playback metrics sent by the PeerTube player:
 * counters for errors, resolution changes and transferred bytes,
 * and a gauge for the number of P2P peers.
 */
export class PlaybackMetrics {
  private errorsCounter: Counter
  private resolutionChangesCounter: Counter

  private downloadedBytesP2PCounter: Counter
  private uploadedBytesP2PCounter: Counter

  private downloadedBytesHTTPCounter: Counter

  // P2P peers values received since the last collection of the gauge
  private peersP2PPeersGaugeBuffer: {
    value: number
    attributes: any
  }[] = []

  constructor (private readonly meter: Meter) {

  }

  /**
   * Creates the counters and the P2P peers gauge on the meter.
   * Must be called before `observe()`, which uses the counters created here.
   */
  buildCounters () {
    const meter = this.meter

    this.errorsCounter = meter.createCounter('peertube_playback_errors_count', {
      description: 'Errors collected from PeerTube player.'
    })

    this.resolutionChangesCounter = meter.createCounter('peertube_playback_resolution_changes_count', {
      description: 'Resolution changes collected from PeerTube player.'
    })

    this.downloadedBytesHTTPCounter = meter.createCounter('peertube_playback_http_downloaded_bytes', {
      description: 'Downloaded bytes with HTTP by PeerTube player.'
    })
    this.downloadedBytesP2PCounter = meter.createCounter('peertube_playback_p2p_downloaded_bytes', {
      description: 'Downloaded bytes with P2P by PeerTube player.'
    })

    this.uploadedBytesP2PCounter = meter.createCounter('peertube_playback_p2p_uploaded_bytes', {
      description: 'Uploaded bytes with P2P by PeerTube player.'
    })

    const peersGauge = meter.createObservableGauge('peertube_playback_p2p_peers', {
      description: 'Total P2P peers connected to the PeerTube player.'
    })

    peersGauge.addCallback(result => {
      // Flush every buffered value, then start a new buffer for the next collection
      this.peersP2PPeersGaugeBuffer.forEach(buffered => {
        result.observe(buffered.value, buffered.attributes)
      })

      this.peersP2PPeersGaugeBuffer = []
    })
  }

  /**
   * Records one playback metrics payload.
   *
   * @param video - video the metrics refer to (its `remote` flag and `uuid` become attributes)
   * @param metrics - values reported by the player
   */
  observe (video: MVideoImmutable, metrics: PlaybackMetricCreate) {
    const videoOrigin = video.remote ? 'remote' : 'local'

    const attributes = {
      videoOrigin,

      playerMode: metrics.playerMode,

      // Attribute values are sent as strings
      resolution: `${metrics.resolution}`,
      fps: `${metrics.fps}`,

      p2pEnabled: metrics.p2pEnabled,

      videoUUID: video.uuid
    }

    const increments: [ Counter, number ][] = [
      [ this.errorsCounter, metrics.errors ],
      [ this.resolutionChangesCounter, metrics.resolutionChanges ],
      [ this.downloadedBytesHTTPCounter, metrics.downloadedBytesHTTP ],
      [ this.downloadedBytesP2PCounter, metrics.downloadedBytesP2P ],
      [ this.uploadedBytesP2PCounter, metrics.uploadedBytesP2P ]
    ]

    for (const [ counter, amount ] of increments) {
      counter.add(amount, attributes)
    }

    // Falsy values (missing or 0) are not buffered
    if (!metrics.p2pPeers) return

    this.peersP2PPeersGaugeBuffer.push({ value: metrics.p2pPeers, attributes })
  }
}
diff --git a/server/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts new file mode 100644 index 000000000..05e6431a4 --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts | |||
@@ -0,0 +1,186 @@ | |||
1 | import memoizee from 'memoizee' | ||
2 | import { Meter } from '@opentelemetry/api' | ||
3 | import { MEMOIZE_TTL } from '@server/initializers/constants.js' | ||
4 | import { buildAvailableActivities } from '@server/lib/activitypub/activity.js' | ||
5 | import { StatsManager } from '@server/lib/stat-manager.js' | ||
6 | |||
/**
 * Exposes the instance statistics computed by the stats manager
 * (users, videos, comments, playlists, channels, follows, redundancy, ActivityPub inbox)
 * as OpenTelemetry gauges.
 */
export class StatsObserversBuilder {

  // Memoized (with a max age) accessor to the instance stats, shared by every gauge callback below
  private readonly getInstanceStats = memoizee(
    () => StatsManager.Instance.getStats(),
    { maxAge: MEMOIZE_TTL.GET_STATS_FOR_OPEN_TELEMETRY_METRICS }
  )

  constructor (private readonly meter: Meter) {

  }

  /**
   * Registers every stats gauge of this class on the meter.
   */
  buildObservers () {
    this.buildUserStatsObserver()
    this.buildVideoStatsObserver()
    this.buildCommentStatsObserver()
    this.buildPlaylistStatsObserver()
    this.buildChannelStatsObserver()
    this.buildInstanceFollowsStatsObserver()
    this.buildRedundancyStatsObserver()
    this.buildActivityPubStatsObserver()
  }

  // Users: total, and active users per interval (daily/weekly/monthly)
  private buildUserStatsObserver () {
    const usersGauge = this.meter.createObservableGauge('peertube_users_total', {
      description: 'Total users on the instance'
    })
    usersGauge.addCallback(async result => {
      const { totalUsers } = await this.getInstanceStats()

      result.observe(totalUsers)
    })

    const activeUsersGauge = this.meter.createObservableGauge('peertube_active_users_total', {
      description: 'Total active users on the instance'
    })
    activeUsersGauge.addCallback(async result => {
      const { totalDailyActiveUsers, totalWeeklyActiveUsers, totalMonthlyActiveUsers } = await this.getInstanceStats()

      result.observe(totalDailyActiveUsers, { activeInterval: 'daily' })
      result.observe(totalWeeklyActiveUsers, { activeInterval: 'weekly' })
      result.observe(totalMonthlyActiveUsers, { activeInterval: 'monthly' })
    })
  }

  // Local channels: total, and active channels per interval
  private buildChannelStatsObserver () {
    const channelsGauge = this.meter.createObservableGauge('peertube_channels_total', {
      description: 'Total channels on the instance'
    })
    channelsGauge.addCallback(async result => {
      const { totalLocalVideoChannels } = await this.getInstanceStats()

      result.observe(totalLocalVideoChannels, { channelOrigin: 'local' })
    })

    const activeChannelsGauge = this.meter.createObservableGauge('peertube_active_channels_total', {
      description: 'Total active channels on the instance'
    })
    activeChannelsGauge.addCallback(async result => {
      const {
        totalLocalDailyActiveVideoChannels,
        totalLocalWeeklyActiveVideoChannels,
        totalLocalMonthlyActiveVideoChannels
      } = await this.getInstanceStats()

      result.observe(totalLocalDailyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'daily' })
      result.observe(totalLocalWeeklyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'weekly' })
      result.observe(totalLocalMonthlyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'monthly' })
    })
  }

  // Videos: count per origin, local views and local files size
  private buildVideoStatsObserver () {
    const videosGauge = this.meter.createObservableGauge('peertube_videos_total', {
      description: 'Total videos on the instance'
    })
    videosGauge.addCallback(async result => {
      const { totalLocalVideos, totalVideos } = await this.getInstanceStats()

      result.observe(totalLocalVideos, { videoOrigin: 'local' })
      // Remote videos are deduced from the overall total
      result.observe(totalVideos - totalLocalVideos, { videoOrigin: 'remote' })
    })

    const viewsGauge = this.meter.createObservableGauge('peertube_video_views_total', {
      description: 'Total video views made on the instance'
    })
    viewsGauge.addCallback(async result => {
      const { totalLocalVideoViews } = await this.getInstanceStats()

      result.observe(totalLocalVideoViews, { viewOrigin: 'local' })
    })

    const bytesGauge = this.meter.createObservableGauge('peertube_video_bytes_total', {
      description: 'Total bytes of videos'
    })
    bytesGauge.addCallback(async result => {
      const { totalLocalVideoFilesSize } = await this.getInstanceStats()

      result.observe(totalLocalVideoFilesSize, { videoOrigin: 'local' })
    })
  }

  // Local comments
  private buildCommentStatsObserver () {
    const commentsGauge = this.meter.createObservableGauge('peertube_comments_total', {
      description: 'Total comments on the instance'
    })
    commentsGauge.addCallback(async result => {
      const { totalLocalVideoComments } = await this.getInstanceStats()

      result.observe(totalLocalVideoComments, { accountOrigin: 'local' })
    })
  }

  // Local playlists
  private buildPlaylistStatsObserver () {
    const playlistsGauge = this.meter.createObservableGauge('peertube_playlists_total', {
      description: 'Total playlists on the instance'
    })
    playlistsGauge.addCallback(async result => {
      const { totalLocalPlaylists } = await this.getInstanceStats()

      result.observe(totalLocalPlaylists, { playlistOrigin: 'local' })
    })
  }

  // Instance followers and following
  private buildInstanceFollowsStatsObserver () {
    const followersGauge = this.meter.createObservableGauge('peertube_instance_followers_total', {
      description: 'Total followers of the instance'
    })
    followersGauge.addCallback(async result => {
      const { totalInstanceFollowers } = await this.getInstanceStats()

      result.observe(totalInstanceFollowers)
    })

    const followingGauge = this.meter.createObservableGauge('peertube_instance_following_total', {
      description: 'Total following of the instance'
    })
    followingGauge.addCallback(async result => {
      const { totalInstanceFollowing } = await this.getInstanceStats()

      result.observe(totalInstanceFollowing)
    })
  }

  // Redundancy: used and available bytes, one value per strategy
  private buildRedundancyStatsObserver () {
    const usedGauge = this.meter.createObservableGauge('peertube_redundancy_used_bytes_total', {
      description: 'Total redundancy used of the instance'
    })
    usedGauge.addCallback(async result => {
      const { videosRedundancy } = await this.getInstanceStats()

      for (const { totalUsed, strategy } of videosRedundancy) {
        result.observe(totalUsed, { strategy })
      }
    })

    const availableGauge = this.meter.createObservableGauge('peertube_redundancy_available_bytes_total', {
      description: 'Total redundancy available of the instance'
    })
    availableGauge.addCallback(async result => {
      const { videosRedundancy } = await this.getInstanceStats()

      for (const { totalSize, strategy } of videosRedundancy) {
        result.observe(totalSize, { strategy })
      }
    })
  }

  // ActivityPub inbox: successes and errors per activity type, and waiting messages
  private buildActivityPubStatsObserver () {
    // Computed once when the observers are built, then reused by the callbacks
    const activityTypes = buildAvailableActivities()

    const successGauge = this.meter.createObservableGauge('peertube_ap_inbox_success_total', {
      description: 'Total inbox messages processed with success'
    })
    successGauge.addCallback(async result => {
      const stats = await this.getInstanceStats()

      for (const activityType of activityTypes) {
        result.observe(stats[`totalActivityPub${activityType}MessagesSuccesses`], { activityType })
      }
    })

    const errorGauge = this.meter.createObservableGauge('peertube_ap_inbox_error_total', {
      description: 'Total inbox messages processed with error'
    })
    errorGauge.addCallback(async result => {
      const stats = await this.getInstanceStats()

      for (const activityType of activityTypes) {
        result.observe(stats[`totalActivityPub${activityType}MessagesErrors`], { activityType })
      }
    })

    const waitingGauge = this.meter.createObservableGauge('peertube_ap_inbox_waiting_total', {
      description: 'Total inbox messages waiting for being processed'
    })
    waitingGauge.addCallback(async result => {
      const { totalActivityPubMessagesWaiting } = await this.getInstanceStats()

      result.observe(totalActivityPubMessagesWaiting)
    })
  }
}
diff --git a/server/server/lib/opentelemetry/metric-helpers/viewers-observers-builder.ts b/server/server/lib/opentelemetry/metric-helpers/viewers-observers-builder.ts new file mode 100644 index 000000000..a1e1c7496 --- /dev/null +++ b/server/server/lib/opentelemetry/metric-helpers/viewers-observers-builder.ts | |||
@@ -0,0 +1,24 @@ | |||
1 | import { Meter } from '@opentelemetry/api' | ||
2 | import { VideoScope, ViewerScope } from '@server/lib/views/shared/index.js' | ||
3 | import { VideoViewsManager } from '@server/lib/views/video-views-manager.js' | ||
4 | |||
/**
 * Exposes the current number of viewers as an OpenTelemetry gauge.
 */
export class ViewersObserversBuilder {

  constructor (private readonly meter: Meter) {

  }

  /**
   * Registers the `peertube_viewers_total` gauge.
   * On each collection, one value is reported for every combination of
   * viewer origin (local/remote) and video origin (local/remote).
   */
  buildObservers () {
    const viewersGauge = this.meter.createObservableGauge('peertube_viewers_total', {
      description: 'Total viewers on the instance'
    })

    viewersGauge.addCallback(result => {
      const viewerScopes = [ 'local', 'remote' ] as ViewerScope[]
      const videoScopes = [ 'local', 'remote' ] as VideoScope[]

      viewerScopes.forEach(viewerScope => {
        videoScopes.forEach(videoScope => {
          const total = VideoViewsManager.Instance.getTotalViewers({ viewerScope, videoScope })

          result.observe(total, { viewerOrigin: viewerScope, videoOrigin: videoScope })
        })
      })
    })
  }
}
diff --git a/server/server/lib/opentelemetry/metrics.ts b/server/server/lib/opentelemetry/metrics.ts new file mode 100644 index 000000000..7276182ef --- /dev/null +++ b/server/server/lib/opentelemetry/metrics.ts | |||
@@ -0,0 +1,123 @@ | |||
1 | import { Application, Request, Response } from 'express' | ||
2 | import { Meter, metrics } from '@opentelemetry/api' | ||
3 | import { PrometheusExporter } from '@opentelemetry/exporter-prometheus' | ||
4 | import { MeterProvider } from '@opentelemetry/sdk-metrics' | ||
5 | import { logger } from '@server/helpers/logger.js' | ||
6 | import { CONFIG } from '@server/initializers/config.js' | ||
7 | import { MVideoImmutable } from '@server/types/models/index.js' | ||
8 | import { PlaybackMetricCreate } from '@peertube/peertube-models' | ||
9 | import { | ||
10 | BittorrentTrackerObserversBuilder, | ||
11 | JobQueueObserversBuilder, | ||
12 | LivesObserversBuilder, | ||
13 | NodeJSObserversBuilder, | ||
14 | PlaybackMetrics, | ||
15 | StatsObserversBuilder, | ||
16 | ViewersObserversBuilder | ||
17 | } from './metric-helpers/index.js' | ||
18 | |||
/**
 * Singleton that registers the PeerTube metrics on an OpenTelemetry meter
 * and exposes them through a Prometheus exporter.
 *
 * Every public method is a no-op when metrics are disabled in the configuration.
 */
class OpenTelemetryMetrics {

  private static instance: OpenTelemetryMetrics

  private meter: Meter

  // Only set when the HTTP request duration metric is enabled
  private onRequestDuration: (req: Request, res: Response) => void

  // Only set once registerMetrics() has run with metrics enabled
  private playbackMetrics: PlaybackMetrics

  private constructor () {}

  /**
   * Installs an express middleware that reports each request once its response is finished.
   *
   * @param app - express application to instrument
   */
  init (app: Application) {
    if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return

    app.use((req, res, next) => {
      res.once('finish', () => {
        // The handler is built by registerMetrics(), and only if the request duration metric is enabled
        if (!this.onRequestDuration) return

        this.onRequestDuration(req as Request, res as Response)
      })

      next()
    })
  }

  /**
   * Creates the meter provider with its Prometheus exporter, sets it as the global provider
   * and registers every PeerTube observer on it.
   *
   * @param options.trackerServer - tracker server given to the BitTorrent tracker observers
   */
  registerMetrics (options: { trackerServer: any }) {
    if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return

    logger.info('Registering Open Telemetry metrics')

    const provider = new MeterProvider({
      views: [
        ...NodeJSObserversBuilder.getViews()
      ]
    })

    provider.addMetricReader(new PrometheusExporter({
      host: CONFIG.OPEN_TELEMETRY.METRICS.PROMETHEUS_EXPORTER.HOSTNAME,
      port: CONFIG.OPEN_TELEMETRY.METRICS.PROMETHEUS_EXPORTER.PORT
    }))

    metrics.setGlobalMeterProvider(provider)

    this.meter = metrics.getMeter('default')

    if (CONFIG.OPEN_TELEMETRY.METRICS.HTTP_REQUEST_DURATION.ENABLED === true) {
      this.buildRequestObserver()
    }

    this.playbackMetrics = new PlaybackMetrics(this.meter)
    this.playbackMetrics.buildCounters()

    const nodeJSObserversBuilder = new NodeJSObserversBuilder(this.meter)
    nodeJSObserversBuilder.buildObservers()

    const jobQueueObserversBuilder = new JobQueueObserversBuilder(this.meter)
    jobQueueObserversBuilder.buildObservers()

    const statsObserversBuilder = new StatsObserversBuilder(this.meter)
    statsObserversBuilder.buildObservers()

    const livesObserversBuilder = new LivesObserversBuilder(this.meter)
    livesObserversBuilder.buildObservers()

    const viewersObserversBuilder = new ViewersObserversBuilder(this.meter)
    viewersObserversBuilder.buildObservers()

    const bittorrentTrackerObserversBuilder = new BittorrentTrackerObserversBuilder(this.meter, options.trackerServer)
    bittorrentTrackerObserversBuilder.buildObservers()
  }

  /**
   * Records playback metrics sent by a player.
   * Does nothing when the metrics have not been registered (for example when they are disabled).
   *
   * @param video - video the metrics refer to
   * @param metrics - values reported by the player
   */
  observePlaybackMetric (video: MVideoImmutable, metrics: PlaybackMetricCreate) {
    // registerMetrics() returns early when metrics are disabled: there is nothing to record into
    if (!this.playbackMetrics) return

    this.playbackMetrics.observe(video, metrics)
  }

  /**
   * Creates the HTTP request duration histogram and the handler that feeds it.
   */
  private buildRequestObserver () {
    const requestDuration = this.meter.createHistogram('http_request_duration_ms', {
      unit: 'milliseconds',
      description: 'Duration of HTTP requests in ms'
    })

    this.onRequestDuration = (req: Request, res: Response) => {
      const duration = Date.now() - res.locals.requestStart

      // NOTE(review): requestStart is set outside of this class. If it is missing the duration
      // is NaN, which must not be recorded in the histogram
      if (!Number.isFinite(duration)) return

      requestDuration.record(duration, {
        path: this.buildRequestPath(req.originalUrl),
        method: req.method,
        statusCode: res.statusCode + ''
      })
    }
  }

  // Removes the query string from the URL
  private buildRequestPath (path: string) {
    return path.split('?')[0]
  }

  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
120 | |||
121 | export { | ||
122 | OpenTelemetryMetrics | ||
123 | } | ||
diff --git a/server/server/lib/opentelemetry/tracing.ts b/server/server/lib/opentelemetry/tracing.ts new file mode 100644 index 000000000..b306fa40c --- /dev/null +++ b/server/server/lib/opentelemetry/tracing.ts | |||
@@ -0,0 +1,140 @@ | |||
1 | import type { Span, Tracer } from '@opentelemetry/api' | ||
2 | import { logger } from '@server/helpers/logger.js' | ||
3 | import { CONFIG } from '@server/initializers/config.js' | ||
4 | |||
5 | let tracer: Tracer | TrackerMock | ||
6 | |||
/**
 * Sets up OpenTelemetry tracing for the process.
 *
 * When tracing is disabled in the configuration, a mock tracer is installed and
 * the function returns before any OpenTelemetry package is imported.
 * Otherwise it loads the OpenTelemetry packages dynamically, routes the
 * OpenTelemetry diagnostics to the application logger, registers the
 * pg/dns/http/express/ioredis/fs/sequelize instrumentations and exports spans
 * in batches to the configured Jaeger endpoint.
 */
async function registerOpentelemetryTracing () {
  const tracingEnabled = CONFIG.OPEN_TELEMETRY.TRACING.ENABLED === true

  if (!tracingEnabled) {
    tracer = new TrackerMock()
    return
  }

  const otelApi = await import('@opentelemetry/api')
  tracer = otelApi.trace.getTracer('peertube')

  // Some packages are consumed through their named exports, the others through `.default`
  const [
    { JaegerExporter },
    { registerInstrumentations },
    dnsModule,
    expressModule,
    { FsInstrumentation },
    { HttpInstrumentation },
    ioredisModule,
    pgModule,
    { SequelizeInstrumentation },
    resourcesModule,
    traceBaseModule,
    traceNodeModule,
    semanticModule
  ] = await Promise.all([
    import('@opentelemetry/exporter-jaeger'),
    import('@opentelemetry/instrumentation'),
    import('@opentelemetry/instrumentation-dns'),
    import('@opentelemetry/instrumentation-express'),
    import('@opentelemetry/instrumentation-fs'),
    import('@opentelemetry/instrumentation-http'),
    import('@opentelemetry/instrumentation-ioredis'),
    import('@opentelemetry/instrumentation-pg'),
    import('opentelemetry-instrumentation-sequelize'),
    import('@opentelemetry/resources'),
    import('@opentelemetry/sdk-trace-base'),
    import('@opentelemetry/sdk-trace-node'),
    import('@opentelemetry/semantic-conventions')
  ])

  logger.info('Registering Open Telemetry tracing')

  // Builds a diagnostics handler that forwards to the application logger at `level`
  function buildDiagHandler (level: string) {
    return (message: string, ...args: unknown[]) => {
      let text = message

      // Only the leading run of string arguments is appended to the message
      for (const extra of args) {
        if (typeof extra !== 'string') break

        text += extra
      }

      logger[level](text)
    }
  }

  const diagLogger = {
    error: buildDiagHandler('error'),
    warn: buildDiagHandler('warn'),
    info: buildDiagHandler('info'),
    debug: buildDiagHandler('debug'),
    verbose: buildDiagHandler('verbose')
  }

  otelApi.diag.setLogger(diagLogger, otelApi.DiagLogLevel.INFO)

  const serviceResource = new resourcesModule.default.Resource({
    [semanticModule.default.SemanticResourceAttributes.SERVICE_NAME]: 'peertube'
  })

  const tracerProvider = new traceNodeModule.default.NodeTracerProvider({ resource: serviceResource })

  const instrumentations = [
    new pgModule.default.PgInstrumentation({
      enhancedDatabaseReporting: true
    }),
    new dnsModule.default.DnsInstrumentation(),
    new HttpInstrumentation(),
    new expressModule.default.ExpressInstrumentation(),
    new ioredisModule.default.IORedisInstrumentation({
      // Report the statement as the command name followed by its arguments, space separated
      dbStatementSerializer: (cmdName, cmdArgs) => [ cmdName, ...cmdArgs ].join(' ')
    }),
    new FsInstrumentation(),
    new SequelizeInstrumentation()
  ]

  registerInstrumentations({ tracerProvider, instrumentations })

  const jaegerExporter = new JaegerExporter({ endpoint: CONFIG.OPEN_TELEMETRY.TRACING.JAEGER_EXPORTER.ENDPOINT })
  const spanProcessor = new traceBaseModule.default.BatchSpanProcessor(jaegerExporter)

  tracerProvider.addSpanProcessor(spanProcessor)
  tracerProvider.register()
}
103 | |||
/**
 * Runs `cb` inside a new span named `spanName`, with that span set on the
 * active OpenTelemetry context for the duration of the callback.
 *
 * When tracing is disabled in the configuration, `cb` is called directly and
 * the OpenTelemetry API package is not loaded.
 *
 * @param spanName - name given to the span started for this call
 * @param cb - asynchronous callback to run within the span context
 * @returns the value `cb` resolves with; a rejection from `cb` is propagated
 */
async function wrapWithSpanAndContext <T> (spanName: string, cb: () => Promise<T>) {
  // Check the configuration first so the dynamic import is skipped when tracing is off
  if (CONFIG.OPEN_TELEMETRY.TRACING.ENABLED !== true) {
    return cb()
  }

  const { context, trace } = await import('@opentelemetry/api')

  const span = tracer.startSpan(spanName)
  const activeContext = trace.setSpan(context.active(), span as Span)

  try {
    return await context.with(activeContext, () => cb())
  } finally {
    // End the span even when cb rejects, otherwise the span is never ended on the error path
    span.end()
  }
}
119 | |||
120 | export { | ||
121 | registerOpentelemetryTracing, | ||
122 | tracer, | ||
123 | wrapWithSpanAndContext | ||
124 | } | ||
125 | |||
126 | // --------------------------------------------------------------------------- | ||
127 | // Private | ||
128 | // --------------------------------------------------------------------------- | ||
129 | |||
/**
 * Stand-in tracer installed when tracing is disabled: `startSpan` hands back
 * a `SpanMock` instead of a real span.
 */
class TrackerMock {
  startSpan (): SpanMock {
    const span = new SpanMock()

    return span
  }
}
135 | |||
/**
 * Stand-in span whose `end` method does nothing.
 */
class SpanMock {
  end (): void {
    // Intentionally empty: there is nothing to finish on a mock span
  }
}