]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/live/shared/muxing-session.ts
Merge branch 'release/4.3.0' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / live / shared / muxing-session.ts
index 4c27d5dd885418299675e3645038a6783f2a9162..6ec1269556a6cfacc7d4d1deab37373cfa3660d6 100644 (file)
@@ -3,13 +3,14 @@ import { mapSeries } from 'bluebird'
 import { FSWatcher, watch } from 'chokidar'
 import { FfmpegCommand } from 'fluent-ffmpeg'
 import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
+import PQueue from 'p-queue'
 import { basename, join } from 'path'
 import { EventEmitter } from 'stream'
 import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
 import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
 import { VideoStorage } from '@shared/models'
@@ -21,7 +22,6 @@ import { LiveSegmentShaStore } from '../live-segment-sha-store'
 import { buildConcatenatedName } from '../live-utils'
 
 import memoizee = require('memoizee')
-
 interface MuxingSessionEvents {
   'live-ready': (options: { videoId: number }) => void
 
@@ -278,11 +278,18 @@ class MuxingSession extends EventEmitter {
   private watchM3U8File () {
     this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
 
+    const sendQueues = new Map<string, PQueue>()
+
     const onChangeOrAdd = async (m3u8Path: string) => {
       if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
 
       try {
-        await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
+        if (!sendQueues.has(m3u8Path)) {
+          sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+        }
+
+        const queue = sendQueues.get(m3u8Path)
+        await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
       } catch (err) {
         logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
       }
@@ -334,7 +341,7 @@ class MuxingSession extends EventEmitter {
 
       if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
         try {
-          await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
+          await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
         } catch (err) {
           logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
         }