aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-09 09:09:31 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:32:17 +0200
commitb42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch)
tree715e7ad31d03881e3f3530dba1fe3d172251249b
parentbd911b54b555b11df7e9849cf92d358bccfecf6e (diff)
downloadPeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst
PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip
Avoid concurrency issue on transcoding
-rwxr-xr-xscripts/create-transcoding-job.ts11
-rw-r--r--server/controllers/api/videos/transcoding.ts90
-rw-r--r--server/lib/activitypub/videos/shared/video-sync-attributes.ts8
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts7
-rw-r--r--server/lib/job-queue/job-queue.ts27
-rw-r--r--server/lib/video.ts8
-rw-r--r--server/models/video/video.ts13
-rw-r--r--server/tools/peertube-import-videos.ts6
8 files changed, 118 insertions, 52 deletions
diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts
index f8c0ed461..aa97b0ba7 100755
--- a/scripts/create-transcoding-job.ts
+++ b/scripts/create-transcoding-job.ts
@@ -2,7 +2,7 @@ import { program } from 'commander'
2import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc' 2import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc'
3import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' 3import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
5import { addTranscodingJob } from '@server/lib/video' 5import { buildTranscodingJob } from '@server/lib/video'
6import { VideoState, VideoTranscodingPayload } from '@shared/models' 6import { VideoState, VideoTranscodingPayload } from '@shared/models'
7import { initDatabaseModels } from '../server/initializers/database' 7import { initDatabaseModels } from '../server/initializers/database'
8import { JobQueue } from '../server/lib/job-queue' 8import { JobQueue } from '../server/lib/job-queue'
@@ -57,7 +57,7 @@ async function run () {
57 57
58 for (const resolution of resolutionsEnabled) { 58 for (const resolution of resolutionsEnabled) {
59 dataInput.push({ 59 dataInput.push({
60 type: 'new-resolution-to-hls', 60 type: 'new-resolution-to-hls' as 'new-resolution-to-hls',
61 videoUUID: video.uuid, 61 videoUUID: video.uuid,
62 resolution, 62 resolution,
63 63
@@ -72,7 +72,7 @@ async function run () {
72 } else { 72 } else {
73 if (options.resolution !== undefined) { 73 if (options.resolution !== undefined) {
74 dataInput.push({ 74 dataInput.push({
75 type: 'new-resolution-to-webtorrent', 75 type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent',
76 videoUUID: video.uuid, 76 videoUUID: video.uuid,
77 77
78 createHLSIfNeeded: true, 78 createHLSIfNeeded: true,
@@ -90,7 +90,7 @@ async function run () {
90 } 90 }
91 91
92 dataInput.push({ 92 dataInput.push({
93 type: 'optimize-to-webtorrent', 93 type: 'optimize-to-webtorrent' as 'optimize-to-webtorrent',
94 videoUUID: video.uuid, 94 videoUUID: video.uuid,
95 isNewVideo: false 95 isNewVideo: false
96 }) 96 })
@@ -103,7 +103,8 @@ async function run () {
103 await video.save() 103 await video.save()
104 104
105 for (const d of dataInput) { 105 for (const d of dataInput) {
106 await addTranscodingJob(d, {}) 106 await JobQueue.Instance.createJob(await buildTranscodingJob(d))
107
107 console.log('Transcoding job for video %s created.', video.uuid) 108 console.log('Transcoding job for video %s created.', video.uuid)
108 } 109 }
109} 110}
diff --git a/server/controllers/api/videos/transcoding.ts b/server/controllers/api/videos/transcoding.ts
index b2b71a870..9aca761c1 100644
--- a/server/controllers/api/videos/transcoding.ts
+++ b/server/controllers/api/videos/transcoding.ts
@@ -1,10 +1,12 @@
1import Bluebird from 'bluebird'
1import express from 'express' 2import express from 'express'
2import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' 3import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
3import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { addTranscodingJob } from '@server/lib/video' 5import { JobQueue } from '@server/lib/job-queue'
6import { Hooks } from '@server/lib/plugins/hooks'
7import { buildTranscodingJob } from '@server/lib/video'
5import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models' 8import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models'
6import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares' 9import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares'
7import { Hooks } from '@server/lib/plugins/hooks'
8 10
9const lTags = loggerTagsFactory('api', 'video') 11const lTags = loggerTagsFactory('api', 'video')
10const transcodingRouter = express.Router() 12const transcodingRouter = express.Router()
@@ -44,29 +46,81 @@ async function createTranscoding (req: express.Request, res: express.Response) {
44 video.state = VideoState.TO_TRANSCODE 46 video.state = VideoState.TO_TRANSCODE
45 await video.save() 47 await video.save()
46 48
47 for (const resolution of resolutions) { 49 const hasAudio = !!audioStream
50 const childrenResolutions = resolutions.filter(r => r !== maxResolution)
51
52 const children = await Bluebird.mapSeries(childrenResolutions, resolution => {
48 if (body.transcodingType === 'hls') { 53 if (body.transcodingType === 'hls') {
49 await addTranscodingJob({ 54 return buildHLSJobOption({
50 type: 'new-resolution-to-hls',
51 videoUUID: video.uuid, 55 videoUUID: video.uuid,
56 hasAudio,
52 resolution, 57 resolution,
53 hasAudio: !!audioStream, 58 isMaxQuality: false
54 copyCodecs: false,
55 isNewVideo: false,
56 autoDeleteWebTorrentIfNeeded: false,
57 isMaxQuality: maxResolution === resolution
58 }) 59 })
59 } else if (body.transcodingType === 'webtorrent') { 60 }
60 await addTranscodingJob({ 61
61 type: 'new-resolution-to-webtorrent', 62 if (body.transcodingType === 'webtorrent') {
63 return buildWebTorrentJobOption({
62 videoUUID: video.uuid, 64 videoUUID: video.uuid,
63 isNewVideo: false, 65 hasAudio,
64 resolution, 66 resolution
65 hasAudio: !!audioStream,
66 createHLSIfNeeded: false
67 }) 67 })
68 } 68 }
69 } 69 })
70
71 const parent = body.transcodingType === 'hls'
72 ? await buildHLSJobOption({
73 videoUUID: video.uuid,
74 hasAudio,
75 resolution: maxResolution,
76 isMaxQuality: false
77 })
78 : await buildWebTorrentJobOption({
79 videoUUID: video.uuid,
80 hasAudio,
81 resolution: maxResolution
82 })
83
84 // Porcess the last resolution after the other ones to prevent concurrency issue
85 // Because low resolutions use the biggest one as ffmpeg input
86 await JobQueue.Instance.createJobWithChildren(parent, children)
70 87
71 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 88 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
72} 89}
90
91function buildHLSJobOption (options: {
92 videoUUID: string
93 hasAudio: boolean
94 resolution: number
95 isMaxQuality: boolean
96}) {
97 const { videoUUID, hasAudio, resolution, isMaxQuality } = options
98
99 return buildTranscodingJob({
100 type: 'new-resolution-to-hls',
101 videoUUID,
102 resolution,
103 hasAudio,
104 copyCodecs: false,
105 isNewVideo: false,
106 autoDeleteWebTorrentIfNeeded: false,
107 isMaxQuality
108 })
109}
110
111function buildWebTorrentJobOption (options: {
112 videoUUID: string
113 hasAudio: boolean
114 resolution: number
115}) {
116 const { videoUUID, hasAudio, resolution } = options
117
118 return buildTranscodingJob({
119 type: 'new-resolution-to-webtorrent',
120 videoUUID,
121 isNewVideo: false,
122 resolution,
123 hasAudio,
124 createHLSIfNeeded: false
125 })
126}
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
index 8ed1b6447..e3cb96a62 100644
--- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts
+++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts
@@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
73 return totalItems 73 return totalItems
74} 74}
75 75
76function createJob (payload: ActivitypubHttpFetcherPayload) {
77 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
78}
79
80function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { 76function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
81 const uri = fetchedVideo.shares 77 const uri = fetchedVideo.shares
82 78
@@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean
104 return crawlCollectionPage<string>(uri, handler, cleaner) 100 return crawlCollectionPage<string>(uri, handler, cleaner)
105 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) 101 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) }))
106} 102}
103
104function createJob (payload: ActivitypubHttpFetcherPayload) {
105 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
106}
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 8dbae8c42..cb2978157 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,7 +1,7 @@
1import { Job } from 'bullmq' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { Hooks } from '@server/lib/plugins/hooks'
4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
5import { VideoPathManager } from '@server/lib/video-path-manager' 5import { VideoPathManager } from '@server/lib/video-path-manager'
6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 6import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
7import { UserModel } from '@server/models/user/user' 7import { UserModel } from '@server/models/user/user'
@@ -27,6 +27,7 @@ import {
27 optimizeOriginalVideofile, 27 optimizeOriginalVideofile,
28 transcodeNewWebTorrentResolution 28 transcodeNewWebTorrentResolution
29} from '../../transcoding/transcoding' 29} from '../../transcoding/transcoding'
30import { JobQueue } from '../job-queue'
30 31
31type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> 32type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
32 33
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
248 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) 249 ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
249 } 250 }
250 251
251 await addTranscodingJob(hlsTranscodingPayload, jobOptions) 252 await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
252 253
253 return true 254 return true
254} 255}
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: {
312 priority: await getTranscodingJobPriority(user) 313 priority: await getTranscodingJobPriority(user)
313 } 314 }
314 315
315 await addTranscodingJob(dataInput, jobOptions) 316 await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
316 } 317 }
317 318
318 if (resolutionCreated.length === 0) { 319 if (resolutionCreated.length === 0) {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 50d732beb..386d20103 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -325,10 +325,8 @@ class JobQueue {
325 if (!job) continue 325 if (!job) continue
326 326
327 lastJob = { 327 lastJob = {
328 name: 'job', 328 ...this.buildJobFlowOption(job),
329 data: job.payload, 329
330 queueName: job.type,
331 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
332 children: lastJob 330 children: lastJob
333 ? [ lastJob ] 331 ? [ lastJob ]
334 : [] 332 : []
@@ -338,6 +336,23 @@ class JobQueue {
338 return this.flowProducer.add(lastJob) 336 return this.flowProducer.add(lastJob)
339 } 337 }
340 338
339 async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
340 return this.flowProducer.add({
341 ...this.buildJobFlowOption(parent),
342
343 children: children.map(c => this.buildJobFlowOption(c))
344 })
345 }
346
347 private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
348 return {
349 name: 'job',
350 data: job.payload,
351 queueName: job.type,
352 opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
353 }
354 }
355
341 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { 356 private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
342 return { 357 return {
343 backoff: { delay: 60 * 1000, type: 'exponential' }, 358 backoff: { delay: 60 * 1000, type: 'exponential' },
@@ -425,10 +440,6 @@ class JobQueue {
425 } 440 }
426 } 441 }
427 442
428 waitJob (job: Job) {
429 return job.waitUntilFinished(this.queueEvents[job.queueName])
430 }
431
432 private addRepeatableJobs () { 443 private addRepeatableJobs () {
433 this.queues['videos-views-stats'].add('job', {}, { 444 this.queues['videos-views-stats'].add('job', {}, {
434 repeat: REPEAT_JOBS['videos-views-stats'] 445 repeat: REPEAT_JOBS['videos-views-stats']
diff --git a/server/lib/video.ts b/server/lib/video.ts
index f7d7aa186..6c4f3ce7b 100644
--- a/server/lib/video.ts
+++ b/server/lib/video.ts
@@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
9import { FilteredModelAttributes } from '@server/types' 9import { FilteredModelAttributes } from '@server/types'
10import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' 10import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' 11import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
12import { CreateJobOptions, JobQueue } from './job-queue/job-queue' 12import { CreateJobOptions } from './job-queue/job-queue'
13import { updateVideoMiniatureFromExisting } from './thumbnail' 13import { updateVideoMiniatureFromExisting } from './thumbnail'
14 14
15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { 15function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
@@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: {
121 } 121 }
122} 122}
123 123
124async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { 124async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') 125 await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
126 126
127 return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) 127 return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
128} 128}
129 129
130async function getTranscodingJobPriority (user: MUserId) { 130async function getTranscodingJobPriority (user: MUserId) {
@@ -182,7 +182,7 @@ export {
182 buildVideoThumbnailsFromReq, 182 buildVideoThumbnailsFromReq,
183 setVideoTags, 183 setVideoTags,
184 buildOptimizeOrMergeAudioJob, 184 buildOptimizeOrMergeAudioJob,
185 addTranscodingJob, 185 buildTranscodingJob,
186 buildMoveToObjectStorageJob, 186 buildMoveToObjectStorageJob,
187 getTranscodingJobPriority, 187 getTranscodingJobPriority,
188 getCachedVideoDuration 188 getCachedVideoDuration
diff --git a/server/models/video/video.ts b/server/models/video/video.ts
index b8e383502..a8ea67c39 100644
--- a/server/models/video/video.ts
+++ b/server/models/video/video.ts
@@ -1592,22 +1592,21 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
1592 } 1592 }
1593 1593
1594 getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { 1594 getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) {
1595 // We first transcode to WebTorrent format, so try this array first 1595 const files = this.getAllFiles()
1596 if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) { 1596 const file = fun(files, file => file.resolution)
1597 const file = fun(this.VideoFiles, file => file.resolution) 1597 if (!file) return undefined
1598 1598
1599 if (file.videoId) {
1599 return Object.assign(file, { Video: this }) 1600 return Object.assign(file, { Video: this })
1600 } 1601 }
1601 1602
1602 // No webtorrent files, try with streaming playlist files 1603 if (file.videoStreamingPlaylistId) {
1603 if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) {
1604 const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this }) 1604 const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this })
1605 1605
1606 const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution)
1607 return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo }) 1606 return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo })
1608 } 1607 }
1609 1608
1610 return undefined 1609 throw new Error('File is not associated to a video of a playlist')
1611 } 1610 }
1612 1611
1613 getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo { 1612 getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo {
diff --git a/server/tools/peertube-import-videos.ts b/server/tools/peertube-import-videos.ts
index e2f80c703..76338ea3c 100644
--- a/server/tools/peertube-import-videos.ts
+++ b/server/tools/peertube-import-videos.ts
@@ -165,7 +165,7 @@ async function processVideo (parameters: {
165 const youtubeDLBinary = await YoutubeDLCLI.safeGet() 165 const youtubeDLBinary = await YoutubeDLCLI.safeGet()
166 const output = await youtubeDLBinary.download({ 166 const output = await youtubeDLBinary.download({
167 url: videoInfo.url, 167 url: videoInfo.url,
168 format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), 168 format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
169 output: path, 169 output: path,
170 additionalYoutubeDLArgs: command.args, 170 additionalYoutubeDLArgs: command.args,
171 processOptions 171 processOptions
@@ -251,7 +251,7 @@ async function fetchObject (info: any) {
251 const youtubeDLCLI = await YoutubeDLCLI.safeGet() 251 const youtubeDLCLI = await YoutubeDLCLI.safeGet()
252 const result = await youtubeDLCLI.getInfo({ 252 const result = await youtubeDLCLI.getInfo({
253 url, 253 url,
254 format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), 254 format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
255 processOptions 255 processOptions
256 }) 256 })
257 257
@@ -336,7 +336,7 @@ function exitError (message: string, ...meta: any[]) {
336function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) { 336function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) {
337 return youtubeDLCLI.getInfo({ 337 return youtubeDLCLI.getInfo({
338 url, 338 url,
339 format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), 339 format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
340 additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ], 340 additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ],
341 processOptions 341 processOptions
342 }) 342 })