From b42c2c7e89a64ed730d8140840fe74a13c31f2a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 9 Aug 2022 09:09:31 +0200 Subject: Avoid concurrency issue on transcoding --- scripts/create-transcoding-job.ts | 11 +-- server/controllers/api/videos/transcoding.ts | 90 +++++++++++++++++----- .../videos/shared/video-sync-attributes.ts | 8 +- server/lib/job-queue/handlers/video-transcoding.ts | 7 +- server/lib/job-queue/job-queue.ts | 27 +++++-- server/lib/video.ts | 8 +- server/models/video/video.ts | 13 ++-- server/tools/peertube-import-videos.ts | 6 +- 8 files changed, 118 insertions(+), 52 deletions(-) diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts index f8c0ed461..aa97b0ba7 100755 --- a/scripts/create-transcoding-job.ts +++ b/scripts/create-transcoding-job.ts @@ -2,7 +2,7 @@ import { program } from 'commander' import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc' import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' import { CONFIG } from '@server/initializers/config' -import { addTranscodingJob } from '@server/lib/video' +import { buildTranscodingJob } from '@server/lib/video' import { VideoState, VideoTranscodingPayload } from '@shared/models' import { initDatabaseModels } from '../server/initializers/database' import { JobQueue } from '../server/lib/job-queue' @@ -57,7 +57,7 @@ async function run () { for (const resolution of resolutionsEnabled) { dataInput.push({ - type: 'new-resolution-to-hls', + type: 'new-resolution-to-hls' as 'new-resolution-to-hls', videoUUID: video.uuid, resolution, @@ -72,7 +72,7 @@ async function run () { } else { if (options.resolution !== undefined) { dataInput.push({ - type: 'new-resolution-to-webtorrent', + type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent', videoUUID: video.uuid, createHLSIfNeeded: true, @@ -90,7 +90,7 @@ async function run () { } dataInput.push({ - type: 'optimize-to-webtorrent', + type: 'optimize-to-webtorrent' as 'optimize-to-webtorrent', videoUUID: video.uuid, isNewVideo: false }) @@ -103,7 +103,8 @@ async function run () { await video.save() for (const d of dataInput) { - await addTranscodingJob(d, {}) + await JobQueue.Instance.createJob(await buildTranscodingJob(d)) + console.log('Transcoding job for video %s created.', video.uuid) } } diff --git a/server/controllers/api/videos/transcoding.ts b/server/controllers/api/videos/transcoding.ts index b2b71a870..9aca761c1 100644 --- a/server/controllers/api/videos/transcoding.ts +++ b/server/controllers/api/videos/transcoding.ts @@ -1,10 +1,12 @@ +import Bluebird from 'bluebird' import express from 'express' import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' -import { addTranscodingJob } from '@server/lib/video' +import { JobQueue } from '@server/lib/job-queue' +import { Hooks } from '@server/lib/plugins/hooks' +import { buildTranscodingJob } from '@server/lib/video' import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models' import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares' -import { Hooks } from '@server/lib/plugins/hooks' const lTags = loggerTagsFactory('api', 'video') const transcodingRouter = express.Router() @@ -44,29 +46,81 @@ async function createTranscoding (req: express.Request, res: express.Response) { video.state = VideoState.TO_TRANSCODE await video.save() - for (const resolution of resolutions) { + const hasAudio = !!audioStream + const childrenResolutions = resolutions.filter(r => r !== maxResolution) + + const children = await Bluebird.mapSeries(childrenResolutions, resolution => { if (body.transcodingType === 'hls') { - await addTranscodingJob({ - type: 'new-resolution-to-hls', + return buildHLSJobOption({ videoUUID: video.uuid, + hasAudio, resolution, - hasAudio: !!audioStream, - copyCodecs: false, - isNewVideo: false, - autoDeleteWebTorrentIfNeeded: false, - isMaxQuality: maxResolution === resolution + isMaxQuality: false }) - } else if (body.transcodingType === 'webtorrent') { - await addTranscodingJob({ - type: 'new-resolution-to-webtorrent', + } + + if (body.transcodingType === 'webtorrent') { + return buildWebTorrentJobOption({ videoUUID: video.uuid, - isNewVideo: false, - resolution, - hasAudio: !!audioStream, - createHLSIfNeeded: false + hasAudio, + resolution }) } - } + }) + + const parent = body.transcodingType === 'hls' + ? await buildHLSJobOption({ + videoUUID: video.uuid, + hasAudio, + resolution: maxResolution, + isMaxQuality: false + }) + : await buildWebTorrentJobOption({ + videoUUID: video.uuid, + hasAudio, + resolution: maxResolution + }) + + // Porcess the last resolution after the other ones to prevent concurrency issue + // Because low resolutions use the biggest one as ffmpeg input + await JobQueue.Instance.createJobWithChildren(parent, children) return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } + +function buildHLSJobOption (options: { + videoUUID: string + hasAudio: boolean + resolution: number + isMaxQuality: boolean +}) { + const { videoUUID, hasAudio, resolution, isMaxQuality } = options + + return buildTranscodingJob({ + type: 'new-resolution-to-hls', + videoUUID, + resolution, + hasAudio, + copyCodecs: false, + isNewVideo: false, + autoDeleteWebTorrentIfNeeded: false, + isMaxQuality + }) +} + +function buildWebTorrentJobOption (options: { + videoUUID: string + hasAudio: boolean + resolution: number +}) { + const { videoUUID, hasAudio, resolution } = options + + return buildTranscodingJob({ + type: 'new-resolution-to-webtorrent', + videoUUID, + isNewVideo: false, + resolution, + hasAudio, + createHLSIfNeeded: false + }) +} diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8ed1b6447..e3cb96a62 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts @@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi return totalItems } -function createJob (payload: ActivitypubHttpFetcherPayload) { - return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) -} - function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { const uri = fetchedVideo.shares @@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean return crawlCollectionPage(uri, handler, cleaner) .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) } + +function createJob (payload: ActivitypubHttpFetcherPayload) { + return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) +} diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 8dbae8c42..cb2978157 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,7 +1,7 @@ import { Job } from 'bullmq' import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' import { Hooks } from '@server/lib/plugins/hooks' -import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' @@ -27,6 +27,7 @@ import { optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '../../transcoding/transcoding' +import { JobQueue } from '../job-queue' type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise @@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) } - await addTranscodingJob(hlsTranscodingPayload, jobOptions) + await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) return true } @@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: { priority: await getTranscodingJobPriority(user) } - await addTranscodingJob(dataInput, jobOptions) + await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) } if (resolutionCreated.length === 0) { diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -325,10 +325,8 @@ class JobQueue { if (!job) continue lastJob = { - name: 'job', - data: job.payload, - queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + ...this.buildJobFlowOption(job), + children: lastJob ? [ lastJob ] : [] @@ -338,6 +336,23 @@ class JobQueue { return this.flowProducer.add(lastJob) } + async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + return this.flowProducer.add({ + ...this.buildJobFlowOption(parent), + + children: children.map(c => this.buildJobFlowOption(c)) + }) + } + + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + return { + name: 'job', + data: job.payload, + queueName: job.type, + opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + } + } + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { return { backoff: { delay: 60 * 1000, type: 'exponential' }, @@ -425,10 +440,6 @@ class JobQueue { } } - waitJob (job: Job) { - return job.waitUntilFinished(this.queueEvents[job.queueName]) - } - private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { repeat: REPEAT_JOBS['videos-views-stats'] diff --git a/server/lib/video.ts b/server/lib/video.ts index f7d7aa186..6c4f3ce7b 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { FilteredModelAttributes } from '@server/types' import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' -import { CreateJobOptions, JobQueue } from './job-queue/job-queue' +import { CreateJobOptions } from './job-queue/job-queue' import { updateVideoMiniatureFromExisting } from './thumbnail' function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes { @@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: { } } -async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { +async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) + return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } } async function getTranscodingJobPriority (user: MUserId) { @@ -182,7 +182,7 @@ export { buildVideoThumbnailsFromReq, setVideoTags, buildOptimizeOrMergeAudioJob, - addTranscodingJob, + buildTranscodingJob, buildMoveToObjectStorageJob, getTranscodingJobPriority, getCachedVideoDuration diff --git a/server/models/video/video.ts b/server/models/video/video.ts index b8e383502..a8ea67c39 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -1592,22 +1592,21 @@ export class VideoModel extends Model>> { } getQualityFileBy (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { - // We first transcode to WebTorrent format, so try this array first - if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) { - const file = fun(this.VideoFiles, file => file.resolution) + const files = this.getAllFiles() + const file = fun(files, file => file.resolution) + if (!file) return undefined + if (file.videoId) { return Object.assign(file, { Video: this }) } - // No webtorrent files, try with streaming playlist files - if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) { + if (file.videoStreamingPlaylistId) { const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this }) - const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution) return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo }) } - return undefined + throw new Error('File is not associated to a video of a playlist') } getMaxQualityFile (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo { diff --git a/server/tools/peertube-import-videos.ts b/server/tools/peertube-import-videos.ts index e2f80c703..76338ea3c 100644 --- a/server/tools/peertube-import-videos.ts +++ b/server/tools/peertube-import-videos.ts @@ -165,7 +165,7 @@ async function processVideo (parameters: { const youtubeDLBinary = await YoutubeDLCLI.safeGet() const output = await youtubeDLBinary.download({ url: videoInfo.url, - format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), + format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), output: path, additionalYoutubeDLArgs: command.args, processOptions @@ -251,7 +251,7 @@ async function fetchObject (info: any) { const youtubeDLCLI = await YoutubeDLCLI.safeGet() const result = await youtubeDLCLI.getInfo({ url, - format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), + format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), processOptions }) @@ -336,7 +336,7 @@ function exitError (message: string, ...meta: any[]) { function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) { return youtubeDLCLI.getInfo({ url, - format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), + format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ], processOptions }) -- cgit v1.2.3