From 0c9668f77901e7540e2c7045eb0f2974a4842a69 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Apr 2023 14:55:10 +0200 Subject: Implement remote runner jobs in server Move ffmpeg functions to @shared --- server/lib/runners/index.ts | 3 + .../runners/job-handlers/abstract-job-handler.ts | 271 +++++++++++++++++++++ .../abstract-vod-transcoding-job-handler.ts | 71 ++++++ server/lib/runners/job-handlers/index.ts | 6 + .../live-rtmp-hls-transcoding-job-handler.ts | 170 +++++++++++++ .../runners/job-handlers/runner-job-handlers.ts | 18 ++ server/lib/runners/job-handlers/shared/index.ts | 1 + .../lib/runners/job-handlers/shared/vod-helpers.ts | 44 ++++ .../vod-audio-merge-transcoding-job-handler.ts | 97 ++++++++ .../vod-hls-transcoding-job-handler.ts | 114 +++++++++ .../vod-web-video-transcoding-job-handler.ts | 84 +++++++ server/lib/runners/runner-urls.ts | 9 + server/lib/runners/runner.ts | 36 +++ 13 files changed, 924 insertions(+) create mode 100644 server/lib/runners/index.ts create mode 100644 server/lib/runners/job-handlers/abstract-job-handler.ts create mode 100644 server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/index.ts create mode 100644 server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/runner-job-handlers.ts create mode 100644 server/lib/runners/job-handlers/shared/index.ts create mode 100644 server/lib/runners/job-handlers/shared/vod-helpers.ts create mode 100644 server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts create mode 100644 server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts create mode 100644 server/lib/runners/runner-urls.ts create mode 100644 server/lib/runners/runner.ts (limited to 'server/lib/runners') diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts new file mode 100644 index 000000000..a737c7b59 --- /dev/null +++ b/server/lib/runners/index.ts @@ -0,0 +1,3 @@ +export * from './job-handlers' +export * from './runner' +export * from './runner-urls' diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts new file mode 100644 index 000000000..73fc14574 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts @@ -0,0 +1,271 @@ +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { RUNNER_JOBS } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { PeerTubeSocket } from '@server/lib/peertube-socket' +import { RunnerJobModel } from '@server/models/runner/runner-job' +import { setAsUpdated } from '@server/models/shared' +import { MRunnerJob } from '@server/types/models/runners' +import { pick } from '@shared/core-utils' +import { + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobLiveRTMPHLSTranscodingPrivatePayload, + RunnerJobState, + RunnerJobSuccessPayload, + RunnerJobType, + RunnerJobUpdatePayload, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODAudioMergeTranscodingPrivatePayload, + RunnerJobVODHLSTranscodingPayload, + RunnerJobVODHLSTranscodingPrivatePayload, + RunnerJobVODWebVideoTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload +} from '@shared/models' + +type CreateRunnerJobArg = + { + type: Extract + payload: RunnerJobVODWebVideoTranscodingPayload + privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobVODHLSTranscodingPayload + privatePayload: RunnerJobVODHLSTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobVODAudioMergeTranscodingPayload + privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload + } | + { + type: Extract + payload: RunnerJobLiveRTMPHLSTranscodingPayload + privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload + } + +export abstract class AbstractJobHandler { + + protected readonly lTags = loggerTagsFactory('runner') + + // --------------------------------------------------------------------------- + + abstract create (options: C): Promise + + protected async createRunnerJob (options: CreateRunnerJobArg & { + jobUUID: string + priority: number + dependsOnRunnerJob?: MRunnerJob + }): Promise { + const { priority, dependsOnRunnerJob } = options + + const runnerJob = new RunnerJobModel({ + ...pick(options, [ 'type', 'payload', 'privatePayload' ]), + + uuid: options.jobUUID, + + state: dependsOnRunnerJob + ? RunnerJobState.WAITING_FOR_PARENT_JOB + : RunnerJobState.PENDING, + + dependsOnRunnerJobId: dependsOnRunnerJob?.id, + + priority + }) + + const job = await sequelizeTypescript.transaction(async transaction => { + return runnerJob.save({ transaction }) + }) + + if (runnerJob.state === RunnerJobState.PENDING) { + PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() + } + + return job + } + + // --------------------------------------------------------------------------- + + protected abstract specificUpdate (options: { + runnerJob: MRunnerJob + updatePayload?: U + }): Promise | void + + async update (options: { + runnerJob: MRunnerJob + progress?: number + updatePayload?: U + }) { + const { runnerJob, progress } = options + + await this.specificUpdate(options) + + if (progress) runnerJob.progress = progress + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + if (runnerJob.changed()) { + return runnerJob.save({ transaction }) + } + + // Don't update the job too often + if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { + await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) + } + }) + }) + } + + // --------------------------------------------------------------------------- + + async complete (options: { + runnerJob: MRunnerJob + resultPayload: S + }) { + const { runnerJob } = options + + try { + await this.specificComplete(options) + + runnerJob.state = RunnerJobState.COMPLETED + } catch (err) { + logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) + + runnerJob.state = RunnerJobState.ERRORED + runnerJob.error = err.message + } + + runnerJob.progress = null + runnerJob.finishedAt = new Date() + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) + + if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() + } + + protected abstract specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: S + }): Promise | void + + // --------------------------------------------------------------------------- + + async cancel (options: { + runnerJob: MRunnerJob + fromParent?: boolean + }) { + const { runnerJob, fromParent } = options + + await this.specificCancel(options) + + const cancelState = fromParent + ? RunnerJobState.PARENT_CANCELLED + : RunnerJobState.CANCELLED + + runnerJob.setToErrorOrCancel(cancelState) + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + const children = await RunnerJobModel.listChildrenOf(runnerJob) + for (const child of children) { + logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) + + await this.cancel({ runnerJob: child, fromParent: true }) + } + } + + protected abstract specificCancel (options: { + runnerJob: MRunnerJob + }): Promise | void + + // --------------------------------------------------------------------------- + + protected abstract isAbortSupported (): boolean + + async abort (options: { + runnerJob: MRunnerJob + }) { + const { runnerJob } = options + + if (this.isAbortSupported() !== true) { + return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) + } + + await this.specificAbort(options) + + runnerJob.resetToPending() + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + } + + protected setAbortState (runnerJob: MRunnerJob) { + runnerJob.resetToPending() + } + + protected abstract specificAbort (options: { + runnerJob: MRunnerJob + }): Promise | void + + // --------------------------------------------------------------------------- + + async error (options: { + runnerJob: MRunnerJob + message: string + fromParent?: boolean + }) { + const { runnerJob, message, fromParent } = options + + const errorState = fromParent + ? RunnerJobState.PARENT_ERRORED + : RunnerJobState.ERRORED + + const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES + ? RunnerJobState.PENDING + : errorState + + await this.specificError({ ...options, nextState }) + + if (nextState === errorState) { + runnerJob.setToErrorOrCancel(nextState) + runnerJob.error = message + } else { + runnerJob.resetToPending() + } + + await retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + await runnerJob.save({ transaction }) + }) + }) + + if (runnerJob.state === errorState) { + const children = await RunnerJobModel.listChildrenOf(runnerJob) + + for (const child of children) { + logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) + + await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) + } + } + } + + protected abstract specificError (options: { + runnerJob: MRunnerJob + message: string + nextState: RunnerJobState + }): Promise | void +} diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts new file mode 100644 index 000000000..517645848 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts @@ -0,0 +1,71 @@ + +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger } from '@server/helpers/logger' +import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MRunnerJob } from '@server/types/models/runners' +import { + LiveRTMPHLSTranscodingUpdatePayload, + RunnerJobSuccessPayload, + RunnerJobUpdatePayload, + RunnerJobVODPrivatePayload +} from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' +import { loadTranscodingRunnerVideo } from './shared' + +// eslint-disable-next-line max-len +export abstract class AbstractVODTranscodingJobHandler extends AbstractJobHandler { + + // --------------------------------------------------------------------------- + + protected isAbortSupported () { + return true + } + + protected specificUpdate (_options: { + runnerJob: MRunnerJob + updatePayload?: LiveRTMPHLSTranscodingUpdatePayload + }) { + // empty + } + + protected specificAbort (_options: { + runnerJob: MRunnerJob + }) { + // empty + } + + protected async specificError (options: { + runnerJob: MRunnerJob + }) { + const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) + if (!video) return + + await moveToFailedTranscodingState(video) + + await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + } + + protected async specificCancel (options: { + runnerJob: MRunnerJob + }) { + const { runnerJob } = options + + const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) + if (!video) return + + const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') + + logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid)) + + if (pending === 0) { + logger.info( + `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`, + this.lTags(video.uuid) + ) + + const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo }) + } + } +} diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts new file mode 100644 index 000000000..0fca72b9a --- /dev/null +++ b/server/lib/runners/job-handlers/index.ts @@ -0,0 +1,6 @@ +export * from './abstract-job-handler' +export * from './live-rtmp-hls-transcoding-job-handler' +export * from './vod-audio-merge-transcoding-job-handler' +export * from './vod-hls-transcoding-job-handler' +export * from './vod-web-video-transcoding-job-handler' +export * from './runner-job-handlers' diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..c3d0e427d --- /dev/null +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts @@ -0,0 +1,170 @@ +import { move, remove } from 'fs-extra' +import { join } from 'path' +import { logger } from '@server/helpers/logger' +import { JOB_PRIORITY } from '@server/initializers/constants' +import { LiveManager } from '@server/lib/live' +import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { + LiveRTMPHLSTranscodingSuccess, + LiveRTMPHLSTranscodingUpdatePayload, + LiveVideoError, + RunnerJobLiveRTMPHLSTranscodingPayload, + RunnerJobLiveRTMPHLSTranscodingPrivatePayload, + RunnerJobState +} from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' + +type CreateOptions = { + video: MVideo + playlist: MStreamingPlaylist + + rtmpUrl: string + + toTranscode: { + resolution: number + fps: number + }[] + + segmentListSize: number + segmentDuration: number + + outputDirectory: string +} + +// eslint-disable-next-line max-len +export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler { + + async create (options: CreateOptions) { + const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options + + const jobUUID = buildUUID() + const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { + input: { + rtmpUrl + }, + output: { + toTranscode, + segmentListSize, + segmentDuration + } + } + + const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { + videoUUID: video.uuid, + masterPlaylistName: playlist.playlistFilename, + outputDirectory + } + + const job = await this.createRunnerJob({ + type: 'live-rtmp-hls-transcoding', + jobUUID, + payload, + privatePayload, + priority: JOB_PRIORITY.TRANSCODING + }) + + return job + } + + // --------------------------------------------------------------------------- + + async specificUpdate (options: { + runnerJob: MRunnerJob + updatePayload: LiveRTMPHLSTranscodingUpdatePayload + }) { + const { runnerJob, updatePayload } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const outputDirectory = privatePayload.outputDirectory + const videoUUID = privatePayload.videoUUID + + if (updatePayload.type === 'add-chunk') { + await move( + updatePayload.videoChunkFile as string, + join(outputDirectory, updatePayload.videoChunkFilename), + { overwrite: true } + ) + } else if (updatePayload.type === 'remove-chunk') { + await remove(join(outputDirectory, updatePayload.videoChunkFilename)) + } + + if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) { + await move( + updatePayload.resolutionPlaylistFile as string, + join(outputDirectory, updatePayload.resolutionPlaylistFilename), + { overwrite: true } + ) + } + + if (updatePayload.masterPlaylistFile) { + await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true }) + } + + logger.info( + 'Runner live RTMP to HLS job %s for %s updated.', + runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) } + ) + } + + // --------------------------------------------------------------------------- + + protected specificComplete (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'ended' + }) + } + + // --------------------------------------------------------------------------- + + protected isAbortSupported () { + return false + } + + protected specificAbort () { + throw new Error('Not implemented') + } + + protected specificError (options: { + runnerJob: MRunnerJob + nextState: RunnerJobState + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'errored' + }) + } + + protected specificCancel (options: { + runnerJob: MRunnerJob + }) { + return this.stopLive({ + runnerJob: options.runnerJob, + type: 'cancelled' + }) + } + + private stopLive (options: { + runnerJob: MRunnerJob + type: 'ended' | 'errored' | 'cancelled' + }) { + const { runnerJob, type } = options + + const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload + const videoUUID = privatePayload.videoUUID + + const errorType = { + ended: null, + errored: LiveVideoError.RUNNER_JOB_ERROR, + cancelled: LiveVideoError.RUNNER_JOB_CANCEL + } + + LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type]) + + logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID)) + } +} diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts new file mode 100644 index 000000000..7bad1bc77 --- /dev/null +++ b/server/lib/runners/job-handlers/runner-job-handlers.ts @@ -0,0 +1,18 @@ +import { MRunnerJob } from '@server/types/models/runners' +import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' +import { AbstractJobHandler } from './abstract-job-handler' +import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' +import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' +import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' +import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' + +const processors: Record AbstractJobHandler> = { + 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, + 'vod-hls-transcoding': VODHLSTranscodingJobHandler, + 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, + 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler +} + +export function getRunnerJobHandlerClass (job: MRunnerJob) { + return processors[job.type] +} diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts new file mode 100644 index 000000000..348273ae2 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/index.ts @@ -0,0 +1 @@ +export * from './vod-helpers' diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts new file mode 100644 index 000000000..93ae89ff8 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts @@ -0,0 +1,44 @@ +import { move } from 'fs-extra' +import { dirname, join } from 'path' +import { logger, LoggerTagsFn } from '@server/helpers/logger' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding' +import { buildNewFile } from '@server/lib/video-file' +import { VideoModel } from '@server/models/video/video' +import { MVideoFullLight } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' + +export async function onVODWebVideoOrAudioMergeTranscodingJob (options: { + video: MVideoFullLight + videoFilePath: string + privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload +}) { + const { video, videoFilePath, privatePayload } = options + + const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' }) + videoFile.videoId = video.id + + const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) + await move(videoFilePath, newVideoFilePath) + + await onWebTorrentVideoFileTranscoding({ + video, + videoFile, + videoOutputPath: newVideoFilePath + }) + + await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) +} + +export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) { + const videoUUID = runnerJob.privatePayload.videoUUID + + const video = await VideoModel.loadFull(videoUUID) + if (!video) { + logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID)) + return undefined + } + + return video +} diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts new file mode 100644 index 000000000..a7b33f87e --- /dev/null +++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts @@ -0,0 +1,97 @@ +import { pick } from 'lodash' +import { logger } from '@server/helpers/logger' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { getVideoStreamDuration } from '@shared/ffmpeg' +import { + RunnerJobUpdatePayload, + RunnerJobVODAudioMergeTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload, + VODAudioMergeTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, priority, dependsOnRunnerJob } = options + + const jobUUID = buildUUID() + const payload: RunnerJobVODAudioMergeTranscodingPayload = { + input: { + audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid), + previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-audio-merge-transcoding', + jobUUID, + payload, + privatePayload, + priority, + dependsOnRunnerJob + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODAudioMergeTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + + // ffmpeg generated a new video file, so update the video duration + // See https://trac.ffmpeg.org/ticket/5456 + video.duration = await getVideoStreamDuration(videoFilePath) + await video.save() + + // We can remove the old audio file + const oldAudioFile = video.VideoFiles[0] + await video.removeWebTorrentFile(oldAudioFile) + await oldAudioFile.destroy() + video.VideoFiles = [] + + await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) + + logger.info( + 'Runner VOD audio merge transcoding job %s for %s ended.', + runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) + ) + } +} diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..02566b9d5 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts @@ -0,0 +1,114 @@ +import { move } from 'fs-extra' +import { dirname, join } from 'path' +import { logger } from '@server/helpers/logger' +import { renameVideoFileInPlaylist } from '@server/lib/hls' +import { getHlsResolutionPlaylistFilename } from '@server/lib/paths' +import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' +import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding' +import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { pick } from '@shared/core-utils' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobUpdatePayload, + RunnerJobVODHLSTranscodingPayload, + RunnerJobVODHLSTranscodingPrivatePayload, + VODHLSTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + deleteWebVideoFiles: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, dependsOnRunnerJob, priority } = options + + const jobUUID = buildUUID() + + const payload: RunnerJobVODHLSTranscodingPayload = { + input: { + videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-hls-transcoding', + jobUUID, + payload, + privatePayload, + priority, + dependsOnRunnerJob + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODHLSTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string + + const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' }) + const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) + await move(videoFilePath, newVideoFilePath) + + const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename) + const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename) + await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath) + + await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename) + + await onHLSVideoFileTranscoding({ + video, + videoFile, + m3u8OutputPath: newResolutionPlaylistFilePath, + videoOutputPath: newVideoFilePath + }) + + await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) + + if (privatePayload.deleteWebVideoFiles === true) { + logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid)) + + await removeAllWebTorrentFiles(video) + } + + logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid)) + } +} diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts new file mode 100644 index 000000000..57761a7a1 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts @@ -0,0 +1,84 @@ +import { pick } from 'lodash' +import { logger } from '@server/helpers/logger' +import { VideoJobInfoModel } from '@server/models/video/video-job-info' +import { MVideo } from '@server/types/models' +import { MRunnerJob } from '@server/types/models/runners' +import { buildUUID } from '@shared/extra-utils' +import { + RunnerJobUpdatePayload, + RunnerJobVODWebVideoTranscodingPayload, + RunnerJobVODWebVideoTranscodingPrivatePayload, + VODWebVideoTranscodingSuccess +} from '@shared/models' +import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' +import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' +import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' + +type CreateOptions = { + video: MVideo + isNewVideo: boolean + resolution: number + fps: number + priority: number + dependsOnRunnerJob?: MRunnerJob +} + +// eslint-disable-next-line max-len +export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler { + + async create (options: CreateOptions) { + const { video, resolution, fps, priority, dependsOnRunnerJob } = options + + const jobUUID = buildUUID() + const payload: RunnerJobVODWebVideoTranscodingPayload = { + input: { + videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) + }, + output: { + resolution, + fps + } + } + + const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { + ...pick(options, [ 'isNewVideo' ]), + + videoUUID: video.uuid + } + + const job = await this.createRunnerJob({ + type: 'vod-web-video-transcoding', + jobUUID, + payload, + privatePayload, + dependsOnRunnerJob, + priority + }) + + await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') + + return job + } + + // --------------------------------------------------------------------------- + + async specificComplete (options: { + runnerJob: MRunnerJob + resultPayload: VODWebVideoTranscodingSuccess + }) { + const { runnerJob, resultPayload } = options + const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload + + const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) + if (!video) return + + const videoFilePath = resultPayload.videoFile as string + + await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) + + logger.info( + 'Runner VOD web video transcoding job %s for %s ended.', + runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) + ) + } +} diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts new file mode 100644 index 000000000..329fb1170 --- /dev/null +++ b/server/lib/runners/runner-urls.ts @@ -0,0 +1,9 @@ +import { WEBSERVER } from '@server/initializers/constants' + +export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) { + return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality' +} + +export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) { + return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality' +} diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts new file mode 100644 index 000000000..74c814ba1 --- /dev/null +++ b/server/lib/runners/runner.ts @@ -0,0 +1,36 @@ +import express from 'express' +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { sequelizeTypescript } from '@server/initializers/database' +import { MRunner } from '@server/types/models/runners' + +const lTags = loggerTagsFactory('runner') + +const updatingRunner = new Set() + +function updateLastRunnerContact (req: express.Request, runner: MRunner) { + const now = new Date() + + // Don't update last runner contact too often + if (now.getTime() - runner.lastContact.getTime() < 2000) return + if (updatingRunner.has(runner.id)) return + + updatingRunner.add(runner.id) + + runner.lastContact = now + runner.ip = req.ip + + logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name)) + + retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async transaction => { + return runner.save({ transaction }) + }) + }) + .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) })) + .finally(() => updatingRunner.delete(runner.id)) +} + +export { + updateLastRunnerContact +} -- cgit v1.2.3