]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Fix high CPU with long live when save replay is true
authorChocobozzz <me@florianbigard.com>
Mon, 30 Nov 2020 14:59:22 +0000 (15:59 +0100)
committerChocobozzz <me@florianbigard.com>
Mon, 30 Nov 2020 14:59:22 +0000 (15:59 +0100)
server/helpers/ffmpeg-utils.ts
server/initializers/constants.ts
server/lib/job-queue/handlers/video-live-ending.ts
server/lib/live-manager.ts

index 7c997877cff112a07d4caad0b3cc06d771637164..085635b5a40c65b4231692038ebf3625cc22c3f3 100644 (file)
@@ -190,12 +190,11 @@ async function getLiveTranscodingCommand (options: {
   outPath: string
   resolutions: number[]
   fps: number
-  deleteSegments: boolean
 
   availableEncoders: AvailableEncoders
   profile: string
 }) {
-  const { rtmpUrl, outPath, resolutions, fps, deleteSegments, availableEncoders, profile } = options
+  const { rtmpUrl, outPath, resolutions, fps, availableEncoders, profile } = options
   const input = rtmpUrl
 
   const command = getFFmpeg(input)
@@ -272,14 +271,14 @@ async function getLiveTranscodingCommand (options: {
     varStreamMap.push(`v:${i},a:${i}`)
   }
 
-  addDefaultLiveHLSParams(command, outPath, deleteSegments)
+  addDefaultLiveHLSParams(command, outPath)
 
   command.outputOption('-var_stream_map', varStreamMap.join(' '))
 
   return command
 }
 
-function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments: boolean) {
+function getLiveMuxingCommand (rtmpUrl: string, outPath: string) {
   const command = getFFmpeg(rtmpUrl)
   command.inputOption('-fflags nobuffer')
 
@@ -288,17 +287,17 @@ function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments:
   command.outputOption('-map 0:a?')
   command.outputOption('-map 0:v?')
 
-  addDefaultLiveHLSParams(command, outPath, deleteSegments)
+  addDefaultLiveHLSParams(command, outPath)
 
   return command
 }
 
-async function hlsPlaylistToFragmentedMP4 (hlsDirectory: string, segmentFiles: string[], outputPath: string) {
-  const concatFilePath = join(hlsDirectory, 'concat.txt')
+async function hlsPlaylistToFragmentedMP4 (replayDirectory: string, segmentFiles: string[], outputPath: string) {
+  const concatFilePath = join(replayDirectory, 'concat.txt')
 
   function cleaner () {
     remove(concatFilePath)
-      .catch(err => logger.error('Cannot remove concat file in %s.', hlsDirectory, { err }))
+      .catch(err => logger.error('Cannot remove concat file in %s.', replayDirectory, { err }))
   }
 
   // First concat the ts files to a mp4 file
@@ -385,14 +384,10 @@ function addDefaultEncoderParams (options: {
   }
 }
 
-function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string, deleteSegments: boolean) {
+function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string) {
   command.outputOption('-hls_time ' + VIDEO_LIVE.SEGMENT_TIME_SECONDS)
   command.outputOption('-hls_list_size ' + VIDEO_LIVE.SEGMENTS_LIST_SIZE)
-
-  if (deleteSegments === true) {
-    command.outputOption('-hls_flags delete_segments')
-  }
-
+  command.outputOption('-hls_flags delete_segments')
   command.outputOption(`-hls_segment_filename ${join(outPath, '%v-%06d.ts')}`)
   command.outputOption('-master_pl_name master.m3u8')
   command.outputOption(`-f hls`)
index 6c44d703e78813510ea410ccd596b540f2cc4c73..da837837e9dcc344c8e4bc8af8dfa283a6e28314 100644 (file)
@@ -634,6 +634,7 @@ const VIDEO_LIVE = {
   CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes
   SEGMENT_TIME_SECONDS: 4, // 4 seconds
   SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist
+  REPLAY_DIRECTORY: 'replay',
   EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4,
   RTMP: {
     CHUNK_SIZE: 60000,
index 447744224bea38e224dafdf087974659eea528a6..0d2bcaa28150f65a7808d6e51728276b60ed0966 100644 (file)
@@ -1,5 +1,5 @@
 import * as Bull from 'bull'
-import { readdir, remove } from 'fs-extra'
+import { move, readdir, remove } from 'fs-extra'
 import { join } from 'path'
 import { hlsPlaylistToFragmentedMP4 } from '@server/helpers/ffmpeg-utils'
 import { getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
@@ -14,6 +14,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin
 import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
 import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
 import { logger } from '../../../helpers/logger'
+import { VIDEO_LIVE } from '@server/initializers/constants'
 
 async function processVideoLiveEnding (job: Bull.Job) {
   const payload = job.data as VideoLiveEndingPayload
@@ -53,24 +54,40 @@ export {
 
 async function saveLive (video: MVideo, live: MVideoLive) {
   const hlsDirectory = getHLSDirectory(video, false)
-  const files = await readdir(hlsDirectory)
+  const replayDirectory = join(hlsDirectory, VIDEO_LIVE.REPLAY_DIRECTORY)
+
+  const rootFiles = await readdir(hlsDirectory)
+
+  const playlistFiles: string[] = []
+
+  for (const file of rootFiles) {
+    if (file.endsWith('.m3u8') !== true) continue
+
+    await move(join(hlsDirectory, file), join(replayDirectory, file))
+
+    if (file !== 'master.m3u8') {
+      playlistFiles.push(file)
+    }
+  }
+
+  const replayFiles = await readdir(replayDirectory)
 
-  const playlistFiles = files.filter(f => f.endsWith('.m3u8') && f !== 'master.m3u8')
   const resolutions: number[] = []
   let duration: number
 
   for (const playlistFile of playlistFiles) {
-    const playlistPath = join(hlsDirectory, playlistFile)
+    const playlistPath = join(replayDirectory, playlistFile)
     const { videoFileResolution } = await getVideoFileResolution(playlistPath)
 
+    // Put the final mp4 in the hls directory, and not in the replay directory
     const mp4TmpPath = buildMP4TmpPath(hlsDirectory, videoFileResolution)
 
     // Playlist name is for example 3.m3u8
     // Segments names are 3-0.ts 3-1.ts etc
     const shouldStartWith = playlistFile.replace(/\.m3u8$/, '') + '-'
 
-    const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts'))
-    await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpPath)
+    const segmentFiles = replayFiles.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts'))
+    await hlsPlaylistToFragmentedMP4(replayDirectory, segmentFiles, mp4TmpPath)
 
     if (!duration) {
       duration = await getDurationFromVideoFile(mp4TmpPath)
@@ -143,7 +160,8 @@ async function cleanupLiveFiles (hlsDirectory: string) {
       filename.endsWith('.m3u8') ||
       filename.endsWith('.mpd') ||
       filename.endsWith('.m4s') ||
-      filename.endsWith('.tmp')
+      filename.endsWith('.tmp') ||
+      filename === VIDEO_LIVE.REPLAY_DIRECTORY
     ) {
       const p = join(hlsDirectory, filename)
 
index d63e79dfc6edb8055aa98ba8bfb9ee32e4127840..d201465fa035cf267290bb166a43c013a240b838 100644 (file)
@@ -1,8 +1,8 @@
 
 import * as chokidar from 'chokidar'
 import { FfmpegCommand } from 'fluent-ffmpeg'
-import { ensureDir, stat } from 'fs-extra'
-import { basename } from 'path'
+import { copy, ensureDir, stat } from 'fs-extra'
+import { basename, join } from 'path'
 import { isTestInstance } from '@server/helpers/core-utils'
 import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
 import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
@@ -25,6 +25,7 @@ import { getHLSDirectory } from './video-paths'
 import { availableEncoders } from './video-transcoding-profiles'
 
 import memoizee = require('memoizee')
+import { mkdir } from 'fs'
 const NodeRtmpServer = require('node-media-server/node_rtmp_server')
 const context = require('node-media-server/node_core_ctx')
 const nodeMediaServerLogger = require('node-media-server/node_core_logger')
@@ -261,8 +262,13 @@ class LiveManager {
     const outPath = getHLSDirectory(videoLive.Video)
     await ensureDir(outPath)
 
+    const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
+
+    if (videoLive.saveReplay === true) {
+      await ensureDir(replayDirectory)
+    }
+
     const videoUUID = videoLive.Video.uuid
-    const deleteSegments = videoLive.saveReplay === false
 
     const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
       ? await getLiveTranscodingCommand({
@@ -270,11 +276,10 @@ class LiveManager {
         outPath,
         resolutions: allResolutions,
         fps,
-        deleteSegments,
         availableEncoders,
         profile: 'default'
       })
-      : getLiveMuxingCommand(rtmpUrl, outPath, deleteSegments)
+      : getLiveMuxingCommand(rtmpUrl, outPath)
 
     logger.info('Running live muxing/transcoding for %s.', videoUUID)
     this.transSessions.set(sessionId, ffmpegExec)
@@ -284,11 +289,18 @@ class LiveManager {
     const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
     const playlistIdMatcher = /^([\d+])-/
 
-    const processHashSegments = (segmentsToProcess: string[]) => {
+    const processSegments = (segmentsToProcess: string[]) => {
       // Add sha hash of previous segments, because ffmpeg should have finished generating them
       for (const previousSegment of segmentsToProcess) {
         this.addSegmentSha(videoUUID, previousSegment)
           .catch(err => logger.error('Cannot add sha segment of video %s -> %s.', videoUUID, previousSegment, { err }))
+
+        if (videoLive.saveReplay) {
+          const segmentName = basename(previousSegment)
+
+          copy(previousSegment, join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY, segmentName))
+            .catch(err => logger.error('Cannot copy segment %s to repay directory.', previousSegment, { err }))
+        }
       }
     }
 
@@ -298,7 +310,7 @@ class LiveManager {
       const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
 
       const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
-      processHashSegments(segmentsToProcess)
+      processSegments(segmentsToProcess)
 
       segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
 
@@ -369,7 +381,7 @@ class LiveManager {
         .then(() => {
           // Process remaining segments hash
           for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
-            processHashSegments(segmentsToProcessPerPlaylist[key])
+            processSegments(segmentsToProcessPerPlaylist[key])
           }
         })
         .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))