From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/lib/live/shared/muxing-session.ts | 191 +++++++++------------ .../abstract-transcoding-wrapper.ts | 101 +++++++++++ .../ffmpeg-transcoding-wrapper.ts | 95 ++++++++++ .../lib/live/shared/transcoding-wrapper/index.ts | 3 + .../remote-transcoding-wrapper.ts | 20 +++ 5 files changed, 303 insertions(+), 107 deletions(-) create mode 100644 server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/index.ts create mode 100644 server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts (limited to 'server/lib/live/shared') diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 2727fc4a7..f3f8fc886 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -1,11 +1,10 @@ import { mapSeries } from 'bluebird' import { FSWatcher, watch } from 'chokidar' -import { FfmpegCommand } from 'fluent-ffmpeg' +import { EventEmitter } from 'events' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' import PQueue from 'p-queue' import { basename, join } from 'path' -import { EventEmitter } from 'stream' -import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' +import { computeOutputFPS } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' @@ -20,24 +19,24 @@ import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' -import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' import { LiveQuotaStore } from '../live-quota-store' import { LiveSegmentShaStore } from '../live-segment-sha-store' -import { buildConcatenatedName } from '../live-utils' +import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils' +import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper' import memoizee = require('memoizee') interface MuxingSessionEvents { - 'live-ready': (options: { videoId: number }) => void + 'live-ready': (options: { videoUUID: string }) => void - 'bad-socket-health': (options: { videoId: number }) => void - 'duration-exceeded': (options: { videoId: number }) => void - 'quota-exceeded': (options: { videoId: number }) => void + 'bad-socket-health': (options: { videoUUID: string }) => void + 'duration-exceeded': (options: { videoUUID: string }) => void + 'quota-exceeded': (options: { videoUUID: string }) => void - 'ffmpeg-end': (options: { videoId: number }) => void - 'ffmpeg-error': (options: { videoId: number }) => void + 'transcoding-end': (options: { videoUUID: string }) => void + 'transcoding-error': (options: { videoUUID: string }) => void - 'after-cleanup': (options: { videoId: number }) => void + 'after-cleanup': (options: { videoUUID: string }) => void } declare interface MuxingSession { @@ -52,7 +51,7 @@ declare interface MuxingSession { class MuxingSession extends EventEmitter { - private ffmpegCommand: FfmpegCommand + private transcodingWrapper: AbstractTranscodingWrapper private readonly context: any private readonly user: MUserId @@ -67,7 +66,6 @@ class MuxingSession extends EventEmitter { private readonly hasAudio: boolean - private readonly videoId: number private readonly videoUUID: string private readonly saveReplay: boolean @@ -126,7 +124,6 @@ class MuxingSession extends EventEmitter { this.allResolutions = options.allResolutions - this.videoId = this.videoLive.Video.id this.videoUUID = this.videoLive.Video.uuid this.saveReplay = this.videoLive.saveReplay @@ -145,63 +142,23 @@ class MuxingSession extends EventEmitter { await this.prepareDirectories() - this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED - ? await getLiveTranscodingCommand({ - inputUrl: this.inputUrl, + this.transcodingWrapper = this.buildTranscodingWrapper() - outPath: this.outDirectory, - masterPlaylistName: this.streamingPlaylist.playlistFilename, + this.transcodingWrapper.on('end', () => this.onTranscodedEnded()) + this.transcodingWrapper.on('error', () => this.onTranscodingError()) - latencyMode: this.videoLive.latencyMode, - - resolutions: this.allResolutions, - fps: this.fps, - bitrate: this.bitrate, - ratio: this.ratio, - - hasAudio: this.hasAudio, - - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.LIVE.TRANSCODING.PROFILE - }) - : getLiveMuxingCommand({ - inputUrl: this.inputUrl, - outPath: this.outDirectory, - masterPlaylistName: this.streamingPlaylist.playlistFilename, - latencyMode: this.videoLive.latencyMode - }) - - logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) + await this.transcodingWrapper.run() this.watchMasterFile() this.watchTSFiles() this.watchM3U8File() - - let ffmpegShellCommand: string - this.ffmpegCommand.on('start', cmdline => { - ffmpegShellCommand = cmdline - - logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) - }) - - this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) - }) - - this.ffmpegCommand.on('end', () => { - this.emit('ffmpeg-end', ({ videoId: this.videoId })) - - this.onFFmpegEnded() - }) - - this.ffmpegCommand.run() } abort () { - if (!this.ffmpegCommand) return + if (!this.transcodingWrapper) return this.aborted = true - this.ffmpegCommand.kill('SIGINT') + this.transcodingWrapper.abort() } destroy () { @@ -210,48 +167,6 @@ class MuxingSession extends EventEmitter { this.hasClientSocketInBadHealthWithCache.clear() } - private onFFmpegError (options: { - err: any - stdout: string - stderr: string - ffmpegShellCommand: string - }) { - const { err, stdout, stderr, ffmpegShellCommand } = options - - this.onFFmpegEnded() - - // Don't care that we killed the ffmpeg process - if (err?.message?.includes('Exiting normally')) return - - logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) - - this.emit('ffmpeg-error', ({ videoId: this.videoId })) - } - - private onFFmpegEnded () { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) - - setTimeout(() => { - // Wait latest segments generation, and close watchers - - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) - .then(() => { - // Process remaining segments hash - for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { - this.processSegments(this.segmentsToProcessPerPlaylist[key]) - } - }) - .catch(err => { - logger.error( - 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, - { err, ...this.lTags() } - ) - }) - - this.emit('after-cleanup', { videoId: this.videoId }) - }, 1000) - } - private watchMasterFile () { this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) @@ -272,6 +187,8 @@ class MuxingSession extends EventEmitter { this.masterPlaylistCreated = true + logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) + this.masterWatcher.close() .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) @@ -318,19 +235,19 @@ class MuxingSession extends EventEmitter { this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { - this.emit('bad-socket-health', { videoId: this.videoId }) + this.emit('bad-socket-health', { videoUUID: this.videoUUID }) return } // Duration constraint check if (this.isDurationConstraintValid(startStreamDateTime) !== true) { - this.emit('duration-exceeded', { videoId: this.videoId }) + this.emit('duration-exceeded', { videoUUID: this.videoUUID }) return } // Check user quota if the user enabled replay saving if (await this.isQuotaExceeded(segmentPath) === true) { - this.emit('quota-exceeded', { videoId: this.videoId }) + this.emit('quota-exceeded', { videoUUID: this.videoUUID }) } } @@ -438,10 +355,40 @@ class MuxingSession extends EventEmitter { if (this.masterPlaylistCreated && !this.liveReady) { this.liveReady = true - this.emit('live-ready', { videoId: this.videoId }) + this.emit('live-ready', { videoUUID: this.videoUUID }) } } + private onTranscodingError () { + this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) + } + + private onTranscodedEnded () { + this.emit('transcoding-end', ({ videoUUID: this.videoUUID })) + + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) + + setTimeout(() => { + // Wait latest segments generation, and close watchers + + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) + .then(() => { + // Process remaining segments hash + for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { + this.processSegments(this.segmentsToProcessPerPlaylist[key]) + } + }) + .catch(err => { + logger.error( + 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, + { err, ...this.lTags() } + ) + }) + + this.emit('after-cleanup', { videoUUID: this.videoUUID }) + }, 1000) + } + private hasClientSocketInBadHealth (sessionId: string) { const rtmpSession = this.context.sessions.get(sessionId) @@ -503,6 +450,36 @@ class MuxingSession extends EventEmitter { sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED }) } + + private buildTranscodingWrapper () { + const options = { + streamingPlaylist: this.streamingPlaylist, + videoLive: this.videoLive, + + lTags: this.lTags, + + inputUrl: this.inputUrl, + + toTranscode: this.allResolutions.map(resolution => ({ + resolution, + fps: computeOutputFPS({ inputFPS: this.fps, resolution }) + })), + + fps: this.fps, + bitrate: this.bitrate, + ratio: this.ratio, + hasAudio: this.hasAudio, + + segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, + segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode), + + outDirectory: this.outDirectory + } + + return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED + ? new RemoteTranscodingWrapper(options) + : new FFmpegTranscodingWrapper(options) + } } // --------------------------------------------------------------------------- diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts new file mode 100644 index 000000000..226ba4573 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts @@ -0,0 +1,101 @@ +import EventEmitter from 'events' +import { LoggerTagsFn } from '@server/helpers/logger' +import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models' +import { LiveVideoError } from '@shared/models' + +interface TranscodingWrapperEvents { + 'end': () => void + + 'error': (options: { err: Error }) => void +} + +declare interface AbstractTranscodingWrapper { + on( + event: U, listener: TranscodingWrapperEvents[U] + ): this + + emit( + event: U, ...args: Parameters + ): boolean +} + +interface AbstractTranscodingWrapperOptions { + streamingPlaylist: MStreamingPlaylistVideo + videoLive: MVideoLiveVideo + + lTags: LoggerTagsFn + + inputUrl: string + fps: number + toTranscode: { + resolution: number + fps: number + }[] + + bitrate: number + ratio: number + hasAudio: boolean + + segmentListSize: number + segmentDuration: number + + outDirectory: string +} + +abstract class AbstractTranscodingWrapper extends EventEmitter { + protected readonly videoLive: MVideoLiveVideo + + protected readonly toTranscode: { + resolution: number + fps: number + }[] + + protected readonly inputUrl: string + protected readonly fps: number + protected readonly bitrate: number + protected readonly ratio: number + protected readonly hasAudio: boolean + + protected readonly segmentListSize: number + protected readonly segmentDuration: number + + protected readonly videoUUID: string + + protected readonly outDirectory: string + + protected readonly lTags: LoggerTagsFn + + protected readonly streamingPlaylist: MStreamingPlaylistVideo + + constructor (options: AbstractTranscodingWrapperOptions) { + super() + + this.lTags = options.lTags + + this.videoLive = options.videoLive + this.videoUUID = options.videoLive.Video.uuid + this.streamingPlaylist = options.streamingPlaylist + + this.inputUrl = options.inputUrl + this.fps = options.fps + this.toTranscode = options.toTranscode + + this.bitrate = options.bitrate + this.ratio = options.ratio + this.hasAudio = options.hasAudio + + this.segmentListSize = options.segmentListSize + this.segmentDuration = options.segmentDuration + + this.outDirectory = options.outDirectory + } + + abstract run (): Promise + + abstract abort (error?: LiveVideoError): void +} + +export { + AbstractTranscodingWrapper, + AbstractTranscodingWrapperOptions +} diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts new file mode 100644 index 000000000..1f4c12bd4 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts @@ -0,0 +1,95 @@ +import { FfmpegCommand } from 'fluent-ffmpeg' +import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' +import { logger } from '@server/helpers/logger' +import { CONFIG } from '@server/initializers/config' +import { VIDEO_LIVE } from '@server/initializers/constants' +import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' +import { FFmpegLive } from '@shared/ffmpeg' +import { getLiveSegmentTime } from '../../live-utils' +import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' + +export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { + private ffmpegCommand: FfmpegCommand + private ended = false + + async run () { + this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED + ? await this.buildFFmpegLive().getLiveTranscodingCommand({ + inputUrl: this.inputUrl, + + outPath: this.outDirectory, + masterPlaylistName: this.streamingPlaylist.playlistFilename, + + segmentListSize: this.segmentListSize, + segmentDuration: this.segmentDuration, + + toTranscode: this.toTranscode, + + bitrate: this.bitrate, + ratio: this.ratio, + + hasAudio: this.hasAudio + }) + : this.buildFFmpegLive().getLiveMuxingCommand({ + inputUrl: this.inputUrl, + outPath: this.outDirectory, + + masterPlaylistName: this.streamingPlaylist.playlistFilename, + + segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, + segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode) + }) + + logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags()) + + this.ffmpegCommand.run() + + let ffmpegShellCommand: string + this.ffmpegCommand.on('start', cmdline => { + ffmpegShellCommand = cmdline + + logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) + }) + + this.ffmpegCommand.on('error', (err, stdout, stderr) => { + this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) + }) + + this.ffmpegCommand.on('end', () => { + this.onFFmpegEnded() + }) + + this.ffmpegCommand.run() + } + + abort () { + // Nothing to do, ffmpeg will automatically exit + } + + private onFFmpegError (options: { + err: any + stdout: string + stderr: string + ffmpegShellCommand: string + }) { + const { err, stdout, stderr, ffmpegShellCommand } = options + + // Don't care that we killed the ffmpeg process + if (err?.message?.includes('Exiting normally')) return + + logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) + + this.emit('error', { err }) + } + + private onFFmpegEnded () { + if (this.ended) return + + this.ended = true + this.emit('end') + } + + private buildFFmpegLive () { + return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) + } +} diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts new file mode 100644 index 000000000..ae28fa1ca --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/index.ts @@ -0,0 +1,3 @@ +export * from './abstract-transcoding-wrapper' +export * from './ffmpeg-transcoding-wrapper' +export * from './remote-transcoding-wrapper' diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts new file mode 100644 index 000000000..345eaf442 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts @@ -0,0 +1,20 @@ +import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners' +import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' + +export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { + async run () { + await new LiveRTMPHLSTranscodingJobHandler().create({ + rtmpUrl: this.inputUrl, + toTranscode: this.toTranscode, + video: this.videoLive.Video, + outputDirectory: this.outDirectory, + playlist: this.streamingPlaylist, + segmentListSize: this.segmentListSize, + segmentDuration: this.segmentDuration + }) + } + + abort () { + this.emit('end') + } +} -- cgit v1.2.3