]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/video-transcoding.ts
Add ability to remove hls/webtorrent files
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / video-transcoding.ts
index c020057c932052113ba0f437216dc3f3e2d3fe9b..904ef2e3cf714ee796638071a60a975928143dc2 100644 (file)
-import * as Bull from 'bull'
-import { VideoResolution } from '../../../../shared'
-import { logger } from '../../../helpers/logger'
-import { VideoModel } from '../../../models/video/video'
-import { JobQueue } from '../job-queue'
-import { federateVideoIfNeeded } from '../../activitypub'
+import { Job } from 'bull'
+import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils'
+import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
+import { VideoPathManager } from '@server/lib/video-path-manager'
+import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
+import { UserModel } from '@server/models/user/user'
+import { VideoJobInfoModel } from '@server/models/video/video-job-info'
+import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models'
+import {
+  HLSTranscodingPayload,
+  MergeAudioTranscodingPayload,
+  NewResolutionTranscodingPayload,
+  OptimizeTranscodingPayload,
+  VideoTranscodingPayload
+} from '../../../../shared'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
-import { sequelizeTypescript } from '../../../initializers'
-import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
-import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding'
-import { Notifier } from '../../notifier'
+import { computeResolutionsToTranscode } from '../../../helpers/ffprobe-utils'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { CONFIG } from '../../../initializers/config'
-import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/typings/models'
-
-interface BaseTranscodingPayload {
-  videoUUID: string
-  isNewVideo?: boolean
-}
-
-interface HLSTranscodingPayload extends BaseTranscodingPayload {
-  type: 'hls'
-  isPortraitMode?: boolean
-  resolution: VideoResolution
-  copyCodecs: boolean
-}
-
-interface NewResolutionTranscodingPayload extends BaseTranscodingPayload {
-  type: 'new-resolution'
-  isPortraitMode?: boolean
-  resolution: VideoResolution
-}
-
-interface MergeAudioTranscodingPayload extends BaseTranscodingPayload {
-  type: 'merge-audio'
-  resolution: VideoResolution
-}
-
-interface OptimizeTranscodingPayload extends BaseTranscodingPayload {
-  type: 'optimize'
+import { VideoModel } from '../../../models/video/video'
+import {
+  generateHlsPlaylistResolution,
+  mergeAudioVideofile,
+  optimizeOriginalVideofile,
+  transcodeNewWebTorrentResolution
+} from '../../transcoding/video-transcoding'
+
+type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
+
+const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
+  'new-resolution-to-hls': handleHLSJob,
+  'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob,
+  'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob,
+  'optimize-to-webtorrent': handleWebTorrentOptimizeJob
 }
 
-export type VideoTranscodingPayload =
-  HLSTranscodingPayload
-  | NewResolutionTranscodingPayload
-  | OptimizeTranscodingPayload
-  | MergeAudioTranscodingPayload
+const lTags = loggerTagsFactory('transcoding')
 
-async function processVideoTranscoding (job: Bull.Job) {
+async function processVideoTranscoding (job: Job) {
   const payload = job.data as VideoTranscodingPayload
-  logger.info('Processing video file in job %d.', job.id)
+  logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID))
 
   const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
   // No video, maybe deleted?
   if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
+    logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID))
     return undefined
   }
 
-  if (payload.type === 'hls') {
-    await generateHlsPlaylist(video, payload.resolution, payload.copyCodecs, payload.isPortraitMode || false)
+  const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
+
+  const handler = handlers[payload.type]
 
-    await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
-  } else if (payload.type === 'new-resolution') {
-    await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false)
+  if (!handler) {
+    await moveToFailedTranscodingState(video)
 
-    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
-  } else if (payload.type === 'merge-audio') {
-    await mergeAudioVideofile(video, payload.resolution)
+    throw new Error('Cannot find transcoding handler for ' + payload.type)
+  }
 
-    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
-  } else {
-    await optimizeOriginalVideofile(video)
+  try {
+    await handler(job, payload, video, user)
+  } catch (error) {
+    await moveToFailedTranscodingState(video)
 
-    await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
+    throw error
   }
 
   return video
 }
 
-async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) {
-  if (video === undefined) return undefined
+// ---------------------------------------------------------------------------
+// Job handlers
+// ---------------------------------------------------------------------------
+
+async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
+  logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
+
+  const videoFileInput = payload.copyCodecs
+    ? video.getWebTorrentFile(payload.resolution)
+    : video.getMaxQualityFile()
+
+  const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
+
+  await VideoPathManager.Instance.makeAvailableVideoFile(videoOrStreamingPlaylist, videoFileInput, videoInputPath => {
+    return generateHlsPlaylistResolution({
+      video,
+      videoInputPath,
+      resolution: payload.resolution,
+      copyCodecs: payload.copyCodecs,
+      isPortraitMode: payload.isPortraitMode || false,
+      job
+    })
+  })
+
+  logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onHlsPlaylistGeneration, video, user, payload)
+}
+
+async function handleNewWebTorrentResolutionJob (
+  job: Job,
+  payload: NewResolutionTranscodingPayload,
+  video: MVideoFullLight,
+  user: MUserId
+) {
+  logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
+
+  await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
+
+  logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload)
+}
+
+async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
+
+  await mergeAudioVideofile(video, payload.resolution, job)
+
+  logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onVideoFileOptimizer, video, payload, 'video', user)
+}
 
-  // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
-  if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
+
+  const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
+
+  logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid))
+
+  await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user)
+}
+
+// ---------------------------------------------------------------------------
+
+async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
+  if (payload.isMaxQuality && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+    // Remove webtorrent files if not enabled
     for (const file of video.VideoFiles) {
-      await video.removeFile(file)
+      await video.removeWebTorrentFileAndTorrent(file)
       await file.destroy()
     }
 
     video.VideoFiles = []
+
+    // Create HLS new resolution jobs
+    await createLowerResolutionsJobs({
+      video,
+      user,
+      videoFileResolution: payload.resolution,
+      isPortraitMode: payload.isPortraitMode,
+      isNewVideo: payload.isNewVideo ?? true,
+      type: 'hls'
+    })
   }
 
-  return publishAndFederateIfNeeded(video)
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
+  await moveToNextState(video, payload.isNewVideo)
 }
 
-async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) {
-  await publishAndFederateIfNeeded(video)
+async function onVideoFileOptimizer (
+  videoArg: MVideoWithFile,
+  payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
+  transcodeType: TranscodeOptionsType,
+  user: MUserId
+) {
+  const { resolution, isPortraitMode } = await videoArg.getMaxQualityResolution()
+
+  // Maybe the video changed in database, refresh it
+  const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid)
+  // Video does not exist anymore
+  if (!videoDatabase) return undefined
+
+  // Generate HLS version of the original file
+  const originalFileHLSPayload = {
+    ...payload,
+
+    isPortraitMode,
+    resolution: videoDatabase.getMaxQualityFile().resolution,
+    // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
+    copyCodecs: transcodeType !== 'quick-transcode',
+    isMaxQuality: true
+  }
+  const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
+  const hasNewResolutions = await createLowerResolutionsJobs({
+    video: videoDatabase,
+    user,
+    videoFileResolution: resolution,
+    isPortraitMode,
+    type: 'webtorrent',
+    isNewVideo: payload.isNewVideo ?? true
+  })
+
+  await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
 
-  await createHlsJobIfEnabled(payload)
+  // Move to next state if there are no other resolutions to generate
+  if (!hasHls && !hasNewResolutions) {
+    await moveToNextState(videoDatabase, payload.isNewVideo)
+  }
 }
 
-async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload) {
-  if (videoArg === undefined) return undefined
-
-  // Outside the transaction (IO on disk)
-  const { videoFileResolution } = await videoArg.getMaxQualityResolution()
-
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    // Create transcoding jobs if there are enabled resolutions
-    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
-    logger.info(
-      'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
-      { resolutions: resolutionsEnabled }
-    )
-
-    let videoPublished = false
-
-    const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution })
-    await createHlsJobIfEnabled(hlsPayload)
-
-    if (resolutionsEnabled.length !== 0) {
-      for (const resolution of resolutionsEnabled) {
-        let dataInput: VideoTranscodingPayload
-
-        if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
-          dataInput = {
-            type: 'new-resolution' as 'new-resolution',
-            videoUUID: videoDatabase.uuid,
-            resolution
-          }
-        } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
-          dataInput = {
-            type: 'hls',
-            videoUUID: videoDatabase.uuid,
-            resolution,
-            isPortraitMode: false,
-            copyCodecs: false
-          }
-        }
-
-        JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
-      }
+async function onNewWebTorrentFileResolution (
+  video: MVideo,
+  user: MUserId,
+  payload: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload
+) {
+  await createHlsJobIfEnabled(user, { ...payload, copyCodecs: true, isMaxQuality: false })
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
 
-      logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
-    } else {
-      // No transcoding to do, it's now published
-      videoPublished = await videoDatabase.publishIfNeededAndSave(t)
+  await moveToNextState(video, payload.isNewVideo)
+}
 
-      logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
-    }
+async function createHlsJobIfEnabled (user: MUserId, payload: {
+  videoUUID: string
+  resolution: number
+  isPortraitMode?: boolean
+  copyCodecs: boolean
+  isMaxQuality: boolean
+  isNewVideo?: boolean
+}) {
+  if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
 
-    await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
+  const jobOptions = {
+    priority: await getTranscodingJobPriority(user)
+  }
 
-    return { videoDatabase, videoPublished }
-  })
+  const hlsTranscodingPayload: HLSTranscodingPayload = {
+    type: 'new-resolution-to-hls',
+    videoUUID: payload.videoUUID,
+    resolution: payload.resolution,
+    isPortraitMode: payload.isPortraitMode,
+    copyCodecs: payload.copyCodecs,
+    isMaxQuality: payload.isMaxQuality,
+    isNewVideo: payload.isNewVideo
+  }
+
+  await addTranscodingJob(hlsTranscodingPayload, jobOptions)
 
-  if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
-  if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
+  return true
 }
 
 // ---------------------------------------------------------------------------
 
 export {
   processVideoTranscoding,
-  publishNewResolutionIfNeeded
+  createHlsJobIfEnabled,
+  onNewWebTorrentFileResolution
 }
 
 // ---------------------------------------------------------------------------
 
-function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: number, isPortraitMode?: boolean }) {
-  // Generate HLS playlist?
-  if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
-    const hlsTranscodingPayload = {
-      type: 'hls' as 'hls',
-      videoUUID: payload.videoUUID,
-      resolution: payload.resolution,
-      isPortraitMode: payload.isPortraitMode,
-      copyCodecs: true
+async function createLowerResolutionsJobs (options: {
+  video: MVideoFullLight
+  user: MUserId
+  videoFileResolution: number
+  isPortraitMode: boolean
+  isNewVideo: boolean
+  type: 'hls' | 'webtorrent'
+}) {
+  const { video, user, videoFileResolution, isPortraitMode, isNewVideo, type } = options
+
+  // Create transcoding jobs if there are enabled resolutions
+  const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
+  const resolutionCreated: string[] = []
+
+  for (const resolution of resolutionsEnabled) {
+    let dataInput: VideoTranscodingPayload
+
+    if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
+      // WebTorrent will create subsequent HLS job
+      dataInput = {
+        type: 'new-resolution-to-webtorrent',
+        videoUUID: video.uuid,
+        resolution,
+        isPortraitMode,
+        isNewVideo
+      }
+
+      resolutionCreated.push('webtorrent-' + resolution)
     }
 
-    return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
-  }
-}
+    if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
+      dataInput = {
+        type: 'new-resolution-to-hls',
+        videoUUID: video.uuid,
+        resolution,
+        isPortraitMode,
+        copyCodecs: false,
+        isMaxQuality: false,
+        isNewVideo
+      }
 
-async function publishAndFederateIfNeeded (video: MVideoUUID) {
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
+      resolutionCreated.push('hls-' + resolution)
+    }
 
-    // We transcoded the video file in another format, now we can publish it
-    const videoPublished = await videoDatabase.publishIfNeededAndSave(t)
+    if (!dataInput) continue
 
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
+    const jobOptions = {
+      priority: await getTranscodingJobPriority(user)
+    }
 
-    return { videoDatabase, videoPublished }
-  })
+    await addTranscodingJob(dataInput, jobOptions)
+  }
+
+  if (resolutionCreated.length === 0) {
+    logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid))
 
-  if (videoPublished) {
-    Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
-    Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
+    return false
   }
+
+  logger.info(
+    'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution,
+    { resolutionCreated, ...lTags(video.uuid) }
+  )
+
+  return true
 }