From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- .../live-rtmp-hls-transcoding-job-handler.ts | 170 +++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts (limited to 'server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts') diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..c3d0e427d --- /dev/null +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts @@ -0,0 +1,170 @@ +import { move, remove } from 'fs-extra' +import { join } from 'path' +import { logger } from '@server/helpers/logger' +import { JOB_PRIORITY } from '@server/initializers/constants' +import { LiveManager } from '@server/lib/live' +import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { + LiveRTMPHLSTranscodingSuccess, + LiveRTMPHLSTranscodingUpdatePayload, + LiveVideoError, + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobLiveRTMPHLSTranscodingPrivatePayload, + RunnerJobState +} from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' + +type CreateOptions = { + video: MVideo + playlist: MStreamingPlaylist + + rtmpUrl: string + + toTranscode: { + resolution: number + fps: number + }[] + + segmentListSize: number + segmentDuration: number + + outputDirectory: string +} + +// eslint-disable-next-line max-len +export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler { + + async create (options: CreateOptions) { + const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options + + const jobUUID = buildUUID() + const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { + input: { + rtmpUrl + }, + output: { + toTranscode, + segmentListSize, + segmentDuration + } + } + + const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { + videoUUID: video.uuid, + masterPlaylistName: playlist.playlistFilename, + outputDirectory + } + + const job = await this.createRunnerJob({ + type: 'live-rtmp-hls-transcoding', + jobUUID, + payload, + privatePayload, + priority: JOB_PRIORITY.TRANSCODING + }) + + return job + } + + // --------------------------------------------------------------------------- + + async specificUpdate (options: { + runnerJob: MRunnerJob + updatePayload: LiveRTMPHLSTranscodingUpdatePayload + }) { + const { runnerJob, updatePayload } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const outputDirectory = privatePayload.outputDirectory + const videoUUID = privatePayload.videoUUID + + if (updatePayload.type === 'add-chunk') { + await move( + updatePayload.videoChunkFile as string, + join(outputDirectory, updatePayload.videoChunkFilename), + { overwrite: true } + ) + } else if (updatePayload.type === 'remove-chunk') { + await remove(join(outputDirectory, updatePayload.videoChunkFilename)) + } + + if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) { + await move( + updatePayload.resolutionPlaylistFile as string, + join(outputDirectory, updatePayload.resolutionPlaylistFilename), + { overwrite: true } + ) + } + + if (updatePayload.masterPlaylistFile) { + await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true }) + } + + logger.info( + 'Runner live RTMP to HLS job %s for %s updated.', + runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) } + ) + } + + // --------------------------------------------------------------------------- + + protected specificComplete (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'ended' + }) + } + + // --------------------------------------------------------------------------- + + protected isAbortSupported () { + return false + } + + protected specificAbort () { + throw new Error('Not implemented') + } + + protected specificError (options: { + runnerJob: MRunnerJob + nextState: RunnerJobState + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'errored' + }) + } + + protected specificCancel (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'cancelled' + }) + } + + private stopLive (options: { + runnerJob: MRunnerJob + type: 'ended' | 'errored' | 'cancelled' + }) { + const { runnerJob, type } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const videoUUID = privatePayload.videoUUID + + const errorType = { + ended: null, + errored: LiveVideoError.RUNNER_JOB_ERROR, + cancelled: LiveVideoError.RUNNER_JOB_CANCEL + } + + LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type]) + + logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID)) + } +} -- cgit v1.2.3