From cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 4 Oct 2022 10:03:17 +0200 Subject: [PATCH] Live supports object storage * Sync live files (segments, master playlist, resolution playlist, segment sha file) into object storage * Automatically delete them when the live ends * Segment sha file is now a file on disk, and not stored in memory anymore --- server.ts | 4 - server/controllers/index.ts | 1 - server/controllers/live.ts | 32 --- server/lib/hls.ts | 6 +- .../handlers/move-to-object-storage.ts | 10 +- .../job-queue/handlers/video-live-ending.ts | 23 ++- server/lib/live/live-manager.ts | 12 +- server/lib/live/live-segment-sha-store.ts | 75 ++++--- server/lib/live/live-utils.ts | 67 +++++-- server/lib/live/shared/muxing-session.ts | 106 ++++++++-- .../shared/object-storage-helpers.ts | 25 ++- server/lib/object-storage/videos.ts | 37 +++- .../models/video/video-streaming-playlist.ts | 24 +-- server/tests/api/live/live-fast-restream.ts | 2 +- server/tests/api/live/live.ts | 87 ++++----- server/tests/api/object-storage/live.ts | 183 ++++++++++++------ server/tests/shared/live.ts | 116 +++++++++-- server/tests/shared/streaming-playlists.ts | 13 +- shared/server-commands/videos/live-command.ts | 60 +++++- .../videos/streaming-playlists-command.ts | 2 +- support/doc/api/openapi.yaml | 1 - 21 files changed, 597 insertions(+), 289 deletions(-) delete mode 100644 server/controllers/live.ts diff --git a/server.ts b/server.ts index 887814d4e..2085c67d9 100644 --- a/server.ts +++ b/server.ts @@ -102,7 +102,6 @@ import { wellKnownRouter, lazyStaticRouter, servicesRouter, - liveRouter, pluginsRouter, webfingerRouter, trackerRouter, @@ -221,9 +220,6 @@ app.use(apiRoute, apiRouter) // Services (oembed...) app.use('/services', servicesRouter) -// Live streaming -app.use('/live', liveRouter) - // Plugins & themes app.use('/', pluginsRouter) diff --git a/server/controllers/index.ts b/server/controllers/index.ts index e8833d58c..8574a9e7b 100644 --- a/server/controllers/index.ts +++ b/server/controllers/index.ts @@ -6,7 +6,6 @@ export * from './feeds' export * from './services' export * from './static' export * from './lazy-static' -export * from './live' export * from './misc' export * from './webfinger' export * from './tracker' diff --git a/server/controllers/live.ts b/server/controllers/live.ts deleted file mode 100644 index 81008f120..000000000 --- a/server/controllers/live.ts +++ /dev/null @@ -1,32 +0,0 @@ -import cors from 'cors' -import express from 'express' -import { mapToJSON } from '@server/helpers/core-utils' -import { LiveSegmentShaStore } from '@server/lib/live' -import { HttpStatusCode } from '@shared/models' - -const liveRouter = express.Router() - -liveRouter.use('/segments-sha256/:videoUUID', - cors(), - getSegmentsSha256 -) - -// --------------------------------------------------------------------------- - -export { - liveRouter -} - -// --------------------------------------------------------------------------- - -function getSegmentsSha256 (req: express.Request, res: express.Response) { - const videoUUID = req.params.videoUUID - - const result = LiveSegmentShaStore.Instance.getSegmentsSha256(videoUUID) - - if (!result) { - return res.status(HttpStatusCode.NOT_FOUND_404).end() - } - - return res.json(mapToJSON(result)) -} diff --git a/server/lib/hls.ts b/server/lib/hls.ts index a0a5afc0f..a41f1ae48 100644 --- a/server/lib/hls.ts +++ b/server/lib/hls.ts @@ -15,7 +15,7 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers import { sequelizeTypescript } from '../initializers/database' import { VideoFileModel } from '../models/video/video-file' import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist' -import { storeHLSFile } from './object-storage' +import { storeHLSFileFromFilename } from './object-storage' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths' import { VideoPathManager } from './video-path-manager' @@ -95,7 +95,7 @@ function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n') if (playlist.storage === VideoStorage.OBJECT_STORAGE) { - playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename) + playlist.playlistUrl = await storeHLSFileFromFilename(playlist, playlist.playlistFilename) await remove(masterPlaylistPath) } @@ -146,7 +146,7 @@ function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist await outputJSON(outputPath, json) if (playlist.storage === VideoStorage.OBJECT_STORAGE) { - playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename) + playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlist, playlist.segmentsSha256Filename) await remove(outputPath) } diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 25bdebeea..28c3d325d 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -5,7 +5,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger' import { updateTorrentMetadata } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants' -import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage' +import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage' import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths' import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state' import { VideoModel } from '@server/models/video/video' @@ -88,10 +88,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) { // Resolution playlist const playlistFilename = getHlsResolutionPlaylistFilename(file.filename) - await storeHLSFile(playlistWithVideo, playlistFilename) + await storeHLSFileFromFilename(playlistWithVideo, playlistFilename) // Resolution fragmented file - const fileUrl = await storeHLSFile(playlistWithVideo, file.filename) + const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename) const oldPath = join(getHLSDirectory(video), file.filename) @@ -113,9 +113,9 @@ async function doAfterLastJob (options: { const playlistWithVideo = playlist.withVideo(video) // Master playlist - playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename) + playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename) // Sha256 segments file - playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename) + playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename) playlist.storage = VideoStorage.OBJECT_STORAGE diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 8a3ee09a2..abfaf1cd7 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -4,7 +4,7 @@ import { join } from 'path' import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' -import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' +import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' import { generateVideoMiniature } from '@server/lib/thumbnail' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' @@ -141,23 +141,22 @@ async function replaceLiveByReplay (options: { }) { const { video, liveSession, live, permanentLive, replayDirectory } = options - await cleanupTMPLiveFiles(video) + const videoWithFiles = await VideoModel.loadFull(video.id) + const hlsPlaylist = videoWithFiles.getHLSPlaylist() + + await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist) await live.destroy() - video.isLive = false - video.waitTranscoding = true - video.state = VideoState.TO_TRANSCODE + videoWithFiles.isLive = false + videoWithFiles.waitTranscoding = true + videoWithFiles.state = VideoState.TO_TRANSCODE - await video.save() + await videoWithFiles.save() - liveSession.replayVideoId = video.id + liveSession.replayVideoId = videoWithFiles.id await liveSession.save() - // Remove old HLS playlist video files - const videoWithFiles = await VideoModel.loadFull(video.id) - - const hlsPlaylist = videoWithFiles.getHLSPlaylist() await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) // Reset playlist @@ -234,7 +233,7 @@ async function cleanupLiveAndFederate (options: { if (streamingPlaylist) { if (permanentLive) { - await cleanupPermanentLive(video, streamingPlaylist) + await cleanupAndDestroyPermanentLive(video, streamingPlaylist) } else { await cleanupUnsavedNormalLive(video, streamingPlaylist) } diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 16715862b..9470b530b 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' import { pick, wait } from '@shared/core-utils' -import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { Hooks } from '../plugins/hooks' import { LiveQuotaStore } from './live-quota-store' -import { cleanupPermanentLive } from './live-utils' +import { cleanupAndDestroyPermanentLive } from './live-utils' import { MuxingSession } from './shared' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') @@ -224,7 +224,7 @@ class LiveManager { if (oldStreamingPlaylist) { if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) - await cleanupPermanentLive(video, oldStreamingPlaylist) + await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } this.videoSessions.set(video.id, sessionId) @@ -301,7 +301,7 @@ class LiveManager { ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) }) - muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) + muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) muxingSession.on('bad-socket-health', ({ videoId }) => { logger.error( @@ -485,6 +485,10 @@ class LiveManager { playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) + playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED + ? VideoStorage.OBJECT_STORAGE + : VideoStorage.FILE_SYSTEM + return playlist.save() } diff --git a/server/lib/live/live-segment-sha-store.ts b/server/lib/live/live-segment-sha-store.ts index 4af6f3ebf..faf03dccf 100644 --- a/server/lib/live/live-segment-sha-store.ts +++ b/server/lib/live/live-segment-sha-store.ts @@ -1,62 +1,73 @@ +import { writeJson } from 'fs-extra' import { basename } from 'path' +import { mapToJSON } from '@server/helpers/core-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' +import { MStreamingPlaylistVideo } from '@server/types/models' import { buildSha256Segment } from '../hls' +import { storeHLSFileFromPath } from '../object-storage' const lTags = loggerTagsFactory('live') class LiveSegmentShaStore { - private static instance: LiveSegmentShaStore - - private readonly segmentsSha256 = new Map>() - - private constructor () { + private readonly segmentsSha256 = new Map() + + private readonly videoUUID: string + private readonly sha256Path: string + private readonly streamingPlaylist: MStreamingPlaylistVideo + private readonly sendToObjectStorage: boolean + + constructor (options: { + videoUUID: string + sha256Path: string + streamingPlaylist: MStreamingPlaylistVideo + sendToObjectStorage: boolean + }) { + this.videoUUID = options.videoUUID + this.sha256Path = options.sha256Path + this.streamingPlaylist = options.streamingPlaylist + this.sendToObjectStorage = options.sendToObjectStorage } - getSegmentsSha256 (videoUUID: string) { - return this.segmentsSha256.get(videoUUID) - } - - async addSegmentSha (videoUUID: string, segmentPath: string) { - const segmentName = basename(segmentPath) - logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) + async addSegmentSha (segmentPath: string) { + logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID)) const shaResult = await buildSha256Segment(segmentPath) - if (!this.segmentsSha256.has(videoUUID)) { - this.segmentsSha256.set(videoUUID, new Map()) - } + const segmentName = basename(segmentPath) + this.segmentsSha256.set(segmentName, shaResult) - const filesMap = this.segmentsSha256.get(videoUUID) - filesMap.set(segmentName, shaResult) + await this.writeToDisk() } - removeSegmentSha (videoUUID: string, segmentPath: string) { + async removeSegmentSha (segmentPath: string) { const segmentName = basename(segmentPath) - logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) + logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID)) - const filesMap = this.segmentsSha256.get(videoUUID) - if (!filesMap) { - logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) + if (!this.segmentsSha256.has(segmentName)) { + logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID)) return } - if (!filesMap.has(segmentName)) { - logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) - return - } + this.segmentsSha256.delete(segmentName) - filesMap.delete(segmentName) + await this.writeToDisk() } - cleanupShaSegments (videoUUID: string) { - this.segmentsSha256.delete(videoUUID) - } + private async writeToDisk () { + await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256)) - static get Instance () { - return this.instance || (this.instance = new this()) + if (this.sendToObjectStorage) { + const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path) + + if (this.streamingPlaylist.segmentsSha256Url !== url) { + this.streamingPlaylist.segmentsSha256Url = url + await this.streamingPlaylist.save() + } + } } + } export { diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index bba876642..d2b8e3a55 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts @@ -1,9 +1,10 @@ import { pathExists, readdir, remove } from 'fs-extra' import { basename, join } from 'path' import { logger } from '@server/helpers/logger' -import { MStreamingPlaylist, MVideo } from '@server/types/models' +import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models' +import { VideoStorage } from '@shared/models' +import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage' import { getLiveDirectory } from '../paths' -import { LiveSegmentShaStore } from './live-segment-sha-store' function buildConcatenatedName (segmentOrPlaylistPath: string) { const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/) @@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { return 'concat-' + num[1] + '.ts' } -async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { - await cleanupTMPLiveFiles(video) +async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { + await cleanupTMPLiveFiles(video, streamingPlaylist) await streamingPlaylist.destroy() } @@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { const hlsDirectory = getLiveDirectory(video) + // We uploaded files to object storage too, remove them + if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + await removeHLSObjectStorage(streamingPlaylist.withVideo(video)) + } + await remove(hlsDirectory) await streamingPlaylist.destroy() +} - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) +async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) { + await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video)) + + await cleanupTMPLiveFilesFromFilesystem(video) } -async function cleanupTMPLiveFiles (video: MVideo) { - const hlsDirectory = getLiveDirectory(video) +export { + cleanupAndDestroyPermanentLive, + cleanupUnsavedNormalLive, + cleanupTMPLiveFiles, + buildConcatenatedName +} + +// --------------------------------------------------------------------------- - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) +function isTMPLiveFile (name: string) { + return name.endsWith('.ts') || + name.endsWith('.m3u8') || + name.endsWith('.json') || + name.endsWith('.mpd') || + name.endsWith('.m4s') || + name.endsWith('.tmp') +} + +async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) { + const hlsDirectory = getLiveDirectory(video) if (!await pathExists(hlsDirectory)) return - logger.info('Cleanup TMP live files of %s.', hlsDirectory) + logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory) const files = await readdir(hlsDirectory) for (const filename of files) { - if ( - filename.endsWith('.ts') || - filename.endsWith('.m3u8') || - filename.endsWith('.mpd') || - filename.endsWith('.m4s') || - filename.endsWith('.tmp') - ) { + if (isTMPLiveFile(filename)) { const p = join(hlsDirectory, filename) remove(p) @@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) { } } -export { - cleanupPermanentLive, - cleanupUnsavedNormalLive, - cleanupTMPLiveFiles, - buildConcatenatedName +async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) { + if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + + const keys = await listHLSFileKeysOf(streamingPlaylist) + + for (const key of keys) { + if (isTMPLiveFile(key)) { + await removeHLSFileObjectStorage(streamingPlaylist, key) + } + } } diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 505717dce..4c27d5dd8 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' +import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' +import { VideoStorage } from '@shared/models' import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' @@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils' import memoizee = require('memoizee') interface MuxingSessionEvents { - 'master-playlist-created': (options: { videoId: number }) => void + 'live-ready': (options: { videoId: number }) => void 'bad-socket-health': (options: { videoId: number }) => void 'duration-exceeded': (options: { videoId: number }) => void @@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter { private readonly outDirectory: string private readonly replayDirectory: string + private readonly liveSegmentShaStore: LiveSegmentShaStore + private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} private tsWatcher: FSWatcher private masterWatcher: FSWatcher + private m3u8Watcher: FSWatcher + + private masterPlaylistCreated = false + private liveReady = false private aborted = false @@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter { this.outDirectory = getLiveDirectory(this.videoLive.Video) this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) + this.liveSegmentShaStore = new LiveSegmentShaStore({ + videoUUID: this.videoLive.Video.uuid, + sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), + streamingPlaylist: this.streamingPlaylist, + sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED + }) + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } @@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter { logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) - this.watchTSFiles() this.watchMasterFile() + this.watchTSFiles() + this.watchM3U8File() let ffmpegShellCommand: string this.ffmpegCommand.on('start', cmdline => { @@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter { setTimeout(() => { // Wait latest segments generation, and close watchers - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) .then(() => { // Process remaining segments hash for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { @@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter { private watchMasterFile () { this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) - this.masterWatcher.on('add', () => { - this.emit('master-playlist-created', { videoId: this.videoId }) + this.masterWatcher.on('add', async () => { + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) + + this.streamingPlaylist.playlistUrl = url + await this.streamingPlaylist.save() + } catch (err) { + logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() }) + } + } + + this.masterPlaylistCreated = true this.masterWatcher.close() .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) } + private watchM3U8File () { + this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') + + const onChangeOrAdd = async (m3u8Path: string) => { + if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + + try { + await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) + } catch (err) { + logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) + } + } + + this.m3u8Watcher.on('change', onChangeOrAdd) + } + private watchTSFiles () { const startStreamDateTime = new Date().getTime() @@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter { } } - const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) + const deleteHandler = async (segmentPath: string) => { + try { + await this.liveSegmentShaStore.removeSegmentSha(segmentPath) + } catch (err) { + logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) + } + } + } this.tsWatcher.on('add', p => addHandler(p)) this.tsWatcher.on('unlink', p => deleteHandler(p)) @@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter { extname: '.ts', infoHash: null, fps: this.fps, + storage: this.streamingPlaylist.storage, videoStreamingPlaylistId: this.streamingPlaylist.id }) @@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter { } private processSegments (segmentPaths: string[]) { - mapSeries(segmentPaths, async previousSegment => { - // Add sha hash of previous segments, because ffmpeg should have finished generating them - await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) + mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) + .catch(err => { + if (this.aborted) return + + logger.error('Cannot process segments', { err, ...this.lTags() }) + }) + } - if (this.saveReplay) { - await this.addSegmentToReplay(previousSegment) + private async processSegment (segmentPath: string) { + // Add sha hash of previous segments, because ffmpeg should have finished generating them + await this.liveSegmentShaStore.addSegmentSha(segmentPath) + + if (this.saveReplay) { + await this.addSegmentToReplay(segmentPath) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) } - }).catch(err => { - if (this.aborted) return + } - logger.error('Cannot process segments', { err, ...this.lTags() }) - }) + // Master playlist and segment JSON file are created, live is ready + if (this.masterPlaylistCreated && !this.liveReady) { + this.liveReady = true + + this.emit('live-ready', { videoId: this.videoId }) + } } private hasClientSocketInBadHealth (sessionId: string) { diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index 16161362c..c131977e8 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts @@ -22,6 +22,24 @@ type BucketInfo = { PREFIX?: string } +async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) { + const s3Client = getClient() + + const commandPrefix = bucketInfo.PREFIX + prefix + const listCommand = new ListObjectsV2Command({ + Bucket: bucketInfo.BUCKET_NAME, + Prefix: commandPrefix + }) + + const listedObjects = await s3Client.send(listCommand) + + if (isArray(listedObjects.Contents) !== true) return [] + + return listedObjects.Contents.map(c => c.Key) +} + +// --------------------------------------------------------------------------- + async function storeObject (options: { inputPath: string objectStorageKey: string @@ -36,6 +54,8 @@ async function storeObject (options: { return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo }) } +// --------------------------------------------------------------------------- + async function removeObject (filename: string, bucketInfo: BucketInfo) { const command = new DeleteObjectCommand({ Bucket: bucketInfo.BUCKET_NAME, @@ -89,6 +109,8 @@ async function removePrefix (prefix: string, bucketInfo: BucketInfo) { if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo) } +// --------------------------------------------------------------------------- + async function makeAvailable (options: { key: string destination: string @@ -122,7 +144,8 @@ export { storeObject, removeObject, removePrefix, - makeAvailable + makeAvailable, + listKeysOfPrefix } // --------------------------------------------------------------------------- diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts index 66e738200..62aae248b 100644 --- a/server/lib/object-storage/videos.ts +++ b/server/lib/object-storage/videos.ts @@ -1,19 +1,35 @@ -import { join } from 'path' +import { basename, join } from 'path' import { logger } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models' import { getHLSDirectory } from '../paths' import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys' -import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared' +import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared' -function storeHLSFile (playlist: MStreamingPlaylistVideo, filename: string, path?: string) { +function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) { + return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) +} + +// --------------------------------------------------------------------------- + +function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: string) { return storeObject({ - inputPath: path ?? join(getHLSDirectory(playlist.Video), filename), + inputPath: join(getHLSDirectory(playlist.Video), filename), objectStorageKey: generateHLSObjectStorageKey(playlist, filename), bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS }) } +function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) { + return storeObject({ + inputPath: path, + objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)), + bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS + }) +} + +// --------------------------------------------------------------------------- + function storeWebTorrentFile (filename: string) { return storeObject({ inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename), @@ -22,6 +38,8 @@ function storeWebTorrentFile (filename: string) { }) } +// --------------------------------------------------------------------------- + function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) { return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) } @@ -30,10 +48,14 @@ function removeHLSFileObjectStorage (playlist: MStreamingPlaylistVideo, filename return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS) } +// --------------------------------------------------------------------------- + function removeWebTorrentObjectStorage (videoFile: MVideoFile) { return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS) } +// --------------------------------------------------------------------------- + async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) { const key = generateHLSObjectStorageKey(playlist, filename) @@ -62,9 +84,14 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin return destination } +// --------------------------------------------------------------------------- + export { + listHLSFileKeysOf, + storeWebTorrentFile, - storeHLSFile, + storeHLSFileFromFilename, + storeHLSFileFromPath, removeHLSObjectStorage, removeHLSFileObjectStorage, diff --git a/server/models/video/video-streaming-playlist.ts b/server/models/video/video-streaming-playlist.ts index f587989dc..2b6771f27 100644 --- a/server/models/video/video-streaming-playlist.ts +++ b/server/models/video/video-streaming-playlist.ts @@ -245,21 +245,25 @@ export class VideoStreamingPlaylistModel extends Model v.uuid === liveVideoId)).to.exist - - const video = await server.videos.get({ id: liveVideoId }) - - expect(video.streamingPlaylists).to.have.lengthOf(1) - - const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS) - expect(hlsPlaylist).to.exist - - // Only finite files are displayed - expect(hlsPlaylist.files).to.have.lengthOf(0) - - await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions }) - - for (let i = 0; i < resolutions.length; i++) { - const segmentNum = 3 - const segmentName = `${i}-00000${segmentNum}.ts` - await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum }) - - const subPlaylist = await servers[0].streamingPlaylists.get({ - url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8` - }) - - expect(subPlaylist).to.contain(segmentName) - - const baseUrlAndPath = servers[0].url + '/static/streaming-playlists/hls' - await checkLiveSegmentHash({ - server, - baseUrlSegment: baseUrlAndPath, - videoUUID: video.uuid, - segmentName, - hlsPlaylist - }) - } - } - } - function updateConf (resolutions: number[]) { return servers[0].config.updateCustomSubConfig({ newConfig: { @@ -449,7 +409,14 @@ describe('Test live', function () { await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) - await testVideoResolutions(liveVideoId, [ 720 ]) + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId, + resolutions: [ 720 ], + objectStorage: false, + transcoded: true + }) await stopFfmpeg(ffmpegCommand) }) @@ -477,7 +444,14 @@ describe('Test live', function () { await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) - await testVideoResolutions(liveVideoId, resolutions.concat([ 720 ])) + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId, + resolutions: resolutions.concat([ 720 ]), + objectStorage: false, + transcoded: true + }) await stopFfmpeg(ffmpegCommand) }) @@ -522,7 +496,14 @@ describe('Test live', function () { await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) - await testVideoResolutions(liveVideoId, resolutions) + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId, + resolutions, + objectStorage: false, + transcoded: true + }) await stopFfmpeg(ffmpegCommand) await commands[0].waitUntilEnded({ videoId: liveVideoId }) @@ -611,7 +592,14 @@ describe('Test live', function () { await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) - await testVideoResolutions(liveVideoId, resolutions) + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId, + resolutions, + objectStorage: false, + transcoded: true + }) await stopFfmpeg(ffmpegCommand) await commands[0].waitUntilEnded({ videoId: liveVideoId }) @@ -640,7 +628,14 @@ describe('Test live', function () { await waitUntilLivePublishedOnAllServers(servers, liveVideoId) await waitJobs(servers) - await testVideoResolutions(liveVideoId, [ 720 ]) + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId, + resolutions: [ 720 ], + objectStorage: false, + transcoded: true + }) await stopFfmpeg(ffmpegCommand) await commands[0].waitUntilEnded({ videoId: liveVideoId }) diff --git a/server/tests/api/object-storage/live.ts b/server/tests/api/object-storage/live.ts index 0958ffe0f..7e16b4c89 100644 --- a/server/tests/api/object-storage/live.ts +++ b/server/tests/api/object-storage/live.ts @@ -1,9 +1,9 @@ /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ import { expect } from 'chai' -import { expectStartWith } from '@server/tests/shared' +import { expectStartWith, testVideoResolutions } from '@server/tests/shared' import { areObjectStorageTestsDisabled } from '@shared/core-utils' -import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models' +import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models' import { createMultipleServers, doubleFollow, @@ -35,41 +35,43 @@ async function createLive (server: PeerTubeServer, permanent: boolean) { return uuid } -async function checkFiles (files: VideoFile[]) { - for (const file of files) { - expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl()) +async function checkFilesExist (servers: PeerTubeServer[], videoUUID: string, numberOfFiles: number) { + for (const server of servers) { + const video = await server.videos.get({ id: videoUUID }) - await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) - } -} + expect(video.files).to.have.lengthOf(0) + expect(video.streamingPlaylists).to.have.lengthOf(1) -async function getFiles (server: PeerTubeServer, videoUUID: string) { - const video = await server.videos.get({ id: videoUUID }) + const files = video.streamingPlaylists[0].files + expect(files).to.have.lengthOf(numberOfFiles) - expect(video.files).to.have.lengthOf(0) - expect(video.streamingPlaylists).to.have.lengthOf(1) + for (const file of files) { + expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl()) - return video.streamingPlaylists[0].files + await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200) + } + } } -async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) { - const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID }) - await waitUntilLivePublishedOnAllServers(servers, liveUUID) - - const videoLiveDetails = await servers[0].videos.get({ id: liveUUID }) - const liveDetails = await servers[0].live.get({ videoId: liveUUID }) +async function checkFilesCleanup (server: PeerTubeServer, videoUUID: string, resolutions: number[]) { + const resolutionFiles = resolutions.map((_value, i) => `${i}.m3u8`) - await stopFfmpeg(ffmpegCommand) - - if (liveDetails.permanentLive) { - await waitUntilLiveWaitingOnAllServers(servers, liveUUID) - } else { - await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID) + for (const playlistName of [ 'master.m3u8' ].concat(resolutionFiles)) { + await server.live.getPlaylistFile({ + videoUUID, + playlistName, + expectedStatus: HttpStatusCode.NOT_FOUND_404, + objectStorage: true + }) } - await waitJobs(servers) - - return { videoLiveDetails, liveDetails } + await server.live.getSegmentFile({ + videoUUID, + playlistNumber: 0, + segment: 0, + objectStorage: true, + expectedStatus: HttpStatusCode.NOT_FOUND_404 + }) } describe('Object storage for lives', function () { @@ -100,57 +102,124 @@ describe('Object storage for lives', function () { videoUUID = await createLive(servers[0], false) }) - it('Should create a live and save the replay on object storage', async function () { + it('Should create a live and publish it on object storage', async function () { + this.timeout(220000) + + const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID }) + await waitUntilLivePublishedOnAllServers(servers, videoUUID) + + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId: videoUUID, + resolutions: [ 720 ], + transcoded: false, + objectStorage: true + }) + + await stopFfmpeg(ffmpegCommand) + }) + + it('Should have saved the replay on object storage', async function () { this.timeout(220000) - await streamAndEnd(servers, videoUUID) + await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUID) + await waitJobs(servers) - for (const server of servers) { - const files = await getFiles(server, videoUUID) - expect(files).to.have.lengthOf(1) + await checkFilesExist(servers, videoUUID, 1) + }) - await checkFiles(files) - } + it('Should have cleaned up live files from object storage', async function () { + await checkFilesCleanup(servers[0], videoUUID, [ 720 ]) }) }) describe('With live transcoding', async function () { - let videoUUIDPermanent: string - let videoUUIDNonPermanent: string + const resolutions = [ 720, 480, 360, 240, 144 ] before(async function () { await servers[0].config.enableLive({ transcoding: true }) - - videoUUIDPermanent = await createLive(servers[0], true) - videoUUIDNonPermanent = await createLive(servers[0], false) }) - it('Should create a live and save the replay on object storage', async function () { - this.timeout(240000) + describe('Normal replay', function () { + let videoUUIDNonPermanent: string + + before(async function () { + videoUUIDNonPermanent = await createLive(servers[0], false) + }) + + it('Should create a live and publish it on object storage', async function () { + this.timeout(240000) + + const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDNonPermanent }) + await waitUntilLivePublishedOnAllServers(servers, videoUUIDNonPermanent) + + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId: videoUUIDNonPermanent, + resolutions, + transcoded: true, + objectStorage: true + }) + + await stopFfmpeg(ffmpegCommand) + }) - await streamAndEnd(servers, videoUUIDNonPermanent) + it('Should have saved the replay on object storage', async function () { + this.timeout(220000) - for (const server of servers) { - const files = await getFiles(server, videoUUIDNonPermanent) - expect(files).to.have.lengthOf(5) + await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUIDNonPermanent) + await waitJobs(servers) - await checkFiles(files) - } + await checkFilesExist(servers, videoUUIDNonPermanent, 5) + }) + + it('Should have cleaned up live files from object storage', async function () { + await checkFilesCleanup(servers[0], videoUUIDNonPermanent, resolutions) + }) }) - it('Should create a live and save the replay of permanent live on object storage', async function () { - this.timeout(240000) + describe('Permanent replay', function () { + let videoUUIDPermanent: string + + before(async function () { + videoUUIDPermanent = await createLive(servers[0], true) + }) + + it('Should create a live and publish it on object storage', async function () { + this.timeout(240000) + + const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDPermanent }) + await waitUntilLivePublishedOnAllServers(servers, videoUUIDPermanent) + + await testVideoResolutions({ + originServer: servers[0], + servers, + liveVideoId: videoUUIDPermanent, + resolutions, + transcoded: true, + objectStorage: true + }) + + await stopFfmpeg(ffmpegCommand) + }) + + it('Should have saved the replay on object storage', async function () { + this.timeout(220000) - const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent) + await waitUntilLiveWaitingOnAllServers(servers, videoUUIDPermanent) + await waitJobs(servers) - const replay = await findExternalSavedVideo(servers[0], videoLiveDetails) + const videoLiveDetails = await servers[0].videos.get({ id: videoUUIDPermanent }) + const replay = await findExternalSavedVideo(servers[0], videoLiveDetails) - for (const server of servers) { - const files = await getFiles(server, replay.uuid) - expect(files).to.have.lengthOf(5) + await checkFilesExist(servers, replay.uuid, 5) + }) - await checkFiles(files) - } + it('Should have cleaned up live files from object storage', async function () { + await checkFilesCleanup(servers[0], videoUUIDPermanent, resolutions) + }) }) }) diff --git a/server/tests/shared/live.ts b/server/tests/shared/live.ts index 4bd4786fc..aa79622cb 100644 --- a/server/tests/shared/live.ts +++ b/server/tests/shared/live.ts @@ -3,39 +3,92 @@ import { expect } from 'chai' import { pathExists, readdir } from 'fs-extra' import { join } from 'path' -import { LiveVideo } from '@shared/models' -import { PeerTubeServer } from '@shared/server-commands' +import { wait } from '@shared/core-utils' +import { LiveVideo, VideoStreamingPlaylistType } from '@shared/models' +import { ObjectStorageCommand, PeerTubeServer } from '@shared/server-commands' +import { checkLiveSegmentHash, checkResolutionsInMasterPlaylist } from './streaming-playlists' async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) { - let live: LiveVideo - - try { - live = await server.live.get({ videoId: videoUUID }) - } catch {} - const basePath = server.servers.buildDirectory('streaming-playlists') const hlsPath = join(basePath, 'hls', videoUUID) if (savedResolutions.length === 0) { + return checkUnsavedLiveCleanup(server, videoUUID, hlsPath) + } + + return checkSavedLiveCleanup(hlsPath, savedResolutions) +} + +// --------------------------------------------------------------------------- - if (live?.permanentLive) { - expect(await pathExists(hlsPath)).to.be.true +async function testVideoResolutions (options: { + originServer: PeerTubeServer + servers: PeerTubeServer[] + liveVideoId: string + resolutions: number[] + transcoded: boolean + objectStorage: boolean +}) { + const { originServer, servers, liveVideoId, resolutions, transcoded, objectStorage } = options - const hlsFiles = await readdir(hlsPath) - expect(hlsFiles).to.have.lengthOf(1) // Only replays directory + for (const server of servers) { + const { data } = await server.videos.list() + expect(data.find(v => v.uuid === liveVideoId)).to.exist - const replayDir = join(hlsPath, 'replay') - expect(await pathExists(replayDir)).to.be.true + const video = await server.videos.get({ id: liveVideoId }) + expect(video.streamingPlaylists).to.have.lengthOf(1) - const replayFiles = await readdir(join(hlsPath, 'replay')) - expect(replayFiles).to.have.lengthOf(0) - } else { - expect(await pathExists(hlsPath)).to.be.false + const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS) + expect(hlsPlaylist).to.exist + expect(hlsPlaylist.files).to.have.lengthOf(0) // Only fragmented mp4 files are displayed + + await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions, transcoded }) + + if (objectStorage) { + expect(hlsPlaylist.playlistUrl).to.contain(ObjectStorageCommand.getPlaylistBaseUrl()) } - return + for (let i = 0; i < resolutions.length; i++) { + const segmentNum = 3 + const segmentName = `${i}-00000${segmentNum}.ts` + await originServer.live.waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum }) + + const baseUrl = objectStorage + ? ObjectStorageCommand.getPlaylistBaseUrl() + 'hls' + : originServer.url + '/static/streaming-playlists/hls' + + if (objectStorage) { + // Playlist file upload + await wait(500) + + expect(hlsPlaylist.segmentsSha256Url).to.contain(ObjectStorageCommand.getPlaylistBaseUrl()) + } + + const subPlaylist = await originServer.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/${i}.m3u8` }) + + expect(subPlaylist).to.contain(segmentName) + + await checkLiveSegmentHash({ + server, + baseUrlSegment: baseUrl, + videoUUID: video.uuid, + segmentName, + hlsPlaylist + }) + } } +} + +// --------------------------------------------------------------------------- + +export { + checkLiveCleanup, + testVideoResolutions +} +// --------------------------------------------------------------------------- + +async function checkSavedLiveCleanup (hlsPath: string, savedResolutions: number[] = []) { const files = await readdir(hlsPath) // fragmented file and playlist per resolution + master playlist + segments sha256 json file @@ -56,6 +109,27 @@ async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, save expect(shaFile).to.exist } -export { - checkLiveCleanup +async function checkUnsavedLiveCleanup (server: PeerTubeServer, videoUUID: string, hlsPath: string) { + let live: LiveVideo + + try { + live = await server.live.get({ videoId: videoUUID }) + } catch {} + + if (live?.permanentLive) { + expect(await pathExists(hlsPath)).to.be.true + + const hlsFiles = await readdir(hlsPath) + expect(hlsFiles).to.have.lengthOf(1) // Only replays directory + + const replayDir = join(hlsPath, 'replay') + expect(await pathExists(replayDir)).to.be.true + + const replayFiles = await readdir(join(hlsPath, 'replay')) + expect(replayFiles).to.have.lengthOf(0) + + return + } + + expect(await pathExists(hlsPath)).to.be.false } diff --git a/server/tests/shared/streaming-playlists.ts b/server/tests/shared/streaming-playlists.ts index 4d82b3654..eff34944b 100644 --- a/server/tests/shared/streaming-playlists.ts +++ b/server/tests/shared/streaming-playlists.ts @@ -26,7 +26,7 @@ async function checkSegmentHash (options: { const offset = parseInt(matches[2], 10) const range = `${offset}-${offset + length - 1}` - const segmentBody = await command.getSegment({ + const segmentBody = await command.getFragmentedSegment({ url: `${baseUrlSegment}/${videoName}`, expectedStatus: HttpStatusCode.PARTIAL_CONTENT_206, range: `bytes=${range}` @@ -46,7 +46,7 @@ async function checkLiveSegmentHash (options: { const { server, baseUrlSegment, videoUUID, segmentName, hlsPlaylist } = options const command = server.streamingPlaylists - const segmentBody = await command.getSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` }) + const segmentBody = await command.getFragmentedSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` }) const shaBody = await command.getSegmentSha256({ url: hlsPlaylist.segmentsSha256Url }) expect(sha256(segmentBody)).to.equal(shaBody[segmentName]) @@ -56,15 +56,16 @@ async function checkResolutionsInMasterPlaylist (options: { server: PeerTubeServer playlistUrl: string resolutions: number[] + transcoded?: boolean // default true }) { - const { server, playlistUrl, resolutions } = options + const { server, playlistUrl, resolutions, transcoded = true } = options const masterPlaylist = await server.streamingPlaylists.get({ url: playlistUrl }) for (const resolution of resolutions) { - const reg = new RegExp( - '#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"' - ) + const reg = transcoded + ? new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"') + : new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + '') expect(masterPlaylist).to.match(reg) } diff --git a/shared/server-commands/videos/live-command.ts b/shared/server-commands/videos/live-command.ts index d804fd883..defae95fb 100644 --- a/shared/server-commands/videos/live-command.ts +++ b/shared/server-commands/videos/live-command.ts @@ -15,6 +15,7 @@ import { VideoState } from '@shared/models' import { unwrapBody } from '../requests' +import { ObjectStorageCommand } from '../server' import { AbstractCommand, OverrideCommandOptions } from '../shared' import { sendRTMPStream, testFfmpegStreamError } from './live' @@ -34,6 +35,8 @@ export class LiveCommand extends AbstractCommand { }) } + // --------------------------------------------------------------------------- + listSessions (options: OverrideCommandOptions & { videoId: number | string }) { @@ -70,6 +73,8 @@ export class LiveCommand extends AbstractCommand { }) } + // --------------------------------------------------------------------------- + update (options: OverrideCommandOptions & { videoId: number | string fields: LiveVideoUpdate @@ -110,6 +115,8 @@ export class LiveCommand extends AbstractCommand { return body.video } + // --------------------------------------------------------------------------- + async sendRTMPStreamInVideo (options: OverrideCommandOptions & { videoId: number | string fixtureName?: string @@ -130,6 +137,8 @@ export class LiveCommand extends AbstractCommand { return testFfmpegStreamError(command, options.shouldHaveError) } + // --------------------------------------------------------------------------- + waitUntilPublished (options: OverrideCommandOptions & { videoId: number | string }) { @@ -163,15 +172,34 @@ export class LiveCommand extends AbstractCommand { return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false) } - getSegment (options: OverrideCommandOptions & { + async waitUntilReplacedByReplay (options: OverrideCommandOptions & { + videoId: number | string + }) { + let video: VideoDetails + + do { + video = await this.server.videos.getWithToken({ token: options.token, id: options.videoId }) + + await wait(500) + } while (video.isLive === true || video.state.id !== VideoState.PUBLISHED) + } + + // --------------------------------------------------------------------------- + + getSegmentFile (options: OverrideCommandOptions & { videoUUID: string playlistNumber: number segment: number + objectStorage?: boolean // default false }) { - const { playlistNumber, segment, videoUUID } = options + const { playlistNumber, segment, videoUUID, objectStorage = false } = options const segmentName = `${playlistNumber}-00000${segment}.ts` - const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}` + const baseUrl = objectStorage + ? ObjectStorageCommand.getPlaylistBaseUrl() + : `${this.server.url}/static/streaming-playlists/hls` + + const url = `${baseUrl}/${videoUUID}/${segmentName}` return this.getRawRequest({ ...options, @@ -182,18 +210,30 @@ export class LiveCommand extends AbstractCommand { }) } - async waitUntilReplacedByReplay (options: OverrideCommandOptions & { - videoId: number | string + getPlaylistFile (options: OverrideCommandOptions & { + videoUUID: string + playlistName: string + objectStorage?: boolean // default false }) { - let video: VideoDetails + const { playlistName, videoUUID, objectStorage = false } = options - do { - video = await this.server.videos.getWithToken({ token: options.token, id: options.videoId }) + const baseUrl = objectStorage + ? ObjectStorageCommand.getPlaylistBaseUrl() + : `${this.server.url}/static/streaming-playlists/hls` - await wait(500) - } while (video.isLive === true || video.state.id !== VideoState.PUBLISHED) + const url = `${baseUrl}/${videoUUID}/${playlistName}` + + return this.getRawRequest({ + ...options, + + url, + implicitToken: false, + defaultExpectedStatus: HttpStatusCode.OK_200 + }) } + // --------------------------------------------------------------------------- + async countPlaylists (options: OverrideCommandOptions & { videoUUID: string }) { diff --git a/shared/server-commands/videos/streaming-playlists-command.ts b/shared/server-commands/videos/streaming-playlists-command.ts index 5d40d35cb..7f923d001 100644 --- a/shared/server-commands/videos/streaming-playlists-command.ts +++ b/shared/server-commands/videos/streaming-playlists-command.ts @@ -16,7 +16,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand { })) } - getSegment (options: OverrideCommandOptions & { + getFragmentedSegment (options: OverrideCommandOptions & { url: string range?: string }) { diff --git a/support/doc/api/openapi.yaml b/support/doc/api/openapi.yaml index 5077f8d90..c62310b76 100644 --- a/support/doc/api/openapi.yaml +++ b/support/doc/api/openapi.yaml @@ -145,7 +145,6 @@ info: | `/api/*` | | `/download/*` | | `/lazy-static/*` | - | `/live/segments-sha256/*` | | `/.well-known/webfinger` | In addition, all routes serving ActivityPub are CORS-enabled for all origins. -- 2.41.0