--- /dev/null
+import { readdir } from 'fs-extra'
+import { constants, PerformanceObserver } from 'perf_hooks'
+import * as process from 'process'
+import { Meter, ObservableResult } from '@opentelemetry/api-metrics'
+import { ExplicitBucketHistogramAggregation, MeterProvider } from '@opentelemetry/sdk-metrics-base'
+import { View } from '@opentelemetry/sdk-metrics-base/build/src/view/View'
+import { logger } from '@server/helpers/logger'
+
+// Thanks to https://github.com/siimon/prom-client
+// We took their logic and adapter it for opentelemetry
+// Try to keep consistency with their metric name/description so it's easier to process (grafana dashboard template etc)
+
+export class NodeJSObserversBuilder {
+
+ constructor (private readonly meter: Meter, private readonly meterProvider: MeterProvider) {
+ }
+
+ buildObservers () {
+ this.buildCPUObserver()
+ this.buildMemoryObserver()
+
+ this.buildHandlesObserver()
+ this.buildFileDescriptorsObserver()
+
+ this.buildGCObserver()
+ this.buildEventLoopLagObserver()
+
+ this.buildLibUVActiveRequestsObserver()
+ this.buildActiveResourcesObserver()
+ }
+
+ private buildCPUObserver () {
+ const cpuTotal = this.meter.createObservableCounter('process_cpu_seconds_total', {
+ description: 'Total user and system CPU time spent in seconds.'
+ })
+ const cpuUser = this.meter.createObservableCounter('process_cpu_user_seconds_total', {
+ description: 'Total user CPU time spent in seconds.'
+ })
+ const cpuSystem = this.meter.createObservableCounter('process_cpu_system_seconds_total', {
+ description: 'Total system CPU time spent in seconds.'
+ })
+
+ let lastCpuUsage = process.cpuUsage()
+
+ this.meter.addBatchObservableCallback(observableResult => {
+ const cpuUsage = process.cpuUsage()
+
+ const userUsageMicros = cpuUsage.user - lastCpuUsage.user
+ const systemUsageMicros = cpuUsage.system - lastCpuUsage.system
+
+ lastCpuUsage = cpuUsage
+
+ observableResult.observe(cpuTotal, (userUsageMicros + systemUsageMicros) / 1e6)
+ observableResult.observe(cpuUser, userUsageMicros / 1e6)
+ observableResult.observe(cpuSystem, systemUsageMicros / 1e6)
+
+ }, [ cpuTotal, cpuUser, cpuSystem ])
+ }
+
+ private buildMemoryObserver () {
+ this.meter.createObservableGauge('nodejs_memory_usage_bytes', {
+ description: 'Memory'
+ }).addCallback(observableResult => {
+ const current = process.memoryUsage()
+
+ observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' })
+ observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' })
+ observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' })
+ observableResult.observe(current.external, { memoryType: 'external' })
+ observableResult.observe(current.rss, { memoryType: 'rss' })
+ })
+ }
+
+ private buildHandlesObserver () {
+ if (typeof (process as any)._getActiveHandles !== 'function') return
+
+ this.meter.createObservableGauge('nodejs_active_handles_total', {
+ description: 'Total number of active handles.'
+ }).addCallback(observableResult => {
+ const handles = (process as any)._getActiveHandles()
+
+ observableResult.observe(handles.length)
+ })
+ }
+
+ private buildGCObserver () {
+ const kinds = {
+ [constants.NODE_PERFORMANCE_GC_MAJOR]: 'major',
+ [constants.NODE_PERFORMANCE_GC_MINOR]: 'minor',
+ [constants.NODE_PERFORMANCE_GC_INCREMENTAL]: 'incremental',
+ [constants.NODE_PERFORMANCE_GC_WEAKCB]: 'weakcb'
+ }
+
+ this.meterProvider.addView(
+ new View({ aggregation: new ExplicitBucketHistogramAggregation([ 0.001, 0.01, 0.1, 1, 2, 5 ]) }),
+ { instrument: { name: 'nodejs_gc_duration_seconds' } }
+ )
+
+ const histogram = this.meter.createHistogram('nodejs_gc_duration_seconds', {
+ description: 'Garbage collection duration by kind, one of major, minor, incremental or weakcb'
+ })
+
+ const obs = new PerformanceObserver(list => {
+ const entry = list.getEntries()[0]
+
+ // Node < 16 uses entry.kind
+ // Node >= 16 uses entry.detail.kind
+ // See: https://nodejs.org/docs/latest-v16.x/api/deprecations.html#deprecations_dep0152_extension_performanceentry_properties
+ const kind = (entry as any).detail
+ ? kinds[(entry as any).detail.kind]
+ : kinds[entry.kind]
+
+ // Convert duration from milliseconds to seconds
+ histogram.record(entry.duration / 1000, {
+ kind
+ })
+ })
+
+ obs.observe({ entryTypes: [ 'gc' ] })
+ }
+
+ private buildEventLoopLagObserver () {
+ const reportEventloopLag = (start: [ number, number ], observableResult: ObservableResult, res: () => void) => {
+ const delta = process.hrtime(start)
+ const nanosec = delta[0] * 1e9 + delta[1]
+ const seconds = nanosec / 1e9
+
+ observableResult.observe(seconds)
+
+ res()
+ }
+
+ this.meter.createObservableGauge('nodejs_eventloop_lag_seconds', {
+ description: 'Lag of event loop in seconds.'
+ }).addCallback(observableResult => {
+ return new Promise(res => {
+ const start = process.hrtime()
+
+ setImmediate(reportEventloopLag, start, observableResult, res)
+ })
+ })
+ }
+
+ private buildFileDescriptorsObserver () {
+ this.meter.createObservableGauge('process_open_fds', {
+ description: 'Number of open file descriptors.'
+ }).addCallback(async observableResult => {
+ try {
+ const fds = await readdir('/proc/self/fd')
+ observableResult.observe(fds.length - 1)
+ } catch (err) {
+ logger.debug('Cannot list file descriptors of current process for OpenTelemetry.', { err })
+ }
+ })
+ }
+
+ private buildLibUVActiveRequestsObserver () {
+ if (typeof (process as any)._getActiveRequests !== 'function') return
+
+ this.meter.createObservableGauge('nodejs_active_requests_total', {
+ description: 'Total number of active libuv requests.'
+ }).addCallback(async observableResult => {
+ const requests = (process as any)._getActiveRequests()
+
+ observableResult.observe(requests.length)
+ })
+ }
+
+ private buildActiveResourcesObserver () {
+ if (typeof (process as any).getActiveResourcesInfo !== 'function') return
+
+ const grouped = this.meter.createObservableCounter('nodejs_active_resources', {
+ description: 'Number of active resources that are currently keeping the event loop alive, grouped by async resource type.'
+ })
+ const total = this.meter.createObservableCounter('nodejs_active_resources_total', {
+ description: 'Total number of active resources.'
+ })
+
+ this.meter.addBatchObservableCallback(observableResult => {
+ const resources = (process as any).getActiveResourcesInfo()
+
+ const data = {}
+
+ for (let i = 0; i < resources.length; i++) {
+ const resource = resources[i]
+
+ if (data[resource] === undefined) data[resource] = 0
+ data[resource] += 1
+ }
+
+ for (const type of Object.keys(data)) {
+ observableResult.observe(grouped, data[type], { type })
+ }
+
+ observableResult.observe(total, resources.length)
+ }, [ grouped, total ])
+ }
+}
import { MeterProvider } from '@opentelemetry/sdk-metrics-base'
import { logger } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
-import { JobQueue } from '../job-queue'
-import { StatsObserverBuilder } from './metric-helpers'
+import { JobQueueObserversBuilder, NodeJSObserversBuilder, StatsObserversBuilder } from './metric-helpers'
class OpenTelemetryMetrics {
this.meter = metrics.getMeter('default')
- this.buildMemoryObserver()
this.buildRequestObserver()
- this.buildJobQueueObserver()
- const statsObserverBuilder = new StatsObserverBuilder(this.meter)
- statsObserverBuilder.buildObservers()
- }
+ const nodeJSObserversBuilder = new NodeJSObserversBuilder(this.meter, provider)
+ nodeJSObserversBuilder.buildObservers()
- private buildMemoryObserver () {
- this.meter.createObservableGauge('nodejs_memory_usage_bytes', {
- description: 'Memory'
- }).addCallback(observableResult => {
- const current = process.memoryUsage()
-
- observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' })
- observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' })
- observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' })
- observableResult.observe(current.external, { memoryType: 'external' })
- observableResult.observe(current.rss, { memoryType: 'rss' })
- })
- }
+ const jobQueueObserversBuilder = new JobQueueObserversBuilder(this.meter)
+ jobQueueObserversBuilder.buildObservers()
- private buildJobQueueObserver () {
- this.meter.createObservableGauge('peertube_job_queue_total', {
- description: 'Total jobs in the PeerTube job queue'
- }).addCallback(async observableResult => {
- const stats = await JobQueue.Instance.getStats()
-
- for (const { jobType, counts } of stats) {
- for (const state of Object.keys(counts)) {
- observableResult.observe(counts[state], { jobType, state })
- }
- }
- })
+ const statsObserversBuilder = new StatsObserversBuilder(this.meter)
+ statsObserversBuilder.buildObservers()
}
private buildRequestObserver () {