diff options
author | Chocobozzz <me@florianbigard.com> | 2022-10-21 14:42:31 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2022-10-24 14:48:24 +0200 |
commit | b3ce36069f90dcc0a0fab41d8c77f8722774544d (patch) | |
tree | f212ddb627e3cf1cfaab5dca040e769814759892 /server/lib/live/shared | |
parent | 0177101284509ad68c5e484bce0474f57069c006 (diff) | |
download | PeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.tar.gz PeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.tar.zst PeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.zip |
Prevent concurrency issues when sending m3u8 file
Diffstat (limited to 'server/lib/live/shared')
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 4c27d5dd8..64add2611 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -3,6 +3,7 @@ import { mapSeries } from 'bluebird' | |||
3 | import { FSWatcher, watch } from 'chokidar' | 3 | import { FSWatcher, watch } from 'chokidar' |
4 | import { FfmpegCommand } from 'fluent-ffmpeg' | 4 | import { FfmpegCommand } from 'fluent-ffmpeg' |
5 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' | 5 | import { appendFile, ensureDir, readFile, stat } from 'fs-extra' |
6 | import PQueue from 'p-queue' | ||
6 | import { basename, join } from 'path' | 7 | import { basename, join } from 'path' |
7 | import { EventEmitter } from 'stream' | 8 | import { EventEmitter } from 'stream' |
8 | import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' | 9 | import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' |
@@ -21,7 +22,6 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store' | |||
21 | import { buildConcatenatedName } from '../live-utils' | 22 | import { buildConcatenatedName } from '../live-utils' |
22 | 23 | ||
23 | import memoizee = require('memoizee') | 24 | import memoizee = require('memoizee') |
24 | |||
25 | interface MuxingSessionEvents { | 25 | interface MuxingSessionEvents { |
26 | 'live-ready': (options: { videoId: number }) => void | 26 | 'live-ready': (options: { videoId: number }) => void |
27 | 27 | ||
@@ -278,11 +278,18 @@ class MuxingSession extends EventEmitter { | |||
278 | private watchM3U8File () { | 278 | private watchM3U8File () { |
279 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') | 279 | this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') |
280 | 280 | ||
281 | const sendQueues = new Map<string, PQueue>() | ||
282 | |||
281 | const onChangeOrAdd = async (m3u8Path: string) => { | 283 | const onChangeOrAdd = async (m3u8Path: string) => { |
282 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | 284 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return |
283 | 285 | ||
284 | try { | 286 | try { |
285 | await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) | 287 | if (!sendQueues.has(m3u8Path)) { |
288 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
289 | } | ||
290 | |||
291 | const queue = sendQueues.get(m3u8Path) | ||
292 | await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) | ||
286 | } catch (err) { | 293 | } catch (err) { |
287 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | 294 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) |
288 | } | 295 | } |