From def4ea4f3824689abd4ea66f5ab6e47d553b7ddd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 19 May 2023 13:46:40 +0200 Subject: More robust chunk handler --- .../server/process/shared/process-live.ts | 66 +++++++++++++++------- 1 file changed, 46 insertions(+), 20 deletions(-) (limited to 'packages/peertube-runner/server/process') diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index b17b51c7c..df1b677f0 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts @@ -21,6 +21,9 @@ export class ProcessLiveRTMPHLSTranscoding { private readonly outputPath: string private readonly fsWatchers: FSWatcher[] = [] + // Playlist name -> chunks + private readonly pendingChunksPerPlaylist = new Map() + private readonly playlistsCreated = new Set() private allPlaylistsCreated = false @@ -68,9 +71,19 @@ export class ProcessLiveRTMPHLSTranscoding { } }) - tsWatcher.on('add', p => { - this.sendAddedChunkUpdate(p) - .catch(err => this.onUpdateError(err, rej)) + tsWatcher.on('add', async p => { + try { + await this.sendPendingChunks() + } catch (err) { + this.onUpdateError(err, rej) + } + + const playlistName = this.getPlaylistIdFromTS(p) + + const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] + pendingChunks.push(p) + + this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) }) tsWatcher.on('unlink', p => { @@ -230,31 +243,38 @@ export class ProcessLiveRTMPHLSTranscoding { return this.updateWithRetry(payload) } - private sendAddedChunkUpdate (addedChunk: string): Promise { + private async sendPendingChunks (): Promise { if (this.ended) return Promise.resolve() - logger.debug(`Sending added live chunk ${addedChunk} update`) + for (const playlist of this.pendingChunksPerPlaylist.keys()) { + for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { + logger.debug(`Sending added live chunk ${chunk} update`) - const videoChunkFilename = basename(addedChunk) + const videoChunkFilename = basename(chunk) - let payload: LiveRTMPHLSTranscodingUpdatePayload = { - type: 'add-chunk', - videoChunkFilename, - videoChunkFile: addedChunk - } + let payload: LiveRTMPHLSTranscodingUpdatePayload = { + type: 'add-chunk', + videoChunkFilename, + videoChunkFile: chunk + } - if (this.allPlaylistsCreated) { - const playlistName = this.getPlaylistName(videoChunkFilename) + if (this.allPlaylistsCreated) { + const playlistName = this.getPlaylistName(videoChunkFilename) - payload = { - ...payload, - masterPlaylistFile: join(this.outputPath, 'master.m3u8'), - resolutionPlaylistFilename: playlistName, - resolutionPlaylistFile: join(this.outputPath, playlistName) + payload = { + ...payload, + masterPlaylistFile: join(this.outputPath, 'master.m3u8'), + resolutionPlaylistFilename: playlistName, + resolutionPlaylistFile: join(this.outputPath, playlistName) + } + } + + this.updateWithRetry(payload) + .catch(err => logger.error({ err }, 'Cannot update with retry')) } - } - return this.updateWithRetry(payload) + this.pendingChunksPerPlaylist.set(playlist, []) + } } private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise { @@ -281,6 +301,12 @@ export class ProcessLiveRTMPHLSTranscoding { return `${videoChunkFilename.split('-')[0]}.m3u8` } + private getPlaylistIdFromTS (segmentPath: string) { + const playlistIdMatcher = /^([\d+])-/ + + return basename(segmentPath).match(playlistIdMatcher)[1] + } + // --------------------------------------------------------------------------- private cleanup () { -- cgit v1.2.3