aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-21 14:42:31 +0200
committerChocobozzz <chocobozzz@cpy.re>2022-10-24 14:48:24 +0200
commitb3ce36069f90dcc0a0fab41d8c77f8722774544d (patch)
treef212ddb627e3cf1cfaab5dca040e769814759892
parent0177101284509ad68c5e484bce0474f57069c006 (diff)
downloadPeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.tar.gz
PeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.tar.zst
PeerTube-b3ce36069f90dcc0a0fab41d8c77f8722774544d.zip
Prevent concurrency issues when sending m3u8 file
-rw-r--r--server/lib/live/shared/muxing-session.ts11
-rw-r--r--server/tests/feeds/feeds.ts2
2 files changed, 10 insertions, 3 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 4c27d5dd8..64add2611 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -3,6 +3,7 @@ import { mapSeries } from 'bluebird'
3import { FSWatcher, watch } from 'chokidar' 3import { FSWatcher, watch } from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 4import { FfmpegCommand } from 'fluent-ffmpeg'
5import { appendFile, ensureDir, readFile, stat } from 'fs-extra' 5import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
6import PQueue from 'p-queue'
6import { basename, join } from 'path' 7import { basename, join } from 'path'
7import { EventEmitter } from 'stream' 8import { EventEmitter } from 'stream'
8import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg' 9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
@@ -21,7 +22,6 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store'
21import { buildConcatenatedName } from '../live-utils' 22import { buildConcatenatedName } from '../live-utils'
22 23
23import memoizee = require('memoizee') 24import memoizee = require('memoizee')
24
25interface MuxingSessionEvents { 25interface MuxingSessionEvents {
26 'live-ready': (options: { videoId: number }) => void 26 'live-ready': (options: { videoId: number }) => void
27 27
@@ -278,11 +278,18 @@ class MuxingSession extends EventEmitter {
278 private watchM3U8File () { 278 private watchM3U8File () {
279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') 279 this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
280 280
281 const sendQueues = new Map<string, PQueue>()
282
281 const onChangeOrAdd = async (m3u8Path: string) => { 283 const onChangeOrAdd = async (m3u8Path: string) => {
282 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return 284 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
283 285
284 try { 286 try {
285 await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path) 287 if (!sendQueues.has(m3u8Path)) {
288 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
289 }
290
291 const queue = sendQueues.get(m3u8Path)
292 await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
286 } catch (err) { 293 } catch (err) {
287 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) 294 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
288 } 295 }
diff --git a/server/tests/feeds/feeds.ts b/server/tests/feeds/feeds.ts
index c49175d5e..906dab1a3 100644
--- a/server/tests/feeds/feeds.ts
+++ b/server/tests/feeds/feeds.ts
@@ -314,7 +314,7 @@ describe('Test syndication feeds', () => {
314 const jsonObj = JSON.parse(json) 314 const jsonObj = JSON.parse(json)
315 const imageUrl = jsonObj.icon 315 const imageUrl = jsonObj.icon
316 expect(imageUrl).to.include('/lazy-static/avatars/') 316 expect(imageUrl).to.include('/lazy-static/avatars/')
317 await makeRawRequest({ url: imageUrl }) 317 await makeRawRequest({ url: imageUrl, expectedStatus: HttpStatusCode.OK_200 })
318 }) 318 })
319 }) 319 })
320 320