From 4ec52d04dcc5d664612331f8e08d7d90da990415 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 21 Apr 2022 09:06:52 +0200 Subject: Add ability to save replay of permanent lives --- server/lib/job-queue/handlers/video-live-ending.ts | 159 +++++++++++++++------ server/lib/live/live-manager.ts | 69 +++++---- server/lib/live/live-utils.ts | 4 +- server/lib/live/shared/muxing-session.ts | 33 ++--- server/lib/paths.ts | 7 +- server/lib/transcoding/transcoding.ts | 10 +- server/lib/video-blacklist.ts | 3 +- server/middlewares/validators/videos/video-live.ts | 6 - server/models/video/video-live.ts | 2 +- server/tests/api/check-params/live.ts | 6 - server/tests/api/live/live-constraints.ts | 4 +- server/tests/api/live/live-permanent.ts | 5 +- server/tests/api/live/live-save-replay.ts | 155 +++++++++++++++++--- server/tests/api/live/live.ts | 4 +- server/tests/api/object-storage/live.ts | 91 +++++++----- server/tests/shared/live.ts | 10 +- 16 files changed, 399 insertions(+), 169 deletions(-) (limited to 'server') diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index f4de4b47c..1e290338c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,25 +1,33 @@ import { Job } from 'bull' import { pathExists, readdir, remove } from 'fs-extra' import { join } from 'path' -import { ffprobePromise, getAudioStream, getVideoStreamDuration, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' -import { VIDEO_LIVE } from '@server/initializers/constants' -import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveDirectory } from '@server/lib/paths' +import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' +import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' +import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' +import { cleanupLive, LiveSegmentShaStore } from '@server/lib/live' +import { + generateHLSMasterPlaylistFilename, + generateHlsSha256SegmentsFilename, + getLiveDirectory, + getLiveReplayBaseDirectory +} from '@server/lib/paths' import { generateVideoMiniature } from '@server/lib/thumbnail' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' -import { VideoPathManager } from '@server/lib/video-path-manager' import { moveToNextState } from '@server/lib/video-state' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' +import { MVideo, MVideoLive, MVideoWithAllFiles } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' +import { VideoBlacklistModel } from '@server/models/video/video-blacklist' async function processVideoLiveEnding (job: Job) { const payload = job.data as VideoLiveEndingPayload + logger.info('Processing video live ending for %s.', payload.videoId, { payload }) + function logError () { logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) } @@ -32,19 +40,19 @@ async function processVideoLiveEnding (job: Job) { return } - const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) - if (!streamingPlaylist) { - logError() - return - } - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) if (live.saveReplay !== true) { - return cleanupLive(video, streamingPlaylist) + return cleanupLiveAndFederate(video) } - return saveLive(video, live, streamingPlaylist) + if (live.permanentLive) { + await saveReplayToExternalVideo(video, payload.publishedAt, payload.replayDirectory) + + return cleanupLiveAndFederate(video) + } + + return replaceLiveByReplay(video, live, payload.replayDirectory) } // --------------------------------------------------------------------------- @@ -55,22 +63,66 @@ export { // --------------------------------------------------------------------------- -async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) { - const replayDirectory = VideoPathManager.Instance.getFSHLSOutputPath(video, VIDEO_LIVE.REPLAY_DIRECTORY) +async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string, replayDirectory: string) { + await cleanupTMPLiveFiles(getLiveDirectory(liveVideo)) + + const video = new VideoModel({ + name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`, + isLive: false, + state: VideoState.TO_TRANSCODE, + duration: 0, + + remote: liveVideo.remote, + category: liveVideo.category, + licence: liveVideo.licence, + language: liveVideo.language, + commentsEnabled: liveVideo.commentsEnabled, + downloadEnabled: liveVideo.downloadEnabled, + waitTranscoding: liveVideo.waitTranscoding, + nsfw: liveVideo.nsfw, + description: liveVideo.description, + support: liveVideo.support, + privacy: liveVideo.privacy, + channelId: liveVideo.channelId + }) as MVideoWithAllFiles + + video.Thumbnails = [] + video.VideoFiles = [] + video.VideoStreamingPlaylists = [] + + video.url = getLocalVideoActivityPubUrl(video) + + await video.save() + + // If live is blacklisted, also blacklist the replay + const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id) + if (blacklist) { + await VideoBlacklistModel.create({ + videoId: video.id, + unfederated: blacklist.unfederated, + reason: blacklist.reason, + type: blacklist.type + }) + } + + await assignReplaysToVideo(video, replayDirectory) - const rootFiles = await readdir(getLiveDirectory(video)) + await remove(replayDirectory) + + for (const type of [ ThumbnailType.MINIATURE, ThumbnailType.PREVIEW ]) { + const image = await generateVideoMiniature({ video, videoFile: video.getMaxQualityFile(), type }) + await video.addAndSaveThumbnail(image) + } - const playlistFiles = rootFiles.filter(file => { - return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename - }) + await moveToNextState({ video, isNewVideo: true }) +} +async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirectory: string) { await cleanupTMPLiveFiles(getLiveDirectory(video)) await live.destroy() video.isLive = false - // Reinit views - video.views = 0 video.state = VideoState.TO_TRANSCODE await video.save() @@ -87,10 +139,38 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename() await hlsPlaylist.save() + await assignReplaysToVideo(videoWithFiles, replayDirectory) + + await remove(getLiveReplayBaseDirectory(videoWithFiles)) + + // Regenerate the thumbnail & preview? + if (videoWithFiles.getMiniature().automaticallyGenerated === true) { + const miniature = await generateVideoMiniature({ + video: videoWithFiles, + videoFile: videoWithFiles.getMaxQualityFile(), + type: ThumbnailType.MINIATURE + }) + await video.addAndSaveThumbnail(miniature) + } + + if (videoWithFiles.getPreview().automaticallyGenerated === true) { + const preview = await generateVideoMiniature({ + video: videoWithFiles, + videoFile: videoWithFiles.getMaxQualityFile(), + type: ThumbnailType.PREVIEW + }) + await video.addAndSaveThumbnail(preview) + } + + await moveToNextState({ video: videoWithFiles, isNewVideo: false }) +} + +async function assignReplaysToVideo (video: MVideo, replayDirectory: string) { let durationDone = false - for (const playlistFile of playlistFiles) { - const concatenatedTsFile = buildConcatenatedName(playlistFile) + const concatenatedTsFiles = await readdir(replayDirectory) + + for (const concatenatedTsFile of concatenatedTsFiles) { const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile) const probe = await ffprobePromise(concatenatedTsFilePath) @@ -99,7 +179,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt const { resolution, isPortraitMode } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) const { resolutionPlaylistPath: outputPath } = await generateHlsPlaylistResolutionFromTS({ - video: videoWithFiles, + video, concatenatedTsFilePath, resolution, isPortraitMode, @@ -107,33 +187,22 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt }) if (!durationDone) { - videoWithFiles.duration = await getVideoStreamDuration(outputPath) - await videoWithFiles.save() + video.duration = await getVideoStreamDuration(outputPath) + await video.save() durationDone = true } } - await remove(replayDirectory) - - // Regenerate the thumbnail & preview? - if (videoWithFiles.getMiniature().automaticallyGenerated === true) { - await generateVideoMiniature({ - video: videoWithFiles, - videoFile: videoWithFiles.getMaxQualityFile(), - type: ThumbnailType.MINIATURE - }) - } + return video +} - if (videoWithFiles.getPreview().automaticallyGenerated === true) { - await generateVideoMiniature({ - video: videoWithFiles, - videoFile: videoWithFiles.getMaxQualityFile(), - type: ThumbnailType.PREVIEW - }) - } +async function cleanupLiveAndFederate (video: MVideo) { + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) + await cleanupLive(video, streamingPlaylist) - await moveToNextState({ video: videoWithFiles, isNewVideo: false }) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) + return federateVideoIfNeeded(fullVideo, false, undefined) } async function cleanupTMPLiveFiles (hlsDirectory: string) { diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 920d3a5ec..5ffe41ee3 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,6 +1,7 @@ -import { readFile } from 'fs-extra' +import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' +import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { computeLowerResolutionsToTranscode, @@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' +import { wait } from '@shared/core-utils' import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' +import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { LiveQuotaStore } from './live-quota-store' import { LiveSegmentShaStore } from './live-segment-sha-store' @@ -322,7 +324,7 @@ class LiveManager { muxingSession.destroy() - return this.onAfterMuxingCleanup(videoId) + return this.onAfterMuxingCleanup({ videoId }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -349,12 +351,15 @@ class LiveManager { live.Video = video - setTimeout(() => { - federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) + await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) - PeerTubeSocket.Instance.sendVideoLiveNewState(video) - }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + try { + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) + } + + PeerTubeSocket.Instance.sendVideoLiveNewState(video) } catch (err) { logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) } @@ -364,25 +369,32 @@ class LiveManager { this.videoSessions.delete(videoId) } - private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { + private async onAfterMuxingCleanup (options: { + videoId: number | string + cleanupNow?: boolean // Default false + }) { + const { videoId, cleanupNow = false } = options + try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - if (!live.permanentLive) { - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - - fullVideo.state = VideoState.LIVE_ENDED - } else { - fullVideo.state = VideoState.WAITING_FOR_LIVE - } + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id, + replayDirectory: live.saveReplay + ? await this.findReplayDirectory(fullVideo) + : undefined, + publishedAt: fullVideo.publishedAt.toISOString() + } + }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + + fullVideo.state = live.permanentLive + ? VideoState.WAITING_FOR_LIVE + : VideoState.LIVE_ENDED await fullVideo.save() @@ -390,7 +402,7 @@ class LiveManager { await federateVideoIfNeeded(fullVideo, false) } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) + logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) } } @@ -398,10 +410,19 @@ class LiveManager { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup(uuid, true) + await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) } } + private async findReplayDirectory (video: MVideo) { + const directory = getLiveReplayBaseDirectory(video) + const files = await readdir(directory) + + if (files.length === 0) return undefined + + return join(directory, files.sort().reverse()[0]) + } + private buildAllResolutionsToTranscode (originResolution: number) { const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED ? computeLowerResolutionsToTranscode(originResolution, 'live') diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 3bf723b98..46c7fd2f8 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -9,12 +9,12 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { return 'concat-' + num[1] + '.ts' } -async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { +async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) await remove(hlsDirectory) - await streamingPlaylist.destroy() + if (streamingPlaylist) await streamingPlaylist.destroy() } export { diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index a703f5b5f..588ee8749 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -11,7 +11,7 @@ import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' import { VideoFileModel } from '@server/models/video/video-file' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' -import { getLiveDirectory } from '../../paths' +import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' import { LiveQuotaStore } from '../live-quota-store' @@ -63,6 +63,9 @@ class MuxingSession extends EventEmitter { private readonly videoUUID: string private readonly saveReplay: boolean + private readonly outDirectory: string + private readonly replayDirectory: string + private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} @@ -110,19 +113,22 @@ class MuxingSession extends EventEmitter { this.saveReplay = this.videoLive.saveReplay + this.outDirectory = getLiveDirectory(this.videoLive.Video) + this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } async runMuxing () { this.createFiles() - const outPath = await this.prepareDirectories() + await this.prepareDirectories() this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED ? await getLiveTranscodingCommand({ inputUrl: this.inputUrl, - outPath, + outPath: this.outDirectory, masterPlaylistName: this.streamingPlaylist.playlistFilename, latencyMode: this.videoLive.latencyMode, @@ -137,15 +143,15 @@ class MuxingSession extends EventEmitter { }) : getLiveMuxingCommand({ inputUrl: this.inputUrl, - outPath, + outPath: this.outDirectory, masterPlaylistName: this.streamingPlaylist.playlistFilename, latencyMode: this.videoLive.latencyMode }) logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) - this.watchTSFiles(outPath) - this.watchMasterFile(outPath) + this.watchTSFiles(this.outDirectory) + this.watchMasterFile(this.outDirectory) let ffmpegShellCommand: string this.ffmpegCommand.on('start', cmdline => { @@ -155,10 +161,10 @@ class MuxingSession extends EventEmitter { }) this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand }) + this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) }) - this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) + this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) this.ffmpegCommand.run() } @@ -304,16 +310,11 @@ class MuxingSession extends EventEmitter { } private async prepareDirectories () { - const outPath = getLiveDirectory(this.videoLive.Video) - await ensureDir(outPath) - - const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) + await ensureDir(this.outDirectory) if (this.videoLive.saveReplay === true) { - await ensureDir(replayDirectory) + await ensureDir(this.replayDirectory) } - - return outPath } private isDurationConstraintValid (streamingStartTime: number) { @@ -364,7 +365,7 @@ class MuxingSession extends EventEmitter { private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { const segmentName = basename(segmentPath) - const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) + const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) try { const data = await readFile(segmentPath) diff --git a/server/lib/paths.ts b/server/lib/paths.ts index 5a85bea42..b29854700 100644 --- a/server/lib/paths.ts +++ b/server/lib/paths.ts @@ -1,6 +1,6 @@ import { join } from 'path' import { CONFIG } from '@server/initializers/config' -import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY } from '@server/initializers/constants' +import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY, VIDEO_LIVE } from '@server/initializers/constants' import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoUUID } from '@server/types/models' import { removeFragmentedMP4Ext } from '@shared/core-utils' import { buildUUID } from '@shared/extra-utils' @@ -21,6 +21,10 @@ function getLiveDirectory (video: MVideoUUID) { return getHLSDirectory(video) } +function getLiveReplayBaseDirectory (video: MVideoUUID) { + return join(getLiveDirectory(video), VIDEO_LIVE.REPLAY_DIRECTORY) +} + function getHLSDirectory (video: MVideoUUID) { return join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid) } @@ -74,6 +78,7 @@ export { getHLSDirectory, getLiveDirectory, + getLiveReplayBaseDirectory, getHLSRedundancyDirectory, generateHLSMasterPlaylistFilename, diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts index d55364e25..9a15f8613 100644 --- a/server/lib/transcoding/transcoding.ts +++ b/server/lib/transcoding/transcoding.ts @@ -3,13 +3,13 @@ import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' import { basename, extname as extnameUtil, join } from 'path' import { toEven } from '@server/helpers/core-utils' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' -import { MStreamingPlaylistFilesVideo, MVideoFile, MVideoFullLight } from '@server/types/models' +import { MStreamingPlaylistFilesVideo, MVideo, MVideoFile, MVideoFullLight } from '@server/types/models' import { VideoResolution, VideoStorage } from '../../../shared/models/videos' import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type' import { + buildFileMetadata, canDoQuickTranscode, getVideoStreamDuration, - buildFileMetadata, getVideoStreamFPS, transcodeVOD, TranscodeVODOptions, @@ -191,7 +191,7 @@ function mergeAudioVideofile (video: MVideoFullLight, resolution: VideoResolutio // Concat TS segments from a live video to a fragmented mp4 HLS playlist async function generateHlsPlaylistResolutionFromTS (options: { - video: MVideoFullLight + video: MVideo concatenatedTsFilePath: string resolution: VideoResolution isPortraitMode: boolean @@ -209,7 +209,7 @@ async function generateHlsPlaylistResolutionFromTS (options: { // Generate an HLS playlist from an input file, and update the master playlist function generateHlsPlaylistResolution (options: { - video: MVideoFullLight + video: MVideo videoInputPath: string resolution: VideoResolution copyCodecs: boolean @@ -265,7 +265,7 @@ async function onWebTorrentVideoFileTranscoding ( async function generateHlsPlaylistCommon (options: { type: 'hls' | 'hls-from-ts' - video: MVideoFullLight + video: MVideo inputPath: string resolution: VideoResolution copyCodecs?: boolean diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index 0984c0d7a..91f44cb11 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts @@ -73,8 +73,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video unfederated: options.unfederate === true, reason: options.reason, type: VideoBlacklistType.MANUAL - } - ) + }) blacklist.Video = videoInstance if (options.unfederate === true) { diff --git a/server/middlewares/validators/videos/video-live.ts b/server/middlewares/validators/videos/video-live.ts index 8e52c953f..b756c0bf1 100644 --- a/server/middlewares/validators/videos/video-live.ts +++ b/server/middlewares/validators/videos/video-live.ts @@ -118,12 +118,6 @@ const videoLiveAddValidator = getCommonVideoEditAttributes().concat([ }) } - if (body.permanentLive && body.saveReplay) { - cleanUpReqFiles(req) - - return res.fail({ message: 'Cannot set this live as permanent while saving its replay' }) - } - const user = res.locals.oauth.token.User if (!await doesVideoChannelOfAccountExist(body.channelId, user, res)) return cleanUpReqFiles(req) diff --git a/server/models/video/video-live.ts b/server/models/video/video-live.ts index 904f712b4..96c0bf7f7 100644 --- a/server/models/video/video-live.ts +++ b/server/models/video/video-live.ts @@ -2,7 +2,7 @@ import { AllowNull, BelongsTo, Column, CreatedAt, DataType, DefaultScope, Foreig import { CONFIG } from '@server/initializers/config' import { WEBSERVER } from '@server/initializers/constants' import { MVideoLive, MVideoLiveVideo } from '@server/types/models' -import { LiveVideo, LiveVideoLatencyMode, VideoState } from '@shared/models' +import { LiveVideo, LiveVideoLatencyMode, VideoPrivacy, VideoState } from '@shared/models' import { AttributesOnly } from '@shared/typescript-utils' import { VideoModel } from './video' import { VideoBlacklistModel } from './video-blacklist' diff --git a/server/tests/api/check-params/live.ts b/server/tests/api/check-params/live.ts index b253f5e20..2f1c1257e 100644 --- a/server/tests/api/check-params/live.ts +++ b/server/tests/api/check-params/live.ts @@ -212,12 +212,6 @@ describe('Test video lives API validator', function () { await makeUploadRequest({ url: server.url, path, token: server.accessToken, fields, attaches }) }) - it('Should fail with save replay and permanent live set to true', async function () { - const fields = { ...baseCorrectParams, saveReplay: true, permanentLive: true } - - await makePostBodyRequest({ url: server.url, path, token: server.accessToken, fields }) - }) - it('Should fail with bad latency setting', async function () { const fields = { ...baseCorrectParams, latencyMode: 42 } diff --git a/server/tests/api/live/live-constraints.ts b/server/tests/api/live/live-constraints.ts index 909399836..b92dc7b89 100644 --- a/server/tests/api/live/live-constraints.ts +++ b/server/tests/api/live/live-constraints.ts @@ -14,7 +14,7 @@ import { setDefaultVideoChannel, waitJobs } from '@shared/server-commands' -import { checkLiveCleanupAfterSave } from '../../shared' +import { checkLiveCleanup } from '../../shared' const expect = chai.expect @@ -43,7 +43,7 @@ describe('Test live constraints', function () { expect(video.duration).to.be.greaterThan(0) } - await checkLiveCleanupAfterSave(servers[0], videoId, resolutions) + await checkLiveCleanup(servers[0], videoId, resolutions) } async function waitUntilLivePublishedOnAllServers (videoId: string) { diff --git a/server/tests/api/live/live-permanent.ts b/server/tests/api/live/live-permanent.ts index 3e6fec453..a88d71dd9 100644 --- a/server/tests/api/live/live-permanent.ts +++ b/server/tests/api/live/live-permanent.ts @@ -121,7 +121,7 @@ describe('Permanent live', function () { await waitJobs(servers) }) - it('Should not have cleaned up this live', async function () { + it('Should have cleaned up this live', async function () { this.timeout(40000) await wait(5000) @@ -129,7 +129,8 @@ describe('Permanent live', function () { for (const server of servers) { const videoDetails = await server.videos.get({ id: videoUUID }) - expect(videoDetails.streamingPlaylists).to.have.lengthOf(1) + + expect(videoDetails.streamingPlaylists).to.have.lengthOf(0) } }) diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts index 95a342b01..ba68a4287 100644 --- a/server/tests/api/live/live-save-replay.ts +++ b/server/tests/api/live/live-save-replay.ts @@ -3,7 +3,7 @@ import 'mocha' import * as chai from 'chai' import { FfmpegCommand } from 'fluent-ffmpeg' -import { checkLiveCleanupAfterSave } from '@server/tests/shared' +import { checkLiveCleanup } from '@server/tests/shared' import { wait } from '@shared/core-utils' import { HttpStatusCode, LiveVideoCreate, VideoPrivacy, VideoState } from '@shared/models' import { @@ -11,6 +11,7 @@ import { ConfigCommand, createMultipleServers, doubleFollow, + findExternalSavedVideo, PeerTubeServer, setAccessTokensToServers, setDefaultVideoChannel, @@ -18,7 +19,8 @@ import { testFfmpegStreamError, waitJobs, waitUntilLivePublishedOnAllServers, - waitUntilLiveSavedOnAllServers + waitUntilLiveReplacedByReplayOnAllServers, + waitUntilLiveWaitingOnAllServers } from '@shared/server-commands' const expect = chai.expect @@ -28,7 +30,7 @@ describe('Save replay setting', function () { let liveVideoUUID: string let ffmpegCommand: FfmpegCommand - async function createLiveWrapper (saveReplay: boolean) { + async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) { if (liveVideoUUID) { try { await servers[0].videos.remove({ id: liveVideoUUID }) @@ -40,7 +42,8 @@ describe('Save replay setting', function () { channelId: servers[0].store.channel.id, privacy: VideoPrivacy.PUBLIC, name: 'my super live', - saveReplay + saveReplay: options.replay, + permanentLive: options.permanent } const { uuid } = await servers[0].live.create({ fields: attributes }) @@ -104,7 +107,7 @@ describe('Save replay setting', function () { it('Should correctly create and federate the "waiting for stream" live', async function () { this.timeout(20000) - liveVideoUUID = await createLiveWrapper(false) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false }) await waitJobs(servers) @@ -140,13 +143,13 @@ describe('Save replay setting', function () { await checkVideoState(liveVideoUUID, VideoState.LIVE_ENDED) // No resolutions saved since we did not save replay - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, []) + await checkLiveCleanup(servers[0], liveVideoUUID, []) }) it('Should correctly terminate the stream on blacklist and delete the live', async function () { this.timeout(40000) - liveVideoUUID = await createLiveWrapper(false) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false }) ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) @@ -169,13 +172,13 @@ describe('Save replay setting', function () { await wait(5000) await waitJobs(servers) - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, []) + await checkLiveCleanup(servers[0], liveVideoUUID, []) }) it('Should correctly terminate the stream on delete and delete the video', async function () { this.timeout(40000) - liveVideoUUID = await createLiveWrapper(false) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false }) ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) @@ -193,16 +196,16 @@ describe('Save replay setting', function () { await waitJobs(servers) await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, []) + await checkLiveCleanup(servers[0], liveVideoUUID, []) }) }) - describe('With save replay enabled', function () { + describe('With save replay enabled on non permanent live', function () { it('Should correctly create and federate the "waiting for stream" live', async function () { this.timeout(20000) - liveVideoUUID = await createLiveWrapper(true) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true }) await waitJobs(servers) @@ -227,7 +230,7 @@ describe('Save replay setting', function () { await stopFfmpeg(ffmpegCommand) - await waitUntilLiveSavedOnAllServers(servers, liveVideoUUID) + await waitUntilLiveReplacedByReplayOnAllServers(servers, liveVideoUUID) await waitJobs(servers) // Live has been transcoded @@ -249,13 +252,13 @@ describe('Save replay setting', function () { }) it('Should have cleaned up the live files', async function () { - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ]) + await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ]) }) it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () { this.timeout(40000) - liveVideoUUID = await createLiveWrapper(true) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true }) ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) @@ -277,13 +280,13 @@ describe('Save replay setting', function () { await wait(5000) await waitJobs(servers) - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ]) + await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ]) }) it('Should correctly terminate the stream on delete and delete the video', async function () { this.timeout(40000) - liveVideoUUID = await createLiveWrapper(true) + liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true }) ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) @@ -300,7 +303,123 @@ describe('Save replay setting', function () { await waitJobs(servers) await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) - await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, []) + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + }) + + describe('With save replay enabled on permanent live', function () { + let lastReplayUUID: string + + it('Should correctly create and federate the "waiting for stream" live', async function () { + this.timeout(20000) + + liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true }) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, false, HttpStatusCode.OK_200) + await checkVideoState(liveVideoUUID, VideoState.WAITING_FOR_LIVE) + }) + + it('Should correctly have updated the live and federated it when streaming in the live', async function () { + this.timeout(20000) + + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + + await waitJobs(servers) + + await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200) + await checkVideoState(liveVideoUUID, VideoState.PUBLISHED) + }) + + it('Should correctly have saved the live and federated it after the streaming', async function () { + this.timeout(30000) + + const liveDetails = await servers[0].videos.get({ id: liveVideoUUID }) + + await stopFfmpeg(ffmpegCommand) + + await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID) + await waitJobs(servers) + + const video = await findExternalSavedVideo(servers[0], liveDetails) + expect(video).to.exist + + for (const server of servers) { + await server.videos.get({ id: video.uuid }) + } + + lastReplayUUID = video.uuid + }) + + it('Should have cleaned up the live files', async function () { + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + + it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () { + this.timeout(60000) + + await servers[0].videos.remove({ id: lastReplayUUID }) + + liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true }) + + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + + const liveDetails = await servers[0].videos.get({ id: liveVideoUUID }) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200) + + await Promise.all([ + servers[0].blacklist.add({ videoId: liveVideoUUID, reason: 'bad live', unfederate: true }), + testFfmpegStreamError(ffmpegCommand, true) + ]) + + await waitJobs(servers) + await wait(5000) + await waitJobs(servers) + + const replay = await findExternalSavedVideo(servers[0], liveDetails) + expect(replay).to.exist + + for (const videoId of [ liveVideoUUID, replay.uuid ]) { + await checkVideosExist(videoId, false) + + await servers[0].videos.get({ id: videoId, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 }) + await servers[1].videos.get({ id: videoId, expectedStatus: HttpStatusCode.NOT_FOUND_404 }) + } + + await checkLiveCleanup(servers[0], liveVideoUUID, []) + }) + + it('Should correctly terminate the stream on delete and not save the video', async function () { + this.timeout(40000) + + liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true }) + + ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID) + + const liveDetails = await servers[0].videos.get({ id: liveVideoUUID }) + + await waitJobs(servers) + await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200) + + await Promise.all([ + servers[0].videos.remove({ id: liveVideoUUID }), + testFfmpegStreamError(ffmpegCommand, true) + ]) + + await wait(5000) + await waitJobs(servers) + + const replay = await findExternalSavedVideo(servers[0], liveDetails) + expect(replay).to.not.exist + + await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) + await checkLiveCleanup(servers[0], liveVideoUUID, []) }) }) diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts index aeb039696..6e7b77bce 100644 --- a/server/tests/api/live/live.ts +++ b/server/tests/api/live/live.ts @@ -4,7 +4,7 @@ import 'mocha' import * as chai from 'chai' import { basename, join } from 'path' import { ffprobePromise, getVideoStream } from '@server/helpers/ffmpeg' -import { checkLiveCleanupAfterSave, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared' +import { checkLiveCleanup, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared' import { wait } from '@shared/core-utils' import { HttpStatusCode, @@ -583,7 +583,7 @@ describe('Test live', function () { it('Should correctly have cleaned up the live files', async function () { this.timeout(30000) - await checkLiveCleanupAfterSave(servers[0], liveVideoId, [ 240, 360, 720 ]) + await checkLiveCleanup(servers[0], liveVideoId, [ 240, 360, 720 ]) }) }) diff --git a/server/tests/api/object-storage/live.ts b/server/tests/api/object-storage/live.ts index 0cb0a6e34..5d6281dec 100644 --- a/server/tests/api/object-storage/live.ts +++ b/server/tests/api/object-storage/live.ts @@ -2,13 +2,13 @@ import 'mocha' import * as chai from 'chai' -import { FfmpegCommand } from 'fluent-ffmpeg' import { expectStartWith } from '@server/tests/shared' import { areObjectStorageTestsDisabled } from '@shared/core-utils' import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models' import { createMultipleServers, doubleFollow, + findExternalSavedVideo, killallServers, makeRawRequest, ObjectStorageCommand, @@ -18,17 +18,19 @@ import { stopFfmpeg, waitJobs, waitUntilLivePublishedOnAllServers, - waitUntilLiveSavedOnAllServers + waitUntilLiveReplacedByReplayOnAllServers, + waitUntilLiveWaitingOnAllServers } from '@shared/server-commands' const expect = chai.expect -async function createLive (server: PeerTubeServer) { +async function createLive (server: PeerTubeServer, permanent: boolean) { const attributes: LiveVideoCreate = { channelId: server.store.channel.id, privacy: VideoPrivacy.PUBLIC, name: 'my super live', - saveReplay: true + saveReplay: true, + permanentLive: permanent } const { uuid } = await server.live.create({ fields: attributes }) @@ -44,12 +46,39 @@ async function checkFiles (files: VideoFile[]) { } } +async function getFiles (server: PeerTubeServer, videoUUID: string) { + const video = await server.videos.get({ id: videoUUID }) + + expect(video.files).to.have.lengthOf(0) + expect(video.streamingPlaylists).to.have.lengthOf(1) + + return video.streamingPlaylists[0].files +} + +async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) { + const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID }) + await waitUntilLivePublishedOnAllServers(servers, liveUUID) + + const videoLiveDetails = await servers[0].videos.get({ id: liveUUID }) + const liveDetails = await servers[0].live.get({ videoId: liveUUID }) + + await stopFfmpeg(ffmpegCommand) + + if (liveDetails.permanentLive) { + await waitUntilLiveWaitingOnAllServers(servers, liveUUID) + } else { + await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID) + } + + await waitJobs(servers) + + return { videoLiveDetails, liveDetails } +} + describe('Object storage for lives', function () { if (areObjectStorageTestsDisabled()) return - let ffmpegCommand: FfmpegCommand let servers: PeerTubeServer[] - let videoUUID: string before(async function () { this.timeout(120000) @@ -66,31 +95,22 @@ describe('Object storage for lives', function () { }) describe('Without live transcoding', async function () { + let videoUUID: string before(async function () { await servers[0].config.enableLive({ transcoding: false }) - videoUUID = await createLive(servers[0]) + videoUUID = await createLive(servers[0], false) }) it('Should create a live and save the replay on object storage', async function () { this.timeout(220000) - ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID }) - await waitUntilLivePublishedOnAllServers(servers, videoUUID) - - await stopFfmpeg(ffmpegCommand) - - await waitUntilLiveSavedOnAllServers(servers, videoUUID) - await waitJobs(servers) + await streamAndEnd(servers, videoUUID) for (const server of servers) { - const video = await server.videos.get({ id: videoUUID }) - - expect(video.files).to.have.lengthOf(0) - expect(video.streamingPlaylists).to.have.lengthOf(1) - - const files = video.streamingPlaylists[0].files + const files = await getFiles(server, videoUUID) + expect(files).to.have.lengthOf(1) await checkFiles(files) } @@ -98,31 +118,38 @@ describe('Object storage for lives', function () { }) describe('With live transcoding', async function () { + let videoUUIDPermanent: string + let videoUUIDNonPermanent: string before(async function () { await servers[0].config.enableLive({ transcoding: true }) - videoUUID = await createLive(servers[0]) + videoUUIDPermanent = await createLive(servers[0], true) + videoUUIDNonPermanent = await createLive(servers[0], false) }) - it('Should import a video and have sent it to object storage', async function () { + it('Should create a live and save the replay on object storage', async function () { this.timeout(240000) - ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID }) - await waitUntilLivePublishedOnAllServers(servers, videoUUID) + await streamAndEnd(servers, videoUUIDNonPermanent) - await stopFfmpeg(ffmpegCommand) + for (const server of servers) { + const files = await getFiles(server, videoUUIDNonPermanent) + expect(files).to.have.lengthOf(5) - await waitUntilLiveSavedOnAllServers(servers, videoUUID) - await waitJobs(servers) + await checkFiles(files) + } + }) - for (const server of servers) { - const video = await server.videos.get({ id: videoUUID }) + it('Should create a live and save the replay of permanent live on object storage', async function () { + this.timeout(240000) + + const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent) - expect(video.files).to.have.lengthOf(0) - expect(video.streamingPlaylists).to.have.lengthOf(1) + const replay = await findExternalSavedVideo(servers[0], videoLiveDetails) - const files = video.streamingPlaylists[0].files + for (const server of servers) { + const files = await getFiles(server, replay.uuid) expect(files).to.have.lengthOf(5) await checkFiles(files) diff --git a/server/tests/shared/live.ts b/server/tests/shared/live.ts index 72e3e27f6..6ee4899b0 100644 --- a/server/tests/shared/live.ts +++ b/server/tests/shared/live.ts @@ -5,11 +5,11 @@ import { pathExists, readdir } from 'fs-extra' import { join } from 'path' import { PeerTubeServer } from '@shared/server-commands' -async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: string, resolutions: number[] = []) { +async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) { const basePath = server.servers.buildDirectory('streaming-playlists') const hlsPath = join(basePath, 'hls', videoUUID) - if (resolutions.length === 0) { + if (savedResolutions.length === 0) { const result = await pathExists(hlsPath) expect(result).to.be.false @@ -19,9 +19,9 @@ async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: str const files = await readdir(hlsPath) // fragmented file and playlist per resolution + master playlist + segments sha256 json file - expect(files).to.have.lengthOf(resolutions.length * 2 + 2) + expect(files).to.have.lengthOf(savedResolutions.length * 2 + 2) - for (const resolution of resolutions) { + for (const resolution of savedResolutions) { const fragmentedFile = files.find(f => f.endsWith(`-${resolution}-fragmented.mp4`)) expect(fragmentedFile).to.exist @@ -37,5 +37,5 @@ async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: str } export { - checkLiveCleanupAfterSave + checkLiveCleanup } -- cgit v1.2.3