From cfd57d2ca0bb058087f7dc90fcc3e8442b0288e1 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 4 Oct 2022 10:03:17 +0200 Subject: Live supports object storage * Sync live files (segments, master playlist, resolution playlist, segment sha file) into object storage * Automatically delete them when the live ends * Segment sha file is now a file on disk, and not stored in memory anymore --- server/lib/live/shared/muxing-session.ts | 106 ++++++++++++++++++++++++++----- 1 file changed, 91 insertions(+), 15 deletions(-) (limited to 'server/lib/live/shared') diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 505717dce..4c27d5dd8 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' +import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' +import { VideoStorage } from '@shared/models' import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' @@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils' import memoizee = require('memoizee') interface MuxingSessionEvents { - 'master-playlist-created': (options: { videoId: number }) => void + 'live-ready': (options: { videoId: number }) => void 'bad-socket-health': (options: { videoId: number }) => void 'duration-exceeded': (options: { videoId: number }) => void @@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter { private readonly outDirectory: string private readonly replayDirectory: string + private readonly liveSegmentShaStore: LiveSegmentShaStore + private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} private tsWatcher: FSWatcher private masterWatcher: FSWatcher + private m3u8Watcher: FSWatcher + + private masterPlaylistCreated = false + private liveReady = false private aborted = false @@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter { this.outDirectory = getLiveDirectory(this.videoLive.Video) this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) + this.liveSegmentShaStore = new LiveSegmentShaStore({ + videoUUID: this.videoLive.Video.uuid, + sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), + streamingPlaylist: this.streamingPlaylist, + sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED + }) + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } @@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter { logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) - this.watchTSFiles() this.watchMasterFile() + this.watchTSFiles() + this.watchM3U8File() let ffmpegShellCommand: string this.ffmpegCommand.on('start', cmdline => { @@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter { setTimeout(() => { // Wait latest segments generation, and close watchers - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) .then(() => { // Process remaining segments hash for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { @@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter { private watchMasterFile () { this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) - this.masterWatcher.on('add', () => { - this.emit('master-playlist-created', { videoId: this.videoId }) + this.masterWatcher.on('add', async () => { + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) + + this.streamingPlaylist.playlistUrl = url + await this.streamingPlaylist.save() + } catch (err) { + logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() }) + } + } + + this.masterPlaylistCreated = true this.masterWatcher.close() .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) }) } + private watchM3U8File () { + this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') + + const onChangeOrAdd = async (m3u8Path: string) => { + if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + + try { + await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) + } catch (err) { + logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) + } + } + + this.m3u8Watcher.on('change', onChangeOrAdd) + } + private watchTSFiles () { const startStreamDateTime = new Date().getTime() @@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter { } } - const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) + const deleteHandler = async (segmentPath: string) => { + try { + await this.liveSegmentShaStore.removeSegmentSha(segmentPath) + } catch (err) { + logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) + } + } + } this.tsWatcher.on('add', p => addHandler(p)) this.tsWatcher.on('unlink', p => deleteHandler(p)) @@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter { extname: '.ts', infoHash: null, fps: this.fps, + storage: this.streamingPlaylist.storage, videoStreamingPlaylistId: this.streamingPlaylist.id }) @@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter { } private processSegments (segmentPaths: string[]) { - mapSeries(segmentPaths, async previousSegment => { - // Add sha hash of previous segments, because ffmpeg should have finished generating them - await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) + mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) + .catch(err => { + if (this.aborted) return + + logger.error('Cannot process segments', { err, ...this.lTags() }) + }) + } - if (this.saveReplay) { - await this.addSegmentToReplay(previousSegment) + private async processSegment (segmentPath: string) { + // Add sha hash of previous segments, because ffmpeg should have finished generating them + await this.liveSegmentShaStore.addSegmentSha(segmentPath) + + if (this.saveReplay) { + await this.addSegmentToReplay(segmentPath) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) } - }).catch(err => { - if (this.aborted) return + } - logger.error('Cannot process segments', { err, ...this.lTags() }) - }) + // Master playlist and segment JSON file are created, live is ready + if (this.masterPlaylistCreated && !this.liveReady) { + this.liveReady = true + + this.emit('live-ready', { videoId: this.videoId }) + } } private hasClientSocketInBadHealth (sessionId: string) { -- cgit v1.2.3