From 937581b8f61fe82fdac044e11740b9a4cb6d96b0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 30 Nov 2020 15:59:22 +0100 Subject: Fix high CPU with long live when save replay is true --- server/helpers/ffmpeg-utils.ts | 23 ++++++---------- server/initializers/constants.ts | 1 + server/lib/job-queue/handlers/video-live-ending.ts | 32 +++++++++++++++++----- server/lib/live-manager.ts | 28 +++++++++++++------ 4 files changed, 55 insertions(+), 29 deletions(-) diff --git a/server/helpers/ffmpeg-utils.ts b/server/helpers/ffmpeg-utils.ts index 7c997877c..085635b5a 100644 --- a/server/helpers/ffmpeg-utils.ts +++ b/server/helpers/ffmpeg-utils.ts @@ -190,12 +190,11 @@ async function getLiveTranscodingCommand (options: { outPath: string resolutions: number[] fps: number - deleteSegments: boolean availableEncoders: AvailableEncoders profile: string }) { - const { rtmpUrl, outPath, resolutions, fps, deleteSegments, availableEncoders, profile } = options + const { rtmpUrl, outPath, resolutions, fps, availableEncoders, profile } = options const input = rtmpUrl const command = getFFmpeg(input) @@ -272,14 +271,14 @@ async function getLiveTranscodingCommand (options: { varStreamMap.push(`v:${i},a:${i}`) } - addDefaultLiveHLSParams(command, outPath, deleteSegments) + addDefaultLiveHLSParams(command, outPath) command.outputOption('-var_stream_map', varStreamMap.join(' ')) return command } -function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments: boolean) { +function getLiveMuxingCommand (rtmpUrl: string, outPath: string) { const command = getFFmpeg(rtmpUrl) command.inputOption('-fflags nobuffer') @@ -288,17 +287,17 @@ function getLiveMuxingCommand (rtmpUrl: string, outPath: string, deleteSegments: command.outputOption('-map 0:a?') command.outputOption('-map 0:v?') - addDefaultLiveHLSParams(command, outPath, deleteSegments) + addDefaultLiveHLSParams(command, outPath) return command } -async function hlsPlaylistToFragmentedMP4 (hlsDirectory: string, segmentFiles: string[], outputPath: string) { - const concatFilePath = join(hlsDirectory, 'concat.txt') +async function hlsPlaylistToFragmentedMP4 (replayDirectory: string, segmentFiles: string[], outputPath: string) { + const concatFilePath = join(replayDirectory, 'concat.txt') function cleaner () { remove(concatFilePath) - .catch(err => logger.error('Cannot remove concat file in %s.', hlsDirectory, { err })) + .catch(err => logger.error('Cannot remove concat file in %s.', replayDirectory, { err })) } // First concat the ts files to a mp4 file @@ -385,14 +384,10 @@ function addDefaultEncoderParams (options: { } } -function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string, deleteSegments: boolean) { +function addDefaultLiveHLSParams (command: ffmpeg.FfmpegCommand, outPath: string) { command.outputOption('-hls_time ' + VIDEO_LIVE.SEGMENT_TIME_SECONDS) command.outputOption('-hls_list_size ' + VIDEO_LIVE.SEGMENTS_LIST_SIZE) - - if (deleteSegments === true) { - command.outputOption('-hls_flags delete_segments') - } - + command.outputOption('-hls_flags delete_segments') command.outputOption(`-hls_segment_filename ${join(outPath, '%v-%06d.ts')}`) command.outputOption('-master_pl_name master.m3u8') command.outputOption(`-f hls`) diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 6c44d703e..da837837e 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -634,6 +634,7 @@ const VIDEO_LIVE = { CLEANUP_DELAY: 1000 * 60 * 5, // 5 minutes SEGMENT_TIME_SECONDS: 4, // 4 seconds SEGMENTS_LIST_SIZE: 15, // 15 maximum segments in live playlist + REPLAY_DIRECTORY: 'replay', EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION: 4, RTMP: { CHUNK_SIZE: 60000, diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 447744224..0d2bcaa28 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,5 +1,5 @@ import * as Bull from 'bull' -import { readdir, remove } from 'fs-extra' +import { move, readdir, remove } from 'fs-extra' import { join } from 'path' import { hlsPlaylistToFragmentedMP4 } from '@server/helpers/ffmpeg-utils' import { getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' @@ -14,6 +14,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' +import { VIDEO_LIVE } from '@server/initializers/constants' async function processVideoLiveEnding (job: Bull.Job) { const payload = job.data as VideoLiveEndingPayload @@ -53,24 +54,40 @@ export { async function saveLive (video: MVideo, live: MVideoLive) { const hlsDirectory = getHLSDirectory(video, false) - const files = await readdir(hlsDirectory) + const replayDirectory = join(hlsDirectory, VIDEO_LIVE.REPLAY_DIRECTORY) + + const rootFiles = await readdir(hlsDirectory) + + const playlistFiles: string[] = [] + + for (const file of rootFiles) { + if (file.endsWith('.m3u8') !== true) continue + + await move(join(hlsDirectory, file), join(replayDirectory, file)) + + if (file !== 'master.m3u8') { + playlistFiles.push(file) + } + } + + const replayFiles = await readdir(replayDirectory) - const playlistFiles = files.filter(f => f.endsWith('.m3u8') && f !== 'master.m3u8') const resolutions: number[] = [] let duration: number for (const playlistFile of playlistFiles) { - const playlistPath = join(hlsDirectory, playlistFile) + const playlistPath = join(replayDirectory, playlistFile) const { videoFileResolution } = await getVideoFileResolution(playlistPath) + // Put the final mp4 in the hls directory, and not in the replay directory const mp4TmpPath = buildMP4TmpPath(hlsDirectory, videoFileResolution) // Playlist name is for example 3.m3u8 // Segments names are 3-0.ts 3-1.ts etc const shouldStartWith = playlistFile.replace(/\.m3u8$/, '') + '-' - const segmentFiles = files.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts')) - await hlsPlaylistToFragmentedMP4(hlsDirectory, segmentFiles, mp4TmpPath) + const segmentFiles = replayFiles.filter(f => f.startsWith(shouldStartWith) && f.endsWith('.ts')) + await hlsPlaylistToFragmentedMP4(replayDirectory, segmentFiles, mp4TmpPath) if (!duration) { duration = await getDurationFromVideoFile(mp4TmpPath) @@ -143,7 +160,8 @@ async function cleanupLiveFiles (hlsDirectory: string) { filename.endsWith('.m3u8') || filename.endsWith('.mpd') || filename.endsWith('.m4s') || - filename.endsWith('.tmp') + filename.endsWith('.tmp') || + filename === VIDEO_LIVE.REPLAY_DIRECTORY ) { const p = join(hlsDirectory, filename) diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index d63e79dfc..d201465fa 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -1,8 +1,8 @@ import * as chokidar from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' -import { ensureDir, stat } from 'fs-extra' -import { basename } from 'path' +import { copy, ensureDir, stat } from 'fs-extra' +import { basename, join } from 'path' import { isTestInstance } from '@server/helpers/core-utils' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' @@ -25,6 +25,7 @@ import { getHLSDirectory } from './video-paths' import { availableEncoders } from './video-transcoding-profiles' import memoizee = require('memoizee') +import { mkdir } from 'fs' const NodeRtmpServer = require('node-media-server/node_rtmp_server') const context = require('node-media-server/node_core_ctx') const nodeMediaServerLogger = require('node-media-server/node_core_logger') @@ -261,8 +262,13 @@ class LiveManager { const outPath = getHLSDirectory(videoLive.Video) await ensureDir(outPath) + const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) + + if (videoLive.saveReplay === true) { + await ensureDir(replayDirectory) + } + const videoUUID = videoLive.Video.uuid - const deleteSegments = videoLive.saveReplay === false const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED ? await getLiveTranscodingCommand({ @@ -270,11 +276,10 @@ class LiveManager { outPath, resolutions: allResolutions, fps, - deleteSegments, availableEncoders, profile: 'default' }) - : getLiveMuxingCommand(rtmpUrl, outPath, deleteSegments) + : getLiveMuxingCommand(rtmpUrl, outPath) logger.info('Running live muxing/transcoding for %s.', videoUUID) this.transSessions.set(sessionId, ffmpegExec) @@ -284,11 +289,18 @@ class LiveManager { const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} const playlistIdMatcher = /^([\d+])-/ - const processHashSegments = (segmentsToProcess: string[]) => { + const processSegments = (segmentsToProcess: string[]) => { // Add sha hash of previous segments, because ffmpeg should have finished generating them for (const previousSegment of segmentsToProcess) { this.addSegmentSha(videoUUID, previousSegment) .catch(err => logger.error('Cannot add sha segment of video %s -> %s.', videoUUID, previousSegment, { err })) + + if (videoLive.saveReplay) { + const segmentName = basename(previousSegment) + + copy(previousSegment, join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY, segmentName)) + .catch(err => logger.error('Cannot copy segment %s to repay directory.', previousSegment, { err })) + } } } @@ -298,7 +310,7 @@ class LiveManager { const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || [] - processHashSegments(segmentsToProcess) + processSegments(segmentsToProcess) segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] @@ -369,7 +381,7 @@ class LiveManager { .then(() => { // Process remaining segments hash for (const key of Object.keys(segmentsToProcessPerPlaylist)) { - processHashSegments(segmentsToProcessPerPlaylist[key]) + processSegments(segmentsToProcessPerPlaylist[key]) } }) .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err })) -- cgit v1.2.3