]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/video-transcoding.ts
Add ability to remove hls/webtorrent files
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / video-transcoding.ts
index 0f6b3f753071d63cae69dbfa3efc6b344935ae12..904ef2e3cf714ee796638071a60a975928143dc2 100644 (file)
@@ -1,8 +1,11 @@
-import * as Bull from 'bull'
+import { Job } from 'bull'
 import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
-import { publishAndFederateIfNeeded } from '@server/lib/video'
-import { getVideoFilePath } from '@server/lib/video-paths'
-import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
+import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
+import { VideoPathManager } from '@server/lib/video-path-manager'
+import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
+import { UserModel } from '@server/models/user/user'
+import { VideoJobInfoModel } from '@server/models/video/video-job-info'
+import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models'
 import {
   HLSTranscodingPayload,
   MergeAudioTranscodingPayload,
@@ -12,56 +15,55 @@ import {
 } from '../../../../shared'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
 import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils'
-import { logger } from '../../../helpers/logger'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { CONFIG } from '../../../initializers/config'
-import { sequelizeTypescript } from '../../../initializers/database'
 import { VideoModel } from '../../../models/video/video'
-import { federateVideoIfNeeded } from '../../activitypub/videos'
-import { Notifier } from '../../notifier'
 import {
   generateHlsPlaylistResolution,
   mergeAudioVideofile,
   optimizeOriginalVideofile,
   transcodeNewWebTorrentResolution
-} from '../../video-transcoding'
-import { JobQueue } from '../job-queue'
+} from '../../transcoding/video-transcoding'
 
-const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise<any> } = {
-  // Deprecated, introduced in 3.1
-  'hls': handleHLSJob,
-  'new-resolution-to-hls': handleHLSJob,
+type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
 
-  // Deprecated, introduced in 3.1
-  'new-resolution': handleNewWebTorrentResolutionJob,
+const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
+  'new-resolution-to-hls': handleHLSJob,
   'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob,
-
-  // Deprecated, introduced in 3.1
-  'merge-audio': handleWebTorrentMergeAudioJob,
   'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob,
-
-  // Deprecated, introduced in 3.1
-  'optimize': handleWebTorrentOptimizeJob,
   'optimize-to-webtorrent': handleWebTorrentOptimizeJob
 }
 
-async function processVideoTranscoding (job: Bull.Job) {
+const lTags = loggerTagsFactory('transcoding')
+
+async function processVideoTranscoding (job: Job) {
   const payload = job.data as VideoTranscodingPayload
-  logger.info('Processing video file in job %d.', job.id)
+  logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID))
 
   const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
   // No video, maybe deleted?
   if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
+    logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID))
     return undefined
   }
 
+  const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
+
   const handler = handlers[payload.type]
 
   if (!handler) {
+    await moveToFailedTranscodingState(video)
+
     throw new Error('Cannot find transcoding handler for ' + payload.type)
   }
 
-  await handler(job, payload, video)
+  try {
+    await handler(job, payload, video, user)
+  } catch (error) {
+    await moveToFailedTranscodingState(video)
+
+    throw error
+  }
 
   return video
 }
@@ -70,184 +72,247 @@ async function processVideoTranscoding (job: Bull.Job) {
 // Job handlers
 // ---------------------------------------------------------------------------
 
-async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, video: MVideoFullLight) {
+async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
+  logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
+
   const videoFileInput = payload.copyCodecs
     ? video.getWebTorrentFile(payload.resolution)
     : video.getMaxQualityFile()
 
   const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
-  const videoInputPath = getVideoFilePath(videoOrStreamingPlaylist, videoFileInput)
 
-  await generateHlsPlaylistResolution({
-    video,
-    videoInputPath,
-    resolution: payload.resolution,
-    copyCodecs: payload.copyCodecs,
-    isPortraitMode: payload.isPortraitMode || false,
-    job
+  await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => {
+    return generateHlsPlaylistResolution({
+      video,
+      videoInputPath,
+      resolution: payload.resolution,
+      copyCodecs: payload.copyCodecs,
+      isPortraitMode: payload.isPortraitMode || false,
+      job
+    })
   })
 
-  await retryTransactionWrapper(onHlsPlaylistGeneration, video)
+  logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload)
 }
 
-async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) {
+async function handleNewWebTorrentResolutionJob (
+  job: Job,
+  payload: NewResolutionTranscodingPayload,
+  video: MVideoFullLight,
+  user: MUserId
+) {
+  logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
+
   await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
 
-  await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
+  logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
 }
 
-async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) {
+async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
+
   await mergeAudioVideofile(video, payload.resolution, job)
 
-  await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload)
+  logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user)
 }
 
-async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) {
-  const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
+async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
+
+  const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
 
-  await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType)
+  logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
 }
 
 // ---------------------------------------------------------------------------
 
-async function onHlsPlaylistGeneration (video: MVideoFullLight) {
-  if (video === undefined) return undefined
-
-  // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
-  if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
+  if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+    // Remove webtorrent files if not enabled
     for (const file of video.VideoFiles) {
-      await video.removeFile(file)
+      await video.removeWebTorrentFileAndTorrent(file)
       await file.destroy()
     }
 
     video.VideoFiles = []
+
+    // Create HLS new resolution jobs
+    await createLowerResolutionsJobs({
+      video,
+      user,
+      videoFileResolution: payload.resolution,
+      isPortraitMode: payload.isPortraitMode,
+      isNewVideo: payload.isNewVideo ?? true,
+      type: 'hls'
+    })
   }
 
-  return publishAndFederateIfNeeded(video)
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
+  await moveToNextState(video, payload.isNewVideo)
 }
 
 async function onVideoFileOptimizer (
   videoArg: MVideoWithFile,
-  payload: OptimizeTranscodingPayload,
-  transcodeType: TranscodeOptionsType
+  payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
+  transcodeType: TranscodeOptionsType,
+  user: MUserId
 ) {
-  if (videoArg === undefined) return undefined
-
-  // Outside the transaction (IO on disk)
-  const { videoFileResolution, isPortraitMode } = await videoArg.getMaxQualityResolution()
-
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    // Create transcoding jobs if there are enabled resolutions
-    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
-    logger.info(
-      'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution,
-      { resolutions: resolutionsEnabled }
-    )
-
-    let videoPublished = false
-
-    // Generate HLS version of the original file
-    const originalFileHLSPayload = Object.assign({}, payload, {
-      isPortraitMode,
-      resolution: videoDatabase.getMaxQualityFile().resolution,
-      // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
-      copyCodecs: transcodeType !== 'quick-transcode'
-    })
-    createHlsJobIfEnabled(originalFileHLSPayload)
-
-    const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode)
-
-    if (!hasNewResolutions) {
-      // No transcoding to do, it's now published
-      videoPublished = await videoDatabase.publishIfNeededAndSave(t)
-    }
-
-    await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
-
-    return { videoDatabase, videoPublished }
+  const { resolution, isPortraitMode } = await videoArg.getMaxQualityResolution()
+
+  // Maybe the video changed in database, refresh it
+  const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid)
+  // Video does not exist anymore
+  if (!videoDatabase) return undefined
+
+  // Generate HLS version of the original file
+  const originalFileHLSPayload = {
+    ...payload,
+
+    isPortraitMode,
+    resolution: videoDatabase.getMaxQualityFile().resolution,
+    // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
+    copyCodecs: transcodeType !== 'quick-transcode',
+    isMaxQuality: true
+  }
+  const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
+  const hasNewResolutions = await createLowerResolutionsJobs({
+    video: videoDatabase,
+    user,
+    videoFileResolution: resolution,
+    isPortraitMode,
+    type: 'webtorrent',
+    isNewVideo: payload.isNewVideo ?? true
   })
 
-  if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
-  if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
+  await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
+
+  // Move to next state if there are no other resolutions to generate
+  if (!hasHls && !hasNewResolutions) {
+    await moveToNextState(videoDatabase, payload.isNewVideo)
+  }
 }
 
 async function onNewWebTorrentFileResolution (
-  video: MVideoUUID,
-  payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
+  video: MVideo,
+  user: MUserId,
+  payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
 ) {
-  await publishAndFederateIfNeeded(video)
+  await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false })
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
 
-  createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true }))
+  await moveToNextState(video, payload.isNewVideo)
+}
+
+async function createHlsJobIfEnabled (user: MUserId, payload: {
+  videoUUID: string
+  resolution: number
+  isPortraitMode?: boolean
+  copyCodecs: boolean
+  isMaxQuality: boolean
+  isNewVideo?: boolean
+}) {
+  if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
+
+  const jobOptions = {
+    priority: await getTranscodingJobPriority(user)
+  }
+
+  const hlsTranscodingPayload: HLSTranscodingPayload = {
+    type: 'new-resolution-to-hls',
+    videoUUID: payload.videoUUID,
+    resolution: payload.resolution,
+    isPortraitMode: payload.isPortraitMode,
+    copyCodecs: payload.copyCodecs,
+    isMaxQuality: payload.isMaxQuality,
+    isNewVideo: payload.isNewVideo
+  }
+
+  await addTranscodingJob(hlsTranscodingPayload, jobOptions)
+
+  return true
 }
 
 // ---------------------------------------------------------------------------
 
 export {
   processVideoTranscoding,
+  createHlsJobIfEnabled,
   onNewWebTorrentFileResolution
 }
 
 // ---------------------------------------------------------------------------
 
-function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) {
-  // Generate HLS playlist?
-  if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
-    const hlsTranscodingPayload: HLSTranscodingPayload = {
-      type: 'new-resolution-to-hls',
-      videoUUID: payload.videoUUID,
-      resolution: payload.resolution,
-      isPortraitMode: payload.isPortraitMode,
-      copyCodecs: payload.copyCodecs
-    }
+async function createLowerResolutionsJobs (options: {
+  video: MVideoFullLight
+  user: MUserId
+  videoFileResolution: number
+  isPortraitMode: boolean
+  isNewVideo: boolean
+  type: 'hls' | 'webtorrent'
+}) {
+  const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options
 
-    return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
-  }
-}
-
-function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) {
   // Create transcoding jobs if there are enabled resolutions
   const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
-  logger.info(
-    'Resolutions computed for video %s and origin file resolution of %d.', video.uuid, videoFileResolution,
-    { resolutions: resolutionsEnabled }
-  )
-
-  if (resolutionsEnabled.length === 0) {
-    logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid)
-
-    return false
-  }
+  const resolutionCreated: string[] = []
 
   for (const resolution of resolutionsEnabled) {
     let dataInput: VideoTranscodingPayload
 
-    if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
+    if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
       // WebTorrent will create subsequent HLS job
       dataInput = {
         type: 'new-resolution-to-webtorrent',
         videoUUID: video.uuid,
         resolution,
-        isPortraitMode
+        isPortraitMode,
+        isNewVideo
       }
-    } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
+
+      resolutionCreated.push('webtorrent-' + resolution)
+    }
+
+    if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
       dataInput = {
         type: 'new-resolution-to-hls',
         videoUUID: video.uuid,
         resolution,
         isPortraitMode,
-        copyCodecs: false
+        copyCodecs: false,
+        isMaxQuality: false,
+        isNewVideo
       }
+
+      resolutionCreated.push('hls-' + resolution)
     }
 
-    JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
+    if (!dataInput) continue
+
+    const jobOptions = {
+      priority: await getTranscodingJobPriority(user)
+    }
+
+    await addTranscodingJob(dataInput, jobOptions)
+  }
+
+  if (resolutionCreated.length === 0) {
+    logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid))
+
+    return false
   }
 
-  logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled })
+  logger.info(
+    'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution,
+    { resolutionCreated, ...lTags(video.uuid) }
+  )
 
   return true
 }