From b42c2c7e89a64ed730d8140840fe74a13c31f2a4 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 9 Aug 2022 09:09:31 +0200 Subject: Avoid concurrency issue on transcoding --- server/lib/job-queue/job-queue.ts | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) (limited to 'server/lib/job-queue/job-queue.ts') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -325,10 +325,8 @@ class JobQueue { if (!job) continue lastJob = { - name: 'job', - data: job.payload, - queueName: job.type, - opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + ...this.buildJobFlowOption(job), + children: lastJob ? [ lastJob ] : [] @@ -338,6 +336,23 @@ class JobQueue { return this.flowProducer.add(lastJob) } + async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { + return this.flowProducer.add({ + ...this.buildJobFlowOption(parent), + + children: children.map(c => this.buildJobFlowOption(c)) + }) + } + + private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { + return { + name: 'job', + data: job.payload, + queueName: job.type, + opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) + } + } + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { return { backoff: { delay: 60 * 1000, type: 'exponential' }, @@ -425,10 +440,6 @@ class JobQueue { } } - waitJob (job: Job) { - return job.waitUntilFinished(this.queueEvents[job.queueName]) - } - private addRepeatableJobs () { this.queues['videos-views-stats'].add('job', {}, { repeat: REPEAT_JOBS['videos-views-stats'] -- cgit v1.2.3