diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:09:31 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:32:17 +0200 |
commit | b42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch) | |
tree | 715e7ad31d03881e3f3530dba1fe3d172251249b /server/lib | |
parent | bd911b54b555b11df7e9849cf92d358bccfecf6e (diff) | |
download | PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip |
Avoid concurrency issue on transcoding
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/videos/shared/video-sync-attributes.ts | 8 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 7 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 27 | ||||
-rw-r--r-- | server/lib/video.ts | 8 |
4 files changed, 31 insertions, 19 deletions
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8ed1b6447..e3cb96a62 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts | |||
@@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi | |||
73 | return totalItems | 73 | return totalItems |
74 | } | 74 | } |
75 | 75 | ||
76 | function createJob (payload: ActivitypubHttpFetcherPayload) { | ||
77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | ||
78 | } | ||
79 | |||
80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { | 76 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { |
81 | const uri = fetchedVideo.shares | 77 | const uri = fetchedVideo.shares |
82 | 78 | ||
@@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean | |||
104 | return crawlCollectionPage<string>(uri, handler, cleaner) | 100 | return crawlCollectionPage<string>(uri, handler, cleaner) |
105 | .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) | 101 | .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) |
106 | } | 102 | } |
103 | |||
104 | function createJob (payload: ActivitypubHttpFetcherPayload) { | ||
105 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | ||
106 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 8dbae8c42..cb2978157 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { Hooks } from '@server/lib/plugins/hooks' |
4 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' |
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 5 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 7 | import { UserModel } from '@server/models/user/user' |
@@ -27,6 +27,7 @@ import { | |||
27 | optimizeOriginalVideofile, | 27 | optimizeOriginalVideofile, |
28 | transcodeNewWebTorrentResolution | 28 | transcodeNewWebTorrentResolution |
29 | } from '../../transcoding/transcoding' | 29 | } from '../../transcoding/transcoding' |
30 | import { JobQueue } from '../job-queue' | ||
30 | 31 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 32 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 33 | ||
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
248 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 249 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) |
249 | } | 250 | } |
250 | 251 | ||
251 | await addTranscodingJob(hlsTranscodingPayload, jobOptions) | 252 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) |
252 | 253 | ||
253 | return true | 254 | return true |
254 | } | 255 | } |
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: { | |||
312 | priority: await getTranscodingJobPriority(user) | 313 | priority: await getTranscodingJobPriority(user) |
313 | } | 314 | } |
314 | 315 | ||
315 | await addTranscodingJob(dataInput, jobOptions) | 316 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) |
316 | } | 317 | } |
317 | 318 | ||
318 | if (resolutionCreated.length === 0) { | 319 | if (resolutionCreated.length === 0) { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -325,10 +325,8 @@ class JobQueue { | |||
325 | if (!job) continue | 325 | if (!job) continue |
326 | 326 | ||
327 | lastJob = { | 327 | lastJob = { |
328 | name: 'job', | 328 | ...this.buildJobFlowOption(job), |
329 | data: job.payload, | 329 | |
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | 330 | children: lastJob |
333 | ? [ lastJob ] | 331 | ? [ lastJob ] |
334 | : [] | 332 | : [] |
@@ -338,6 +336,23 @@ class JobQueue { | |||
338 | return this.flowProducer.add(lastJob) | 336 | return this.flowProducer.add(lastJob) |
339 | } | 337 | } |
340 | 338 | ||
339 | async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { | ||
340 | return this.flowProducer.add({ | ||
341 | ...this.buildJobFlowOption(parent), | ||
342 | |||
343 | children: children.map(c => this.buildJobFlowOption(c)) | ||
344 | }) | ||
345 | } | ||
346 | |||
347 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { | ||
348 | return { | ||
349 | name: 'job', | ||
350 | data: job.payload, | ||
351 | queueName: job.type, | ||
352 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | ||
353 | } | ||
354 | } | ||
355 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | 356 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { |
342 | return { | 357 | return { |
343 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 358 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
@@ -425,10 +440,6 @@ class JobQueue { | |||
425 | } | 440 | } |
426 | } | 441 | } |
427 | 442 | ||
428 | waitJob (job: Job) { | ||
429 | return job.waitUntilFinished(this.queueEvents[job.queueName]) | ||
430 | } | ||
431 | |||
432 | private addRepeatableJobs () { | 443 | private addRepeatableJobs () { |
433 | this.queues['videos-views-stats'].add('job', {}, { | 444 | this.queues['videos-views-stats'].add('job', {}, { |
434 | repeat: REPEAT_JOBS['videos-views-stats'] | 445 | repeat: REPEAT_JOBS['videos-views-stats'] |
diff --git a/server/lib/video.ts b/server/lib/video.ts index f7d7aa186..6c4f3ce7b 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts | |||
@@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
9 | import { FilteredModelAttributes } from '@server/types' | 9 | import { FilteredModelAttributes } from '@server/types' |
10 | import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' | 10 | import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' |
11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' | 11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' |
12 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' | 12 | import { CreateJobOptions } from './job-queue/job-queue' |
13 | import { updateVideoMiniatureFromExisting } from './thumbnail' | 13 | import { updateVideoMiniatureFromExisting } from './thumbnail' |
14 | 14 | ||
15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { | 15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { |
@@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: { | |||
121 | } | 121 | } |
122 | } | 122 | } |
123 | 123 | ||
124 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { | 124 | async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { |
125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | 125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
126 | 126 | ||
127 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) | 127 | return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } |
128 | } | 128 | } |
129 | 129 | ||
130 | async function getTranscodingJobPriority (user: MUserId) { | 130 | async function getTranscodingJobPriority (user: MUserId) { |
@@ -182,7 +182,7 @@ export { | |||
182 | buildVideoThumbnailsFromReq, | 182 | buildVideoThumbnailsFromReq, |
183 | setVideoTags, | 183 | setVideoTags, |
184 | buildOptimizeOrMergeAudioJob, | 184 | buildOptimizeOrMergeAudioJob, |
185 | addTranscodingJob, | 185 | buildTranscodingJob, |
186 | buildMoveToObjectStorageJob, | 186 | buildMoveToObjectStorageJob, |
187 | getTranscodingJobPriority, | 187 | getTranscodingJobPriority, |
188 | getCachedVideoDuration | 188 | getCachedVideoDuration |