]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/video-transcoding.ts
Live streaming implementation first step
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / video-transcoding.ts
index ceee83f1338dbc80d89151a170eb231bb0b6a48c..6659ab716438dada65555926fa7c9836c4ab8374 100644 (file)
@@ -1,45 +1,21 @@
 import * as Bull from 'bull'
-import { VideoResolution, VideoState } from '../../../../shared'
+import {
+  MergeAudioTranscodingPayload,
+  NewResolutionTranscodingPayload,
+  OptimizeTranscodingPayload,
+  VideoTranscodingPayload
+} from '../../../../shared'
 import { logger } from '../../../helpers/logger'
 import { VideoModel } from '../../../models/video/video'
 import { JobQueue } from '../job-queue'
-import { federateVideoIfNeeded } from '../../activitypub'
+import { federateVideoIfNeeded } from '../../activitypub/videos'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
-import { sequelizeTypescript, CONFIG } from '../../../initializers'
-import * as Bluebird from 'bluebird'
+import { sequelizeTypescript } from '../../../initializers/database'
 import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils'
-import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding'
+import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding'
 import { Notifier } from '../../notifier'
-
-export type VideoTranscodingPayload = {
-  videoUUID: string
-  resolution?: VideoResolution
-  isNewVideo?: boolean
-  isPortraitMode?: boolean
-  generateHlsPlaylist?: boolean
-}
-
-export type VideoFileImportPayload = {
-  videoUUID: string,
-  filePath: string
-}
-
-async function processVideoFileImport (job: Bull.Job) {
-  const payload = job.data as VideoFileImportPayload
-  logger.info('Processing video file import in job %d.', job.id)
-
-  const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
-  // No video, maybe deleted?
-  if (!video) {
-    logger.info('Do not process job %d, video does not exist.', job.id)
-    return undefined
-  }
-
-  await importVideoFile(video, payload.filePath)
-
-  await onVideoFileTranscoderOrImportSuccess(video)
-  return video
-}
+import { CONFIG } from '../../../initializers/config'
+import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models'
 
 async function processVideoTranscoding (job: Bull.Job) {
   const payload = job.data as VideoTranscodingPayload
@@ -52,16 +28,20 @@ async function processVideoTranscoding (job: Bull.Job) {
     return undefined
   }
 
-  if (payload.generateHlsPlaylist) {
-    await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false)
+  if (payload.type === 'hls') {
+    await generateHlsPlaylist(video, payload.resolution, payload.copyCodecs, payload.isPortraitMode || false)
 
     await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
-  } else if (payload.resolution) { // Transcoding in other resolution
-    await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false)
+  } else if (payload.type === 'new-resolution') {
+    await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false)
+
+    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
+  } else if (payload.type === 'merge-audio') {
+    await mergeAudioVideofile(video, payload.resolution)
 
-    await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload)
+    await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
   } else {
-    await optimizeVideofile(video)
+    await optimizeOriginalVideofile(video)
 
     await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload)
   }
@@ -69,98 +49,81 @@ async function processVideoTranscoding (job: Bull.Job) {
   return video
 }
 
-async function onHlsPlaylistGenerationSuccess (video: VideoModel) {
+async function onHlsPlaylistGenerationSuccess (video: MVideoFullLight) {
   if (video === undefined) return undefined
 
-  await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, false, t)
-  })
-}
-
-async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoTranscodingPayload) {
-  if (video === undefined) return undefined
-
-  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
-    // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
-    // Video does not exist anymore
-    if (!videoDatabase) return undefined
-
-    let videoPublished = false
-
-    // We transcoded the video file in another format, now we can publish it
-    if (videoDatabase.state !== VideoState.PUBLISHED) {
-      videoPublished = true
-
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase.publishedAt = new Date()
-      videoDatabase = await videoDatabase.save({ transaction: t })
+  // We generated the HLS playlist, we don't need the webtorrent files anymore if the admin disabled it
+  if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) {
+    for (const file of video.VideoFiles) {
+      await video.removeFile(file)
+      await file.destroy()
     }
 
-    // If the video was not published, we consider it is a new one for other instances
-    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
+    video.VideoFiles = []
+  }
 
-    return { videoDatabase, videoPublished }
-  })
+  return publishAndFederateIfNeeded(video)
+}
 
-  // don't notify prior to scheduled video update
-  if (videoPublished && !videoDatabase.ScheduleVideoUpdate) {
-    Notifier.Instance.notifyOnNewVideo(videoDatabase)
-    Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
-  }
+async function publishNewResolutionIfNeeded (video: MVideoUUID, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload) {
+  await publishAndFederateIfNeeded(video)
 
   await createHlsJobIfEnabled(payload)
 }
 
-async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoTranscodingPayload) {
+async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload) {
   if (videoArg === undefined) return undefined
 
   // Outside the transaction (IO on disk)
-  const { videoFileResolution } = await videoArg.getOriginalFileResolution()
+  const { videoFileResolution, isPortraitMode } = await videoArg.getMaxQualityResolution()
 
   const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
     // Maybe the video changed in database, refresh it
-    let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
+    const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t)
     // Video does not exist anymore
     if (!videoDatabase) return undefined
 
     // Create transcoding jobs if there are enabled resolutions
-    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution)
+    const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod')
     logger.info(
-      'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution,
+      'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution,
       { resolutions: resolutionsEnabled }
     )
 
     let videoPublished = false
 
-    if (resolutionsEnabled.length !== 0) {
-      const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = []
+    // Generate HLS version of the max quality file
+    const hlsPayload = Object.assign({}, payload, { resolution: videoDatabase.getMaxQualityFile().resolution })
+    await createHlsJobIfEnabled(hlsPayload)
 
+    if (resolutionsEnabled.length !== 0) {
       for (const resolution of resolutionsEnabled) {
-        const dataInput = {
-          videoUUID: videoDatabase.uuid,
-          resolution
+        let dataInput: VideoTranscodingPayload
+
+        if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED) {
+          dataInput = {
+            type: 'new-resolution' as 'new-resolution',
+            videoUUID: videoDatabase.uuid,
+            resolution,
+            isPortraitMode
+          }
+        } else if (CONFIG.TRANSCODING.HLS.ENABLED) {
+          dataInput = {
+            type: 'hls',
+            videoUUID: videoDatabase.uuid,
+            resolution,
+            isPortraitMode,
+            copyCodecs: false
+          }
         }
 
-        const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
-        tasks.push(p)
+        JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput })
       }
 
-      await Promise.all(tasks)
-
       logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
     } else {
-      videoPublished = true
-
       // No transcoding to do, it's now published
-      videoDatabase.state = VideoState.PUBLISHED
-      videoDatabase = await videoDatabase.save({ transaction: t })
+      videoPublished = await videoDatabase.publishIfNeededAndSave(t)
 
       logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy })
     }
@@ -170,35 +133,52 @@ async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: Video
     return { videoDatabase, videoPublished }
   })
 
-  // don't notify prior to scheduled video update
-  if (!videoDatabase.ScheduleVideoUpdate) {
-    if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase)
-    if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase)
-  }
-
-  await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution }))
+  if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
+  if (videoPublished) Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
 }
 
 // ---------------------------------------------------------------------------
 
 export {
   processVideoTranscoding,
-  processVideoFileImport
+  publishNewResolutionIfNeeded
 }
 
 // ---------------------------------------------------------------------------
 
-function createHlsJobIfEnabled (payload?: VideoTranscodingPayload) {
+function createHlsJobIfEnabled (payload?: { videoUUID: string, resolution: number, isPortraitMode?: boolean }) {
   // Generate HLS playlist?
   if (payload && CONFIG.TRANSCODING.HLS.ENABLED) {
     const hlsTranscodingPayload = {
+      type: 'hls' as 'hls',
       videoUUID: payload.videoUUID,
       resolution: payload.resolution,
       isPortraitMode: payload.isPortraitMode,
-
-      generateHlsPlaylist: true
+      copyCodecs: true
     }
 
     return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload })
   }
 }
+
+async function publishAndFederateIfNeeded (video: MVideoUUID) {
+  const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => {
+    // Maybe the video changed in database, refresh it
+    const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
+    // Video does not exist anymore
+    if (!videoDatabase) return undefined
+
+    // We transcoded the video file in another format, now we can publish it
+    const videoPublished = await videoDatabase.publishIfNeededAndSave(t)
+
+    // If the video was not published, we consider it is a new one for other instances
+    await federateVideoIfNeeded(videoDatabase, videoPublished, t)
+
+    return { videoDatabase, videoPublished }
+  })
+
+  if (videoPublished) {
+    Notifier.Instance.notifyOnNewVideoIfNeeded(videoDatabase)
+    Notifier.Instance.notifyOnVideoPublishedAfterTranscoding(videoDatabase)
+  }
+}