diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:09:31 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:32:17 +0200 |
commit | b42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch) | |
tree | 715e7ad31d03881e3f3530dba1fe3d172251249b /server/lib/job-queue | |
parent | bd911b54b555b11df7e9849cf92d358bccfecf6e (diff) | |
download | PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip |
Avoid concurrency issue on transcoding
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 7 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 27 |
2 files changed, 23 insertions, 11 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 8dbae8c42..cb2978157 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { Hooks } from '@server/lib/plugins/hooks' |
4 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' |
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 5 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 7 | import { UserModel } from '@server/models/user/user' |
@@ -27,6 +27,7 @@ import { | |||
27 | optimizeOriginalVideofile, | 27 | optimizeOriginalVideofile, |
28 | transcodeNewWebTorrentResolution | 28 | transcodeNewWebTorrentResolution |
29 | } from '../../transcoding/transcoding' | 29 | } from '../../transcoding/transcoding' |
30 | import { JobQueue } from '../job-queue' | ||
30 | 31 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 32 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 33 | ||
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
248 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 249 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) |
249 | } | 250 | } |
250 | 251 | ||
251 | await addTranscodingJob(hlsTranscodingPayload, jobOptions) | 252 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) |
252 | 253 | ||
253 | return true | 254 | return true |
254 | } | 255 | } |
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: { | |||
312 | priority: await getTranscodingJobPriority(user) | 313 | priority: await getTranscodingJobPriority(user) |
313 | } | 314 | } |
314 | 315 | ||
315 | await addTranscodingJob(dataInput, jobOptions) | 316 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) |
316 | } | 317 | } |
317 | 318 | ||
318 | if (resolutionCreated.length === 0) { | 319 | if (resolutionCreated.length === 0) { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -325,10 +325,8 @@ class JobQueue { | |||
325 | if (!job) continue | 325 | if (!job) continue |
326 | 326 | ||
327 | lastJob = { | 327 | lastJob = { |
328 | name: 'job', | 328 | ...this.buildJobFlowOption(job), |
329 | data: job.payload, | 329 | |
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | 330 | children: lastJob |
333 | ? [ lastJob ] | 331 | ? [ lastJob ] |
334 | : [] | 332 | : [] |
@@ -338,6 +336,23 @@ class JobQueue { | |||
338 | return this.flowProducer.add(lastJob) | 336 | return this.flowProducer.add(lastJob) |
339 | } | 337 | } |
340 | 338 | ||
339 | async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { | ||
340 | return this.flowProducer.add({ | ||
341 | ...this.buildJobFlowOption(parent), | ||
342 | |||
343 | children: children.map(c => this.buildJobFlowOption(c)) | ||
344 | }) | ||
345 | } | ||
346 | |||
347 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { | ||
348 | return { | ||
349 | name: 'job', | ||
350 | data: job.payload, | ||
351 | queueName: job.type, | ||
352 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | ||
353 | } | ||
354 | } | ||
355 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | 356 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { |
342 | return { | 357 | return { |
343 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 358 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
@@ -425,10 +440,6 @@ class JobQueue { | |||
425 | } | 440 | } |
426 | } | 441 | } |
427 | 442 | ||
428 | waitJob (job: Job) { | ||
429 | return job.waitUntilFinished(this.queueEvents[job.queueName]) | ||
430 | } | ||
431 | |||
432 | private addRepeatableJobs () { | 443 | private addRepeatableJobs () { |
433 | this.queues['videos-views-stats'].add('job', {}, { | 444 | this.queues['videos-views-stats'].add('job', {}, { |
434 | repeat: REPEAT_JOBS['videos-views-stats'] | 445 | repeat: REPEAT_JOBS['videos-views-stats'] |