From 4ec52d04dcc5d664612331f8e08d7d90da990415 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 21 Apr 2022 09:06:52 +0200 Subject: Add ability to save replay of permanent lives --- server/lib/live/live-manager.ts | 69 +++++++++++++++++++++----------- server/lib/live/live-utils.ts | 4 +- server/lib/live/shared/muxing-session.ts | 33 +++++++-------- 3 files changed, 64 insertions(+), 42 deletions(-) (limited to 'server/lib/live') diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 920d3a5ec..5ffe41ee3 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,6 +1,7 @@ -import { readFile } from 'fs-extra' +import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' +import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { computeLowerResolutionsToTranscode, @@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' +import { wait } from '@shared/core-utils' import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' +import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { LiveQuotaStore } from './live-quota-store' import { LiveSegmentShaStore } from './live-segment-sha-store' @@ -322,7 +324,7 @@ class LiveManager { muxingSession.destroy() - return this.onAfterMuxingCleanup(videoId) + return this.onAfterMuxingCleanup({ videoId }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -349,12 +351,15 @@ class LiveManager { live.Video = video - setTimeout(() => { - federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) + await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) - PeerTubeSocket.Instance.sendVideoLiveNewState(video) - }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + try { + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) + } + + PeerTubeSocket.Instance.sendVideoLiveNewState(video) } catch (err) { logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) } @@ -364,25 +369,32 @@ class LiveManager { this.videoSessions.delete(videoId) } - private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { + private async onAfterMuxingCleanup (options: { + videoId: number | string + cleanupNow?: boolean // Default false + }) { + const { videoId, cleanupNow = false } = options + try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - if (!live.permanentLive) { - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - - fullVideo.state = VideoState.LIVE_ENDED - } else { - fullVideo.state = VideoState.WAITING_FOR_LIVE - } + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id, + replayDirectory: live.saveReplay + ? await this.findReplayDirectory(fullVideo) + : undefined, + publishedAt: fullVideo.publishedAt.toISOString() + } + }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + + fullVideo.state = live.permanentLive + ? VideoState.WAITING_FOR_LIVE + : VideoState.LIVE_ENDED await fullVideo.save() @@ -390,7 +402,7 @@ class LiveManager { await federateVideoIfNeeded(fullVideo, false) } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) + logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) } } @@ -398,10 +410,19 @@ class LiveManager { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup(uuid, true) + await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) } } + private async findReplayDirectory (video: MVideo) { + const directory = getLiveReplayBaseDirectory(video) + const files = await readdir(directory) + + if (files.length === 0) return undefined + + return join(directory, files.sort().reverse()[0]) + } + private buildAllResolutionsToTranscode (originResolution: number) { const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED ? computeLowerResolutionsToTranscode(originResolution, 'live') diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 3bf723b98..46c7fd2f8 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -9,12 +9,12 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { return 'concat-' + num[1] + '.ts' } -async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { +async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) await remove(hlsDirectory) - await streamingPlaylist.destroy() + if (streamingPlaylist) await streamingPlaylist.destroy() } export { diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index a703f5b5f..588ee8749 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -11,7 +11,7 @@ import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' import { VideoFileModel } from '@server/models/video/video-file' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' -import { getLiveDirectory } from '../../paths' +import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' import { LiveQuotaStore } from '../live-quota-store' @@ -63,6 +63,9 @@ class MuxingSession extends EventEmitter { private readonly videoUUID: string private readonly saveReplay: boolean + private readonly outDirectory: string + private readonly replayDirectory: string + private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} @@ -110,19 +113,22 @@ class MuxingSession extends EventEmitter { this.saveReplay = this.videoLive.saveReplay + this.outDirectory = getLiveDirectory(this.videoLive.Video) + this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } async runMuxing () { this.createFiles() - const outPath = await this.prepareDirectories() + await this.prepareDirectories() this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED ? await getLiveTranscodingCommand({ inputUrl: this.inputUrl, - outPath, + outPath: this.outDirectory, masterPlaylistName: this.streamingPlaylist.playlistFilename, latencyMode: this.videoLive.latencyMode, @@ -137,15 +143,15 @@ class MuxingSession extends EventEmitter { }) : getLiveMuxingCommand({ inputUrl: this.inputUrl, - outPath, + outPath: this.outDirectory, masterPlaylistName: this.streamingPlaylist.playlistFilename, latencyMode: this.videoLive.latencyMode }) logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) - this.watchTSFiles(outPath) - this.watchMasterFile(outPath) + this.watchTSFiles(this.outDirectory) + this.watchMasterFile(this.outDirectory) let ffmpegShellCommand: string this.ffmpegCommand.on('start', cmdline => { @@ -155,10 +161,10 @@ class MuxingSession extends EventEmitter { }) this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand }) + this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) }) - this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) + this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) this.ffmpegCommand.run() } @@ -304,16 +310,11 @@ class MuxingSession extends EventEmitter { } private async prepareDirectories () { - const outPath = getLiveDirectory(this.videoLive.Video) - await ensureDir(outPath) - - const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) + await ensureDir(this.outDirectory) if (this.videoLive.saveReplay === true) { - await ensureDir(replayDirectory) + await ensureDir(this.replayDirectory) } - - return outPath } private isDurationConstraintValid (streamingStartTime: number) { @@ -364,7 +365,7 @@ class MuxingSession extends EventEmitter { private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { const segmentName = basename(segmentPath) - const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) + const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) try { const data = await readFile(segmentPath) -- cgit v1.2.3