aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-09 09:09:31 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:32:17 +0200
commitb42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch)
tree715e7ad31d03881e3f3530dba1fe3d172251249b /server/lib/job-queue
parentbd911b54b555b11df7e9849cf92d358bccfecf6e (diff)
downloadPeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip
Avoid concurrency issue on transcoding
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts7
-rw-r--r--server/lib/job-queue/job-queue.ts27
2 files changed, 23 insertions, 11 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 8dbae8c42..cb2978157 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,7 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { Hooks } from '@server/lib/plugins/hooks'
4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
5import { VideoPathManager } from '@server/lib/video-path-manager' 5import { VideoPathManager } from '@server/lib/video-path-manager'
6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
7import { UserModel } from '@server/models/user/user' 7import { UserModel } from '@server/models/user/user'
@@ -27,6 +27,7 @@ import {
27 optimizeOriginalVideofile, 27 optimizeOriginalVideofile,
28 transcodeNewWebTorrentResolution 28 transcodeNewWebTorrentResolution
29} from '../../transcoding/transcoding' 29} from '../../transcoding/transcoding'
30import { JobQueue } from '../job-queue'
30 31
31type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 32type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
32 33
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
248 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) 249 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
249 } 250 }
250 251
251 await addTranscodingJob(hlsTranscodingPayload, jobOptions) 252 await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
252 253
253 return true 254 return true
254} 255}
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: {
312 priority: await getTranscodingJobPriority(user) 313 priority: await getTranscodingJobPriority(user)
313 } 314 }
314 315
315 await addTranscodingJob(dataInput, jobOptions) 316 await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
316 } 317 }
317 318
318 if (resolutionCreated.length === 0) { 319 if (resolutionCreated.length === 0) {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 50d732beb..386d20103 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -325,10 +325,8 @@ class JobQueue {
325 if (!job) continue 325 if (!job) continue
326 326
327 lastJob = { 327 lastJob = {
328 name: 'job', 328 ...this.buildJobFlowOption(job),
329 data: job.payload, 329
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob 330 children: lastJob
333 ? [ lastJob ] 331 ? [ lastJob ]
334 : [] 332 : []
@@ -338,6 +336,23 @@ class JobQueue {
338 return this.flowProducer.add(lastJob) 336 return this.flowProducer.add(lastJob)
339 } 337 }
340 338
339 async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
340 return this.flowProducer.add({
341 ...this.buildJobFlowOption(parent),
342
343 children: children.map(c => this.buildJobFlowOption(c))
344 })
345 }
346
347 private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
348 return {
349 name: 'job',
350 data: job.payload,
351 queueName: job.type,
352 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
353 }
354 }
355
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { 356 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return { 357 return {
343 backoff: { delay: 60 * 1000, type: 'exponential' }, 358 backoff: { delay: 60 * 1000, type: 'exponential' },
@@ -425,10 +440,6 @@ class JobQueue {
425 } 440 }
426 } 441 }
427 442
428 waitJob (job: Job) {
429 return job.waitUntilFinished(this.queueEvents[job.queueName])
430 }
431
432 private addRepeatableJobs () { 443 private addRepeatableJobs () {
433 this.queues['videos-views-stats'].add('job', {}, { 444 this.queues['videos-views-stats'].add('job', {}, {
434 repeat: REPEAT_JOBS['videos-views-stats'] 445 repeat: REPEAT_JOBS['videos-views-stats']