aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-09 09:09:31 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:32:17 +0200
commitb42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch)
tree715e7ad31d03881e3f3530dba1fe3d172251249b /server/lib
parentbd911b54b555b11df7e9849cf92d358bccfecf6e (diff)
downloadPeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip
Avoid concurrency issue on transcoding
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/videos/shared/video-sync-attributes.ts8
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts7
-rw-r--r--server/lib/job-queue/job-queue.ts27
-rw-r--r--server/lib/video.ts8
4 files changed, 31 insertions, 19 deletions
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
index 8ed1b6447..e3cb96a62 100644
--- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts
+++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
@@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
73 return totalItems 73 return totalItems
74} 74}
75 75
76function createJob (payload: ActivitypubHttpFetcherPayload) {
77 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
78}
79
80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { 76function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
81 const uri = fetchedVideo.shares 77 const uri = fetchedVideo.shares
82 78
@@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean
104 return crawlCollectionPage<string>(uri, handler, cleaner) 100 return crawlCollectionPage<string>(uri, handler, cleaner)
105 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) 101 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) }))
106} 102}
103
104function createJob (payload: ActivitypubHttpFetcherPayload) {
105 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
106}
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 8dbae8c42..cb2978157 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,7 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { Hooks } from '@server/lib/plugins/hooks'
4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
5import { VideoPathManager } from '@server/lib/video-path-manager' 5import { VideoPathManager } from '@server/lib/video-path-manager'
6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
7import { UserModel } from '@server/models/user/user' 7import { UserModel } from '@server/models/user/user'
@@ -27,6 +27,7 @@ import {
27 optimizeOriginalVideofile, 27 optimizeOriginalVideofile,
28 transcodeNewWebTorrentResolution 28 transcodeNewWebTorrentResolution
29} from '../../transcoding/transcoding' 29} from '../../transcoding/transcoding'
30import { JobQueue } from '../job-queue'
30 31
31type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 32type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
32 33
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
248 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) 249 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
249 } 250 }
250 251
251 await addTranscodingJob(hlsTranscodingPayload, jobOptions) 252 await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
252 253
253 return true 254 return true
254} 255}
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: {
312 priority: await getTranscodingJobPriority(user) 313 priority: await getTranscodingJobPriority(user)
313 } 314 }
314 315
315 await addTranscodingJob(dataInput, jobOptions) 316 await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
316 } 317 }
317 318
318 if (resolutionCreated.length === 0) { 319 if (resolutionCreated.length === 0) {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 50d732beb..386d20103 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -325,10 +325,8 @@ class JobQueue {
325 if (!job) continue 325 if (!job) continue
326 326
327 lastJob = { 327 lastJob = {
328 name: 'job', 328 ...this.buildJobFlowOption(job),
329 data: job.payload, 329
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob 330 children: lastJob
333 ? [ lastJob ] 331 ? [ lastJob ]
334 : [] 332 : []
@@ -338,6 +336,23 @@ class JobQueue {
338 return this.flowProducer.add(lastJob) 336 return this.flowProducer.add(lastJob)
339 } 337 }
340 338
339 async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
340 return this.flowProducer.add({
341 ...this.buildJobFlowOption(parent),
342
343 children: children.map(c => this.buildJobFlowOption(c))
344 })
345 }
346
347 private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
348 return {
349 name: 'job',
350 data: job.payload,
351 queueName: job.type,
352 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
353 }
354 }
355
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { 356 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return { 357 return {
343 backoff: { delay: 60 * 1000, type: 'exponential' }, 358 backoff: { delay: 60 * 1000, type: 'exponential' },
@@ -425,10 +440,6 @@ class JobQueue {
425 } 440 }
426 } 441 }
427 442
428 waitJob (job: Job) {
429 return job.waitUntilFinished(this.queueEvents[job.queueName])
430 }
431
432 private addRepeatableJobs () { 443 private addRepeatableJobs () {
433 this.queues['videos-views-stats'].add('job', {}, { 444 this.queues['videos-views-stats'].add('job', {}, {
434 repeat: REPEAT_JOBS['videos-views-stats'] 445 repeat: REPEAT_JOBS['videos-views-stats']
diff --git a/server/lib/video.ts b/server/lib/video.ts
index f7d7aa186..6c4f3ce7b 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { FilteredModelAttributes } from '@server/types' 9import { FilteredModelAttributes } from '@server/types'
10import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' 10import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
12import { CreateJobOptions, JobQueue } from './job-queue/job-queue' 12import { CreateJobOptions } from './job-queue/job-queue'
13import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
14 14
15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { 15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
@@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: {
121 } 121 }
122} 122}
123 123
124async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { 124async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') 125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
126 126
127 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) 127 return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
128} 128}
129 129
130async function getTranscodingJobPriority (user: MUserId) { 130async function getTranscodingJobPriority (user: MUserId) {
@@ -182,7 +182,7 @@ export {
182 buildVideoThumbnailsFromReq, 182 buildVideoThumbnailsFromReq,
183 setVideoTags, 183 setVideoTags,
184 buildOptimizeOrMergeAudioJob, 184 buildOptimizeOrMergeAudioJob,
185 addTranscodingJob, 185 buildTranscodingJob,
186 buildMoveToObjectStorageJob, 186 buildMoveToObjectStorageJob,
187 getTranscodingJobPriority, 187 getTranscodingJobPriority,
188 getCachedVideoDuration 188 getCachedVideoDuration