]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/video-transcoding.ts
/!\ Use a dedicated config file for development
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / video-transcoding.ts
index e9b84ecd66722aec9bcdedb0655ba874ae4bd022..5afca65cab154fa62be86e5f2a0eb8a18a111d85 100644 (file)
-import * as Bull from 'bull'
-import { VideoResolution, VideoState } from '../../../../shared'
-import { logger } from '../../../helpers/logger'
-import { VideoModel } from '../../../models/video/video'
-import { JobQueue } from '../job-queue'
-import { federateVideoIfNeeded } from '../../activitypub'
+import { Job } from 'bull'
+import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
+import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
+import { VideoPathManager } from '@server/lib/video-path-manager'
+import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
+import { UserModel } from '@server/models/user/user'
+import { VideoJobInfoModel } from '@server/models/video/video-job-info'
+import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models'
+import { pick } from '@shared/core-utils'
+import {
+  HLSTranscodingPayload,
+  MergeAudioTranscodingPayload,
+  NewWebTorrentResolutionTranscodingPayload,
+  OptimizeTranscodingPayload,
+  VideoResolution,
+  VideoTranscodingPayload
+} from '@shared/models'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
-import { sequelizeTypescript } from '../../../initializers'
-import * as Bluebird from 'bluebird'
-import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
-import { generateHlsPlaylist, optimizeVideofile, transcodeOriginalVideofile, mergeAudioVideofile } from '../../video-transcoding'
-import { Notifier } from '../../notifier'
+import { computeLowerResolutionsToTranscode } from '../../../helpers/ffmpeg'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { CONFIG } from '../../../initializers/config'
-
-interface BaseTranscodingPayload {
-  videoUUID: string
-  isNewVideo?: boolean
-}
-
-interface HLSTranscodingPayload extends BaseTranscodingPayload {
-  type: 'hls'
-  isPortraitMode?: boolean
-  resolution: VideoResolution
-}
-
-interface NewResolutionTranscodingPayload extends BaseTranscodingPayload {
-  type: 'new-resolution'
-  isPortraitMode?: boolean
-  resolution: VideoResolution
-}
-
-interface MergeAudioTranscodingPayload extends BaseTranscodingPayload {
-  type: 'merge-audio'
-  resolution: VideoResolution
-}
-
-interface OptimizeTranscodingPayload extends BaseTranscodingPayload {
-  type: 'optimize'
+import { VideoModel } from '../../../models/video/video'
+import {
+  generateHlsPlaylistResolution,
+  mergeAudioVideofile,
+  optimizeOriginalVideofile,
+  transcodeNewWebTorrentResolution
+} from '../../transcoding/transcoding'
+
+type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
+
+const handlers: { [ id in VideoTranscodingPayload['type'] ]: HandlerFunction } = {
+  'new-resolution-to-hls': handleHLSJob,
+  'new-resolution-to-webtorrent': handleNewWebTorrentResolutionJob,
+  'merge-audio-to-webtorrent': handleWebTorrentMergeAudioJob,
+  'optimize-to-webtorrent': handleWebTorrentOptimizeJob
 }
 
-export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload
-  | OptimizeTranscodingPayload | MergeAudioTranscodingPayload
+const lTags = loggerTagsFactory('transcoding')
 
-async function processVideoTranscoding (job: Bull.Job) {
+async function processVideoTranscoding (job: Job) {
   const payload = job.data as VideoTranscodingPayload
-  logger.info('Processing video file in job %d.', job.id)
+  logger.info('Processing transcoding job %d.', job.id, lTags(payload.videoUUID))
 
-  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
+  const video = await VideoModel.loadFull(payload.videoUUID)
   // No video, maybe deleted?
   if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
+    logger.info('Do not process job %d, video does not exist.', job.id, lTags(payload.videoUUID))
     return undefined
   }
 
-  if (payload.type === 'hls') {
-    await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
+  const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId)
+
+  const handler = handlers[payload.type]
 
-    await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
-  } else if (payload.type === 'new-resolution') {
-    await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
+  if (!handler) {
+    await moveToFailedTranscodingState(video)
+    await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
+
+    throw new Error('Cannot find transcoding handler for ' + payload.type)
+  }
 
-    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
-  } else if (payload.type === 'merge-audio') {
-    await mergeAudioVideofile(video, payload.resolution)
+  try {
+    await handler(job, payload, video, user)
+  } catch (error) {
+    await moveToFailedTranscodingState(video)
 
-    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
-  } else {
-    await optimizeVideofile(video)
+    await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
 
-    await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
+    throw error
   }
 
   return video
 }
 
-async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
-  if (video === undefined) return undefined
-
-  await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
+// ---------------------------------------------------------------------------
 
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, false, t)
-  })
+export {
+  processVideoTranscoding
 }
 
-async function publishNewResolutionIfNeeded (video: VideoModel, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) {
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    let videoPublished = false
+// ---------------------------------------------------------------------------
+// Job handlers
+// ---------------------------------------------------------------------------
 
-    // We transcoded the video file in another format, now we can publish it
-    if (videoDatabase.state !== VideoState.PUBLISHED) {
-      videoPublished = true
+async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) {
+  logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid))
 
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase.publishedAt = new Date()
-      videoDatabase = await videoDatabase.save({ transaction: t })
-    }
+  const videoFileInput = payload.copyCodecs
+    ? video.getWebTorrentFile(payload.resolution)
+    : video.getMaxQualityFile()
 
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
+  const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist()
 
-    return { videoDatabase, videoPublished }
+  await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => {
+    return generateHlsPlaylistResolution({
+      video,
+      videoInputPath,
+      resolution: payload.resolution,
+      copyCodecs: payload.copyCodecs,
+      isPortraitMode: payload.isPortraitMode || false,
+      job
+    })
   })
 
-  if (videoPublished) {
-    Notifier.Instance.notifyOnNewVideo(videoDatabase)
-    Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
-  }
+  logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid))
 
-  await createHlsJobIfEnabled(payload)
+  await onHlsPlaylistGeneration(video, user, payload)
 }
 
-async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: OptimizeTranscodingPayload) {
-  if (videoArg === undefined) return undefined
+async function handleNewWebTorrentResolutionJob (
+  job: Job,
+  payload: NewWebTorrentResolutionTranscodingPayload,
+  video: MVideoFullLight,
+  user: MUserId
+) {
+  logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid))
 
-  // Outside the transaction (IO on disk)
-  const { videoFileResolution } = await videoArg.getOriginalFileResolution()
+  await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job)
 
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
+  logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid))
 
-    // Create transcoding jobs if there are enabled resolutions
-    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
-    logger.info(
-      'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
-      { resolutions: resolutionsEnabled }
-    )
+  await onNewWebTorrentFileResolution(video, user, payload)
+}
 
-    let videoPublished = false
+async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid))
 
-    if (resolutionsEnabled.length !== 0) {
-      const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
+  await mergeAudioVideofile(video, payload.resolution, job)
 
-      for (const resolution of resolutionsEnabled) {
-        const dataInput = {
-          type: 'new-resolution' as 'new-resolution',
-          videoUUID: videoDatabase.uuid,
-          resolution
-        }
+  logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid))
 
-        const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
-        tasks.push(p)
-      }
+  await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user)
+}
 
-      await Promise.all(tasks)
+async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) {
+  logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid))
 
-      logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
-    } else {
-      videoPublished = true
+  const { transcodeType } = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
 
-      // No transcoding to do, it's now published
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase = await videoDatabase.save({ transaction: t })
+  logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid))
 
-      logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
+  await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user)
+}
+
+// ---------------------------------------------------------------------------
+
+async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) {
+  if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+    // Remove webtorrent files if not enabled
+    for (const file of video.VideoFiles) {
+      await video.removeWebTorrentFileAndTorrent(file)
+      await file.destroy()
     }
 
-    await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t)
+    video.VideoFiles = []
+
+    // Create HLS new resolution jobs
+    await createLowerResolutionsJobs({
+      video,
+      user,
+      videoFileResolution: payload.resolution,
+      isPortraitMode: payload.isPortraitMode,
+      hasAudio: payload.hasAudio,
+      isNewVideo: payload.isNewVideo ?? true,
+      type: 'hls'
+    })
+  }
 
-    return { videoDatabase, videoPublished }
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
+  await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
+}
+
+async function onVideoFirstWebTorrentTranscoding (
+  videoArg: MVideoWithFile,
+  payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload,
+  transcodeType: TranscodeVODOptionsType,
+  user: MUserId
+) {
+  const { resolution, isPortraitMode, audioStream } = await videoArg.probeMaxQualityFile()
+
+  // Maybe the video changed in database, refresh it
+  const videoDatabase = await VideoModel.loadFull(videoArg.uuid)
+  // Video does not exist anymore
+  if (!videoDatabase) return undefined
+
+  // Generate HLS version of the original file
+  const originalFileHLSPayload = {
+    ...payload,
+
+    isPortraitMode,
+    hasAudio: !!audioStream,
+    resolution: videoDatabase.getMaxQualityFile().resolution,
+    // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues
+    copyCodecs: transcodeType !== 'quick-transcode',
+    isMaxQuality: true
+  }
+  const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload)
+  const hasNewResolutions = await createLowerResolutionsJobs({
+    video: videoDatabase,
+    user,
+    videoFileResolution: resolution,
+    hasAudio: !!audioStream,
+    isPortraitMode,
+    type: 'webtorrent',
+    isNewVideo: payload.isNewVideo ?? true
   })
 
-  if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
-  if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
+  await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode')
 
-  const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })
-  await createHlsJobIfEnabled(hlsPayload)
+  // Move to next state if there are no other resolutions to generate
+  if (!hasHls && !hasNewResolutions) {
+    await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
+  }
 }
 
-// ---------------------------------------------------------------------------
+async function onNewWebTorrentFileResolution (
+  video: MVideo,
+  user: MUserId,
+  payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload
+) {
+  if (payload.createHLSIfNeeded) {
+    await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload })
+  }
 
-export {
-  processVideoTranscoding,
-  publishNewResolutionIfNeeded
+  await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
+
+  await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
 }
 
 // ---------------------------------------------------------------------------
 
-function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: number, isPortraitMode?: boolean }) {
-  // Generate HLS playlist?
-  if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
-    const hlsTranscodingPayload = {
-      type: 'hls' as 'hls',
-      videoUUID: payload.videoUUID,
-      resolution: payload.resolution,
-      isPortraitMode: payload.isPortraitMode
+async function createHlsJobIfEnabled (user: MUserId, payload: {
+  videoUUID: string
+  resolution: number
+  hasAudio: boolean
+  isPortraitMode?: boolean
+  copyCodecs: boolean
+  isMaxQuality: boolean
+  isNewVideo?: boolean
+}) {
+  if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false
+
+  const jobOptions = {
+    priority: await getTranscodingJobPriority(user)
+  }
+
+  const hlsTranscodingPayload: HLSTranscodingPayload = {
+    type: 'new-resolution-to-hls',
+    autoDeleteWebTorrentIfNeeded: true,
+
+    ...pick(payload, [ 'videoUUID', 'resolution', 'isPortraitMode', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
+  }
+
+  await addTranscodingJob(hlsTranscodingPayload, jobOptions)
+
+  return true
+}
+
+async function createLowerResolutionsJobs (options: {
+  video: MVideoFullLight
+  user: MUserId
+  videoFileResolution: number
+  isPortraitMode: boolean
+  hasAudio: boolean
+  isNewVideo: boolean
+  type: 'hls' | 'webtorrent'
+}) {
+  const { video, user, videoFileResolution, isPortraitMode, isNewVideo, hasAudio, type } = options
+
+  // Create transcoding jobs if there are enabled resolutions
+  const resolutionsEnabled = computeLowerResolutionsToTranscode(videoFileResolution, 'vod')
+  const resolutionCreated: string[] = []
+
+  for (const resolution of resolutionsEnabled) {
+    if (resolution === VideoResolution.H_NOVIDEO && hasAudio === false) continue
+
+    let dataInput: VideoTranscodingPayload
+
+    if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') {
+      // WebTorrent will create subsequent HLS job
+      dataInput = {
+        type: 'new-resolution-to-webtorrent',
+        videoUUID: video.uuid,
+        resolution,
+        isPortraitMode,
+        hasAudio,
+        createHLSIfNeeded: true,
+        isNewVideo
+      }
+
+      resolutionCreated.push('webtorrent-' + resolution)
+    }
+
+    if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') {
+      dataInput = {
+        type: 'new-resolution-to-hls',
+        videoUUID: video.uuid,
+        resolution,
+        isPortraitMode,
+        hasAudio,
+        copyCodecs: false,
+        isMaxQuality: false,
+        autoDeleteWebTorrentIfNeeded: true,
+        isNewVideo
+      }
+
+      resolutionCreated.push('hls-' + resolution)
     }
 
-    return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
+    if (!dataInput) continue
+
+    const jobOptions = {
+      priority: await getTranscodingJobPriority(user)
+    }
+
+    await addTranscodingJob(dataInput, jobOptions)
+  }
+
+  if (resolutionCreated.length === 0) {
+    logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid))
+
+    return false
   }
+
+  logger.info(
+    'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution,
+    { resolutionCreated, ...lTags(video.uuid) }
+  )
+
+  return true
 }