From 34023e12534f22f28d0b5afc1db3fdf2fd1e7e60 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 10 May 2023 11:16:05 +0200 Subject: Fix S3 live sync Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3 --- server/lib/live/shared/muxing-session.ts | 74 +++++++++++++++++++------------- 1 file changed, 45 insertions(+), 29 deletions(-) (limited to 'server/lib/live/shared') diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index ef4ecb83e..57e28aabf 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' -import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' +import { + removeHLSFileObjectStorageByPath, + storeHLSFileFromContent, + storeHLSFileFromFilename, + storeHLSFileFromPath +} from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' @@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter { private readonly lTags: LoggerTagsFn + // Path -> Queue + private readonly objectStorageSendQueues = new Map() + private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} private streamingPlaylist: MStreamingPlaylistVideo @@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter { this.watchMasterFile() this.watchTSFiles() - this.watchM3U8File() } abort () { @@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter { }) } - private watchM3U8File () { - const sendQueues = new Map() - - const onChange = async (m3u8Path: string) => { - if (m3u8Path.endsWith('.m3u8') !== true) return - if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return - - logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) - - try { - if (!sendQueues.has(m3u8Path)) { - sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) - } - - const queue = sendQueues.get(m3u8Path) - await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) - } catch (err) { - logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) - } - } - - this.filesWatcher.on('change', onChange) - } - private watchTSFiles () { const startStreamDateTime = new Date().getTime() - const playlistIdMatcher = /^([\d+])-/ - const addHandler = async (segmentPath: string) => { if (segmentPath.endsWith('.ts') !== true) return logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) - const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] + const playlistId = this.getPlaylistIdFromTS(segmentPath) const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] this.processSegments(segmentsToProcess) @@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter { if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { try { await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) + + await this.processM3U8ToObjectStorage(segmentPath) } catch (err) { logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) } @@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter { } } + private async processM3U8ToObjectStorage (segmentPath: string) { + const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath)) + + logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags()) + + const segmentName = basename(segmentPath) + + const playlistContent = await readFile(m3u8Path, 'utf-8') + // Remove new chunk references, that will be processed later + const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n' + + try { + if (!this.objectStorageSendQueues.has(m3u8Path)) { + this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) + } + + const queue = this.objectStorageSendQueues.get(m3u8Path) + await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent)) + } catch (err) { + logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) + } + } + private onTranscodingError () { this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) } @@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter { ? new RemoteTranscodingWrapper(options) : new FFmpegTranscodingWrapper(options) } + + private getPlaylistIdFromTS (segmentPath: string) { + const playlistIdMatcher = /^([\d+])-/ + + return basename(segmentPath).match(playlistIdMatcher)[1] + } + + private getPlaylistNameFromTS (segmentPath: string) { + return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8` + } } // --------------------------------------------------------------------------- -- cgit v1.2.3