From 5333788c08ab6152303829d4624774b5d788ff40 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 25 May 2022 14:54:16 +0200 Subject: Fix saving permanent live replay on quick restream --- server/lib/job-queue/handlers/video-live-ending.ts | 51 +++++++++------------- server/lib/live/live-manager.ts | 6 ++- server/lib/live/live-utils.ts | 40 +++++++++++++++-- server/tests/api/live/live-save-replay.ts | 34 +++++++++++++++ server/tests/shared/live.ts | 24 +++++++++- 5 files changed, 116 insertions(+), 39 deletions(-) diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 55fd09344..79aa547ba 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,10 +1,10 @@ import { Job } from 'bull' -import { pathExists, readdir, remove } from 'fs-extra' +import { readdir, remove } from 'fs-extra' import { join } from 'path' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' -import { cleanupLive, LiveSegmentShaStore } from '@server/lib/live' +import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, @@ -45,13 +45,13 @@ async function processVideoLiveEnding (job: Job) { LiveSegmentShaStore.Instance.cleanupShaSegments(liveVideo.uuid) if (live.saveReplay !== true) { - return cleanupLiveAndFederate({ liveVideo }) + return cleanupLiveAndFederate({ live, video: liveVideo }) } if (live.permanentLive) { await saveReplayToExternalVideo({ liveVideo, liveSession, publishedAt: payload.publishedAt, replayDirectory: payload.replayDirectory }) - return cleanupLiveAndFederate({ liveVideo }) + return cleanupLiveAndFederate({ live, video: liveVideo }) } return replaceLiveByReplay({ liveVideo, live, liveSession, replayDirectory: payload.replayDirectory }) @@ -164,7 +164,11 @@ async function replaceLiveByReplay (options: { await assignReplayFilesToVideo({ video: videoWithFiles, replayDirectory }) - await remove(getLiveReplayBaseDirectory(videoWithFiles)) + if (live.permanentLive) { // Remove session replay + await remove(replayDirectory) + } else { // We won't stream again in this live, we can delete the base replay directory + await remove(getLiveReplayBaseDirectory(videoWithFiles)) + } // Regenerate the thumbnail & preview? if (videoWithFiles.getMiniature().automaticallyGenerated === true) { @@ -227,34 +231,19 @@ async function assignReplayFilesToVideo (options: { } async function cleanupLiveAndFederate (options: { - liveVideo: MVideo + live: MVideoLive + video: MVideo }) { - const { liveVideo } = options - - const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(liveVideo.id) - await cleanupLive(liveVideo, streamingPlaylist) - - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(liveVideo.id) - return federateVideoIfNeeded(fullVideo, false, undefined) -} - -async function cleanupTMPLiveFiles (hlsDirectory: string) { - if (!await pathExists(hlsDirectory)) return + const { live, video } = options - const files = await readdir(hlsDirectory) + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) - for (const filename of files) { - if ( - filename.endsWith('.ts') || - filename.endsWith('.m3u8') || - filename.endsWith('.mpd') || - filename.endsWith('.m4s') || - filename.endsWith('.tmp') - ) { - const p = join(hlsDirectory, filename) - - remove(p) - .catch(err => logger.error('Cannot remove %s.', p, { err })) - } + if (live.permanentLive) { + await cleanupPermanentLive(video, streamingPlaylist) + } else { + await cleanupNormalLive(video, streamingPlaylist) } + + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) + return federateVideoIfNeeded(fullVideo, false, undefined) } diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index e04ae9fef..0f14a6851 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -28,7 +28,7 @@ import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, g import { PeerTubeSocket } from '../peertube-socket' import { LiveQuotaStore } from './live-quota-store' import { LiveSegmentShaStore } from './live-segment-sha-store' -import { cleanupLive } from './live-utils' +import { cleanupPermanentLive } from './live-utils' import { MuxingSession } from './shared' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') @@ -224,7 +224,9 @@ class LiveManager { const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { - await cleanupLive(video, oldStreamingPlaylist) + if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) + + await cleanupPermanentLive(video, oldStreamingPlaylist) } this.videoSessions.set(video.id, sessionId) diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 46c7fd2f8..6365e23db 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -1,5 +1,6 @@ -import { remove } from 'fs-extra' -import { basename } from 'path' +import { pathExists, readdir, remove } from 'fs-extra' +import { basename, join } from 'path' +import { logger } from '@server/helpers/logger' import { MStreamingPlaylist, MVideo } from '@server/types/models' import { getLiveDirectory } from '../paths' @@ -9,7 +10,15 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { return 'concat-' + num[1] + '.ts' } -async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { +async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { + const hlsDirectory = getLiveDirectory(video) + + await cleanupTMPLiveFiles(hlsDirectory) + + if (streamingPlaylist) await streamingPlaylist.destroy() +} + +async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) await remove(hlsDirectory) @@ -17,7 +26,30 @@ async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylis if (streamingPlaylist) await streamingPlaylist.destroy() } +async function cleanupTMPLiveFiles (hlsDirectory: string) { + if (!await pathExists(hlsDirectory)) return + + const files = await readdir(hlsDirectory) + + for (const filename of files) { + if ( + filename.endsWith('.ts') || + filename.endsWith('.m3u8') || + filename.endsWith('.mpd') || + filename.endsWith('.m4s') || + filename.endsWith('.tmp') + ) { + const p = join(hlsDirectory, filename) + + remove(p) + .catch(err => logger.error('Cannot remove %s.', p, { err })) + } + } +} + export { - cleanupLive, + cleanupPermanentLive, + cleanupNormalLive, + cleanupTMPLiveFiles, buildConcatenatedName } diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts index 7ddcb04ef..007af51e9 100644 --- a/server/tests/api/live/live-save-replay.ts +++ b/server/tests/api/live/live-save-replay.ts @@ -441,6 +441,40 @@ describe('Save replay setting', function () { await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) await checkLiveCleanup(servers[0], liveVideoUUID, []) }) + + it('Should correctly save replays with multiple sessions', async function () { + this.timeout(120000) + + liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true }) + await waitJobs(servers) + + // Streaming session #1 + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + await stopFfmpeg(ffmpegCommand) + await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID }) + + // Streaming session #2 + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + await stopFfmpeg(ffmpegCommand) + await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID) + + // Wait for replays + await waitJobs(servers) + + const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID }) + + expect(total).to.equal(2) + expect(sessions).to.have.lengthOf(2) + + for (const session of sessions) { + expect(session.error).to.be.null + expect(session.replayVideo).to.exist + + await servers[0].videos.get({ id: session.replayVideo.uuid }) + } + }) }) after(async function () { diff --git a/server/tests/shared/live.ts b/server/tests/shared/live.ts index 6ee4899b0..4bd4786fc 100644 --- a/server/tests/shared/live.ts +++ b/server/tests/shared/live.ts @@ -3,15 +3,35 @@ import { expect } from 'chai' import { pathExists, readdir } from 'fs-extra' import { join } from 'path' +import { LiveVideo } from '@shared/models' import { PeerTubeServer } from '@shared/server-commands' async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) { + let live: LiveVideo + + try { + live = await server.live.get({ videoId: videoUUID }) + } catch {} + const basePath = server.servers.buildDirectory('streaming-playlists') const hlsPath = join(basePath, 'hls', videoUUID) if (savedResolutions.length === 0) { - const result = await pathExists(hlsPath) - expect(result).to.be.false + + if (live?.permanentLive) { + expect(await pathExists(hlsPath)).to.be.true + + const hlsFiles = await readdir(hlsPath) + expect(hlsFiles).to.have.lengthOf(1) // Only replays directory + + const replayDir = join(hlsPath, 'replay') + expect(await pathExists(replayDir)).to.be.true + + const replayFiles = await readdir(join(hlsPath, 'replay')) + expect(replayFiles).to.have.lengthOf(0) + } else { + expect(await pathExists(hlsPath)).to.be.false + } return } -- cgit v1.2.3