From 630d0a1bf5897fff203cb07e426223f55dcc882d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 5 Jul 2022 15:43:21 +0200 Subject: Introduce experimental telemetry --- server/lib/opentelemetry/metric-helpers/index.ts | 1 + .../metric-helpers/stats-observers-builder.ts | 186 +++++++++++++++++++++ server/lib/opentelemetry/metrics.ts | 111 ++++++++++++ server/lib/opentelemetry/tracing.ts | 81 +++++++++ 4 files changed, 379 insertions(+) create mode 100644 server/lib/opentelemetry/metric-helpers/index.ts create mode 100644 server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts create mode 100644 server/lib/opentelemetry/metrics.ts create mode 100644 server/lib/opentelemetry/tracing.ts (limited to 'server/lib/opentelemetry') diff --git a/server/lib/opentelemetry/metric-helpers/index.ts b/server/lib/opentelemetry/metric-helpers/index.ts new file mode 100644 index 000000000..cabb27326 --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/index.ts @@ -0,0 +1 @@ +export * from './stats-observers-builder' diff --git a/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts new file mode 100644 index 000000000..90b58f33d --- /dev/null +++ b/server/lib/opentelemetry/metric-helpers/stats-observers-builder.ts @@ -0,0 +1,186 @@ +import memoizee from 'memoizee' +import { Meter } from '@opentelemetry/api-metrics' +import { MEMOIZE_TTL } from '@server/initializers/constants' +import { buildAvailableActivities } from '@server/lib/activitypub/activity' +import { StatsManager } from '@server/lib/stat-manager' + +export class StatsObserverBuilder { + + private readonly getInstanceStats = memoizee(() => { + return StatsManager.Instance.getStats() + }, { maxAge: MEMOIZE_TTL.GET_STATS_FOR_OPEN_TELEMETRY_METRICS }) + + constructor (private readonly meter: Meter) { + + } + + buildObservers () { + this.buildUserStatsObserver() + this.buildVideoStatsObserver() + this.buildCommentStatsObserver() + this.buildPlaylistStatsObserver() + this.buildChannelStatsObserver() + this.buildInstanceFollowsStatsObserver() + this.buildRedundancyStatsObserver() + this.buildActivityPubStatsObserver() + } + + private buildUserStatsObserver () { + this.meter.createObservableGauge('peertube_users_total', { + description: 'Total users on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalUsers) + }) + + this.meter.createObservableGauge('peertube_active_users_total', { + description: 'Total active users on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalDailyActiveUsers, { activeInterval: 'daily' }) + observableResult.observe(stats.totalWeeklyActiveUsers, { activeInterval: 'weekly' }) + observableResult.observe(stats.totalMonthlyActiveUsers, { activeInterval: 'monthly' }) + }) + } + + private buildChannelStatsObserver () { + this.meter.createObservableGauge('peertube_channels_total', { + description: 'Total channels on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalVideoChannels, { channelOrigin: 'local' }) + }) + + this.meter.createObservableGauge('peertube_active_channels_total', { + description: 'Total active channels on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalDailyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'daily' }) + observableResult.observe(stats.totalLocalWeeklyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'weekly' }) + observableResult.observe(stats.totalLocalMonthlyActiveVideoChannels, { channelOrigin: 'local', activeInterval: 'monthly' }) + }) + } + + private buildVideoStatsObserver () { + this.meter.createObservableGauge('peertube_videos_total', { + description: 'Total videos on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalVideos, { videoOrigin: 'local' }) + observableResult.observe(stats.totalVideos - stats.totalLocalVideos, { videoOrigin: 'remote' }) + }) + + this.meter.createObservableGauge('peertube_video_views_total', { + description: 'Total video views made on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalVideoViews, { viewOrigin: 'local' }) + }) + + this.meter.createObservableGauge('peertube_video_bytes_total', { + description: 'Total bytes of videos' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalVideoFilesSize, { videoOrigin: 'local' }) + }) + } + + private buildCommentStatsObserver () { + this.meter.createObservableGauge('peertube_comments_total', { + description: 'Total comments on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalVideoComments, { accountOrigin: 'local' }) + }) + } + + private buildPlaylistStatsObserver () { + this.meter.createObservableGauge('peertube_playlists_total', { + description: 'Total playlists on the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalLocalPlaylists, { playlistOrigin: 'local' }) + }) + } + + private buildInstanceFollowsStatsObserver () { + this.meter.createObservableGauge('peertube_instance_followers_total', { + description: 'Total followers of the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalInstanceFollowers) + }) + + this.meter.createObservableGauge('peertube_instance_following_total', { + description: 'Total following of the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalInstanceFollowing) + }) + } + + private buildRedundancyStatsObserver () { + this.meter.createObservableGauge('peertube_redundancy_used_bytes_total', { + description: 'Total redundancy used of the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + for (const r of stats.videosRedundancy) { + observableResult.observe(r.totalUsed, { strategy: r.strategy }) + } + }) + + this.meter.createObservableGauge('peertube_redundancy_available_bytes_total', { + description: 'Total redundancy available of the instance' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + for (const r of stats.videosRedundancy) { + observableResult.observe(r.totalSize, { strategy: r.strategy }) + } + }) + } + + private buildActivityPubStatsObserver () { + const availableActivities = buildAvailableActivities() + + this.meter.createObservableGauge('peertube_ap_inbox_success_total', { + description: 'Total inbox messages processed with success' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + for (const type of availableActivities) { + observableResult.observe(stats[`totalActivityPub${type}MessagesSuccesses`], { activityType: type }) + } + }) + + this.meter.createObservableGauge('peertube_ap_inbox_error_total', { + description: 'Total inbox messages processed with error' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + for (const type of availableActivities) { + observableResult.observe(stats[`totalActivityPub${type}MessagesErrors`], { activityType: type }) + } + }) + + this.meter.createObservableGauge('peertube_ap_inbox_waiting_total', { + description: 'Total inbox messages waiting for being processed' + }).addCallback(async observableResult => { + const stats = await this.getInstanceStats() + + observableResult.observe(stats.totalActivityPubMessagesWaiting) + }) + } +} diff --git a/server/lib/opentelemetry/metrics.ts b/server/lib/opentelemetry/metrics.ts new file mode 100644 index 000000000..ca0aae8e7 --- /dev/null +++ b/server/lib/opentelemetry/metrics.ts @@ -0,0 +1,111 @@ +import { Application, Request, Response } from 'express' +import { Meter, metrics } from '@opentelemetry/api-metrics' +import { PrometheusExporter } from '@opentelemetry/exporter-prometheus' +import { MeterProvider } from '@opentelemetry/sdk-metrics-base' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { JobQueue } from '../job-queue' +import { StatsObserverBuilder } from './metric-helpers' + +class OpenTelemetryMetrics { + + private static instance: OpenTelemetryMetrics + + private meter: Meter + + private onRequestDuration: (req: Request, res: Response) => void + + private constructor () {} + + init (app: Application) { + if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return + + app.use((req, res, next) => { + res.once('finish', () => { + if (!this.onRequestDuration) return + + this.onRequestDuration(req as Request, res as Response) + }) + + next() + }) + } + + registerMetrics () { + if (CONFIG.OPEN_TELEMETRY.METRICS.ENABLED !== true) return + + logger.info('Registering Open Telemetry metrics') + + const provider = new MeterProvider() + + provider.addMetricReader(new PrometheusExporter({ port: CONFIG.OPEN_TELEMETRY.METRICS.PROMETHEUS_EXPORTER.PORT })) + + metrics.setGlobalMeterProvider(provider) + + this.meter = metrics.getMeter('default') + + this.buildMemoryObserver() + this.buildRequestObserver() + this.buildJobQueueObserver() + + const statsObserverBuilder = new StatsObserverBuilder(this.meter) + statsObserverBuilder.buildObservers() + } + + private buildMemoryObserver () { + this.meter.createObservableGauge('nodejs_memory_usage_bytes', { + description: 'Memory' + }).addCallback(observableResult => { + const current = process.memoryUsage() + + observableResult.observe(current.heapTotal, { memoryType: 'heapTotal' }) + observableResult.observe(current.heapUsed, { memoryType: 'heapUsed' }) + observableResult.observe(current.arrayBuffers, { memoryType: 'arrayBuffers' }) + observableResult.observe(current.external, { memoryType: 'external' }) + observableResult.observe(current.rss, { memoryType: 'rss' }) + }) + } + + private buildJobQueueObserver () { + this.meter.createObservableGauge('peertube_job_queue_total', { + description: 'Total jobs in the PeerTube job queue' + }).addCallback(async observableResult => { + const stats = await JobQueue.Instance.getStats() + + for (const { jobType, counts } of stats) { + for (const state of Object.keys(counts)) { + observableResult.observe(counts[state], { jobType, state }) + } + } + }) + } + + private buildRequestObserver () { + const requestDuration = this.meter.createHistogram('http_request_duration_ms', { + unit: 'milliseconds', + description: 'Duration of HTTP requests in ms' + }) + + this.onRequestDuration = (req: Request, res: Response) => { + const duration = Date.now() - res.locals.requestStart + + requestDuration.record(duration, { + path: this.buildRequestPath(req.originalUrl), + method: req.method, + statusCode: res.statusCode + '' + }) + } + } + + private buildRequestPath (path: string) { + return path.split('?')[0] + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} + +export { + OpenTelemetryMetrics +} diff --git a/server/lib/opentelemetry/tracing.ts b/server/lib/opentelemetry/tracing.ts new file mode 100644 index 000000000..5358d04de --- /dev/null +++ b/server/lib/opentelemetry/tracing.ts @@ -0,0 +1,81 @@ +import { diag, DiagLogLevel, trace } from '@opentelemetry/api' +import { JaegerExporter } from '@opentelemetry/exporter-jaeger' +import { registerInstrumentations } from '@opentelemetry/instrumentation' +import { DnsInstrumentation } from '@opentelemetry/instrumentation-dns' +import { ExpressInstrumentation } from '@opentelemetry/instrumentation-express' +import FsInstrumentation from '@opentelemetry/instrumentation-fs' +import { HttpInstrumentation } from '@opentelemetry/instrumentation-http' +import { PgInstrumentation } from '@opentelemetry/instrumentation-pg' +import { RedisInstrumentation } from '@opentelemetry/instrumentation-redis-4' +import { Resource } from '@opentelemetry/resources' +import { BatchSpanProcessor } from '@opentelemetry/sdk-trace-base' +import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node' +import { SemanticResourceAttributes } from '@opentelemetry/semantic-conventions' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' + +function registerOpentelemetryTracing () { + if (CONFIG.OPEN_TELEMETRY.TRACING.ENABLED !== true) return + + logger.info('Registering Open Telemetry tracing') + + const customLogger = (level: string) => { + return (message: string, ...args: unknown[]) => { + let fullMessage = message + + for (const arg of args) { + if (typeof arg === 'string') fullMessage += arg + else break + } + + logger[level](fullMessage) + } + } + + diag.setLogger({ + error: customLogger('error'), + warn: customLogger('warn'), + info: customLogger('info'), + debug: customLogger('debug'), + verbose: customLogger('verbose') + }, DiagLogLevel.INFO) + + const tracerProvider = new NodeTracerProvider({ + resource: new Resource({ + [SemanticResourceAttributes.SERVICE_NAME]: 'peertube' + }) + }) + + registerInstrumentations({ + tracerProvider: tracerProvider, + instrumentations: [ + new PgInstrumentation({ + enhancedDatabaseReporting: true + }), + new DnsInstrumentation(), + new HttpInstrumentation(), + new ExpressInstrumentation(), + new RedisInstrumentation({ + dbStatementSerializer: function (cmdName, cmdArgs) { + return [ cmdName, ...cmdArgs ].join(' ') + } + }), + new FsInstrumentation() + ] + }) + + tracerProvider.addSpanProcessor( + new BatchSpanProcessor( + new JaegerExporter({ endpoint: CONFIG.OPEN_TELEMETRY.TRACING.JAEGER_EXPORTER.ENDPOINT }) + ) + ) + + tracerProvider.register() +} + +const tracer = trace.getTracer('peertube') + +export { + registerOpentelemetryTracing, + tracer +} -- cgit v1.2.3