import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
+import PQueue from 'p-queue'
import { basename, join } from 'path'
import { EventEmitter } from 'stream'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
import { VideoStorage } from '@shared/models'
import { buildConcatenatedName } from '../live-utils'
import memoizee = require('memoizee')
-
interface MuxingSessionEvents {
'live-ready': (options: { videoId: number }) => void
private watchM3U8File () {
this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
+ const sendQueues = new Map<string, PQueue>()
+
const onChangeOrAdd = async (m3u8Path: string) => {
if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
try {
- await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
+ if (!sendQueues.has(m3u8Path)) {
+ sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+ }
+
+ const queue = sendQueues.get(m3u8Path)
+ await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
} catch (err) {
logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
}
if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
try {
- await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
+ await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
} catch (err) {
logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
}