]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Avoid concurrency issue on transcoding
authorChocobozzz <me@florianbigard.com>
Tue, 9 Aug 2022 07:09:31 +0000 (09:09 +0200)
committerChocobozzz <me@florianbigard.com>
Tue, 9 Aug 2022 07:32:17 +0000 (09:32 +0200)
scripts/create-transcoding-job.ts
server/controllers/api/videos/transcoding.ts
server/lib/activitypub/videos/shared/video-sync-attributes.ts
server/lib/job-queue/handlers/video-transcoding.ts
server/lib/job-queue/job-queue.ts
server/lib/video.ts
server/models/video/video.ts
server/tools/peertube-import-videos.ts

index f8c0ed4618e4149457ae88c8a2541d4411fce497..aa97b0ba70de7ddb37a0c3758f650d1fa749bbc9 100755 (executable)
@@ -2,7 +2,7 @@ import { program } from 'commander'
 import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc'
 import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
 import { CONFIG } from '@server/initializers/config'
-import { addTranscodingJob } from '@server/lib/video'
+import { buildTranscodingJob } from '@server/lib/video'
 import { VideoState, VideoTranscodingPayload } from '@shared/models'
 import { initDatabaseModels } from '../server/initializers/database'
 import { JobQueue } from '../server/lib/job-queue'
@@ -57,7 +57,7 @@ async function run () {
 
     for (const resolution of resolutionsEnabled) {
       dataInput.push({
-        type: 'new-resolution-to-hls',
+        type: 'new-resolution-to-hls' as 'new-resolution-to-hls',
         videoUUID: video.uuid,
         resolution,
 
@@ -72,7 +72,7 @@ async function run () {
   } else {
     if (options.resolution !== undefined) {
       dataInput.push({
-        type: 'new-resolution-to-webtorrent',
+        type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent',
         videoUUID: video.uuid,
 
         createHLSIfNeeded: true,
@@ -90,7 +90,7 @@ async function run () {
       }
 
       dataInput.push({
-        type: 'optimize-to-webtorrent',
+        type: 'optimize-to-webtorrent' as 'optimize-to-webtorrent',
         videoUUID: video.uuid,
         isNewVideo: false
       })
@@ -103,7 +103,8 @@ async function run () {
   await video.save()
 
   for (const d of dataInput) {
-    await addTranscodingJob(d, {})
+    await JobQueue.Instance.createJob(await buildTranscodingJob(d))
+
     console.log('Transcoding job for video %s created.', video.uuid)
   }
 }
index b2b71a870aea243c452a96e93c83f628af9fcdec..9aca761c166aa5bb73217c4bac9418678716723b 100644 (file)
@@ -1,10 +1,12 @@
+import Bluebird from 'bluebird'
 import express from 'express'
 import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
-import { addTranscodingJob } from '@server/lib/video'
+import { JobQueue } from '@server/lib/job-queue'
+import { Hooks } from '@server/lib/plugins/hooks'
+import { buildTranscodingJob } from '@server/lib/video'
 import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models'
 import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares'
-import { Hooks } from '@server/lib/plugins/hooks'
 
 const lTags = loggerTagsFactory('api', 'video')
 const transcodingRouter = express.Router()
@@ -44,29 +46,81 @@ async function createTranscoding (req: express.Request, res: express.Response) {
   video.state = VideoState.TO_TRANSCODE
   await video.save()
 
-  for (const resolution of resolutions) {
+  const hasAudio = !!audioStream
+  const childrenResolutions = resolutions.filter(r => r !== maxResolution)
+
+  const children = await Bluebird.mapSeries(childrenResolutions, resolution => {
     if (body.transcodingType === 'hls') {
-      await addTranscodingJob({
-        type: 'new-resolution-to-hls',
+      return buildHLSJobOption({
         videoUUID: video.uuid,
+        hasAudio,
         resolution,
-        hasAudio: !!audioStream,
-        copyCodecs: false,
-        isNewVideo: false,
-        autoDeleteWebTorrentIfNeeded: false,
-        isMaxQuality: maxResolution === resolution
+        isMaxQuality: false
       })
-    } else if (body.transcodingType === 'webtorrent') {
-      await addTranscodingJob({
-        type: 'new-resolution-to-webtorrent',
+    }
+
+    if (body.transcodingType === 'webtorrent') {
+      return buildWebTorrentJobOption({
         videoUUID: video.uuid,
-        isNewVideo: false,
-        resolution,
-        hasAudio: !!audioStream,
-        createHLSIfNeeded: false
+        hasAudio,
+        resolution
       })
     }
-  }
+  })
+
+  const parent = body.transcodingType === 'hls'
+    ? await buildHLSJobOption({
+      videoUUID: video.uuid,
+      hasAudio,
+      resolution: maxResolution,
+      isMaxQuality: false
+    })
+    : await buildWebTorrentJobOption({
+      videoUUID: video.uuid,
+      hasAudio,
+      resolution: maxResolution
+    })
+
+  // Porcess the last resolution after the other ones to prevent concurrency issue
+  // Because low resolutions use the biggest one as ffmpeg input
+  await JobQueue.Instance.createJobWithChildren(parent, children)
 
   return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
 }
+
+function buildHLSJobOption (options: {
+  videoUUID: string
+  hasAudio: boolean
+  resolution: number
+  isMaxQuality: boolean
+}) {
+  const { videoUUID, hasAudio, resolution, isMaxQuality } = options
+
+  return buildTranscodingJob({
+    type: 'new-resolution-to-hls',
+    videoUUID,
+    resolution,
+    hasAudio,
+    copyCodecs: false,
+    isNewVideo: false,
+    autoDeleteWebTorrentIfNeeded: false,
+    isMaxQuality
+  })
+}
+
+function buildWebTorrentJobOption (options: {
+  videoUUID: string
+  hasAudio: boolean
+  resolution: number
+}) {
+  const { videoUUID, hasAudio, resolution } = options
+
+  return buildTranscodingJob({
+    type: 'new-resolution-to-webtorrent',
+    videoUUID,
+    isNewVideo: false,
+    resolution,
+    hasAudio,
+    createHLSIfNeeded: false
+  })
+}
index 8ed1b6447adb0ea0372e885681b6f77003745b18..e3cb96a62958121a9247d3b97906229dcd99ac2b 100644 (file)
@@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
   return totalItems
 }
 
-function createJob (payload: ActivitypubHttpFetcherPayload) {
-  return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
-}
-
 function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
   const uri = fetchedVideo.shares
 
@@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean
   return crawlCollectionPage<string>(uri, handler, cleaner)
     .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) }))
 }
+
+function createJob (payload: ActivitypubHttpFetcherPayload) {
+  return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
+}
index 8dbae8c423490a1679da4506544f382bf35b2c0a..cb297815703dda93cb5680692207ee519637d1c6 100644 (file)
@@ -1,7 +1,7 @@
 import { Job } from 'bullmq'
 import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
 import { Hooks } from '@server/lib/plugins/hooks'
-import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
+import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
 import { UserModel } from '@server/models/user/user'
@@ -27,6 +27,7 @@ import {
   optimizeOriginalVideofile,
   transcodeNewWebTorrentResolution
 } from '../../transcoding/transcoding'
+import { JobQueue } from '../job-queue'
 
 type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
 
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: {
     ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
   }
 
-  await addTranscodingJob(hlsTranscodingPayload, jobOptions)
+  await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
 
   return true
 }
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: {
       priority: await getTranscodingJobPriority(user)
     }
 
-    await addTranscodingJob(dataInput, jobOptions)
+    await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
   }
 
   if (resolutionCreated.length === 0) {
index 50d732bebcf5c05c9df0c1eac2b12d3c6585ab24..386d20103716711e22c90152fee273b96f7c9022 100644 (file)
@@ -325,10 +325,8 @@ class JobQueue {
       if (!job) continue
 
       lastJob = {
-        name: 'job',
-        data: job.payload,
-        queueName: job.type,
-        opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+        ...this.buildJobFlowOption(job),
+
         children: lastJob
           ? [ lastJob ]
           : []
@@ -338,6 +336,23 @@ class JobQueue {
     return this.flowProducer.add(lastJob)
   }
 
+  async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
+    return this.flowProducer.add({
+      ...this.buildJobFlowOption(parent),
+
+      children: children.map(c => this.buildJobFlowOption(c))
+    })
+  }
+
+  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
+    return {
+      name: 'job',
+      data: job.payload,
+      queueName: job.type,
+      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+    }
+  }
+
   private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
     return {
       backoff: { delay: 60 * 1000, type: 'exponential' },
@@ -425,10 +440,6 @@ class JobQueue {
     }
   }
 
-  waitJob (job: Job) {
-    return job.waitUntilFinished(this.queueEvents[job.queueName])
-  }
-
   private addRepeatableJobs () {
     this.queues['videos-views-stats'].add('job', {}, {
       repeat: REPEAT_JOBS['videos-views-stats']
index f7d7aa186bf589db7279dc7252e6851ea23d512d..6c4f3ce7b0c00826bd8eab505d9d6f02f640b6eb 100644 (file)
@@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
 import { FilteredModelAttributes } from '@server/types'
 import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
 import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
-import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
+import { CreateJobOptions } from './job-queue/job-queue'
 import { updateVideoMiniatureFromExisting } from './thumbnail'
 
 function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
@@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: {
   }
 }
 
-async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
+async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
   await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
 
-  return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
+  return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
 }
 
 async function getTranscodingJobPriority (user: MUserId) {
@@ -182,7 +182,7 @@ export {
   buildVideoThumbnailsFromReq,
   setVideoTags,
   buildOptimizeOrMergeAudioJob,
-  addTranscodingJob,
+  buildTranscodingJob,
   buildMoveToObjectStorageJob,
   getTranscodingJobPriority,
   getCachedVideoDuration
index b8e383502c133c82e95c60a60602708fa11ab3e0..a8ea67c395379a22090f396268bafefc434dbeb2 100644 (file)
@@ -1592,22 +1592,21 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
   }
 
   getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) {
-    // We first transcode to WebTorrent format, so try this array first
-    if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) {
-      const file = fun(this.VideoFiles, file => file.resolution)
+    const files = this.getAllFiles()
+    const file = fun(files, file => file.resolution)
+    if (!file) return undefined
 
+    if (file.videoId) {
       return Object.assign(file, { Video: this })
     }
 
-    // No webtorrent files, try with streaming playlist files
-    if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) {
+    if (file.videoStreamingPlaylistId) {
       const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this })
 
-      const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution)
       return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo })
     }
 
-    return undefined
+    throw new Error('File is not associated to a video of a playlist')
   }
 
   getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo {
index e2f80c703c4bbd0962f97e78f420cac3219f459d..76338ea3c750c90fad2703b365fff889d7090390 100644 (file)
@@ -165,7 +165,7 @@ async function processVideo (parameters: {
     const youtubeDLBinary = await YoutubeDLCLI.safeGet()
     const output = await youtubeDLBinary.download({
       url: videoInfo.url,
-      format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+      format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
       output: path,
       additionalYoutubeDLArgs: command.args,
       processOptions
@@ -251,7 +251,7 @@ async function fetchObject (info: any) {
   const youtubeDLCLI = await YoutubeDLCLI.safeGet()
   const result = await youtubeDLCLI.getInfo({
     url,
-    format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+    format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
     processOptions
   })
 
@@ -336,7 +336,7 @@ function exitError (message: string, ...meta: any[]) {
 function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) {
   return youtubeDLCLI.getInfo({
     url,
-    format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+    format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
     additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ],
     processOptions
   })