From b42c2c7e89a64ed730d8140840fe74a13c31f2a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 9 Aug 2022 09:09:31 +0200 Subject: Avoid concurrency issue on transcoding --- .../videos/shared/video-sync-attributes.ts | 8 +++---- server/lib/job-queue/handlers/video-transcoding.ts | 7 +++--- server/lib/job-queue/job-queue.ts | 27 +++++++++++++++------- server/lib/video.ts | 8 +++---- 4 files changed, 31 insertions(+), 19 deletions(-) (limited to 'server/lib') diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8ed1b6447..e3cb96a62 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts @@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi return totalItems } -function createJob (payload: ActivitypubHttpFetcherPayload) { - return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) -} - function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { const uri = fetchedVideo.shares @@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean return crawlCollectionPage(uri, handler, cleaner) .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) } + +function createJob (payload: ActivitypubHttpFetcherPayload) { + return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) +} diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 8dbae8c42..cb2978157 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,7 +1,7 @@ import { Job } from 'bullmq' import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' import { Hooks } from '@server/lib/plugins/hooks' -import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' +import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' @@ -27,6 +27,7 @@ import { optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '../../transcoding/transcoding' +import { JobQueue } from '../job-queue' type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise @@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) } - await addTranscodingJob(hlsTranscodingPayload, jobOptions) + await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) return true } @@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: { priority: await getTranscodingJobPriority(user) } - await addTranscodingJob(dataInput, jobOptions) + await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) } if (resolutionCreated.length === 0) { diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -325,10 +325,8 @@ class JobQueue { if (!job) continue lastJob = { - name: 'job', - data: job.payload, - queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + ...this.buildJobFlowOption(job), + children: lastJob ? [ lastJob ] : [] @@ -338,6 +336,23 @@ class JobQueue { return this.flowProducer.add(lastJob) } + async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + return this.flowProducer.add({ + ...this.buildJobFlowOption(parent), + + children: children.map(c => this.buildJobFlowOption(c)) + }) + } + + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + return { + name: 'job', + data: job.payload, + queueName: job.type, + opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + } + } + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { return { backoff: { delay: 60 * 1000, type: 'exponential' }, @@ -425,10 +440,6 @@ class JobQueue { } } - waitJob (job: Job) { - return job.waitUntilFinished(this.queueEvents[job.queueName]) - } - private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { repeat: REPEAT_JOBS['videos-views-stats'] diff --git a/server/lib/video.ts b/server/lib/video.ts index f7d7aa186..6c4f3ce7b 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { FilteredModelAttributes } from '@server/types' import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' -import { CreateJobOptions, JobQueue } from './job-queue/job-queue' +import { CreateJobOptions } from './job-queue/job-queue' import { updateVideoMiniatureFromExisting } from './thumbnail' function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes { @@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: { } } -async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { +async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) + return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } } async function getTranscodingJobPriority (user: MUserId) { @@ -182,7 +182,7 @@ export { buildVideoThumbnailsFromReq, setVideoTags, buildOptimizeOrMergeAudioJob, - addTranscodingJob, + buildTranscodingJob, buildMoveToObjectStorageJob, getTranscodingJobPriority, getCachedVideoDuration -- cgit v1.2.3