From b3ce36069f90dcc0a0fab41d8c77f8722774544d Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 21 Oct 2022 14:42:31 +0200 Subject: Prevent concurrency issues when sending m3u8 file --- server/lib/live/shared/muxing-session.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'server/lib') diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 4c27d5dd8..64add2611 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -3,6 +3,7 @@ import { mapSeries } from 'bluebird' import { FSWatcher, watch } from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' +import PQueue from 'p-queue' import { basename, join } from 'path' import { EventEmitter } from 'stream' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' @@ -21,7 +22,6 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store' import { buildConcatenatedName } from '../live-utils' import memoizee = require('memoizee') - interface MuxingSessionEvents { 'live-ready': (options: { videoId: number }) => void @@ -278,11 +278,18 @@ class MuxingSession extends EventEmitter { private watchM3U8File () { this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') + const sendQueues = new Map() + const onChangeOrAdd = async (m3u8Path: string) => { if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return try { - await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) + if (!sendQueues.has(m3u8Path)) { + sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) + } + + const queue = sendQueues.get(m3u8Path) + await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) } catch (err) { logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) } -- cgit v1.2.3