import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
+import PQueue from 'p-queue'
import { basename, join } from 'path'
import { EventEmitter } from 'stream'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
+import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
+import { VideoStorage } from '@shared/models'
import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
import { buildConcatenatedName } from '../live-utils'
import memoizee = require('memoizee')
-
interface MuxingSessionEvents {
- 'master-playlist-created': (options: { videoId: number }) => void
+ 'live-ready': (options: { videoId: number }) => void
'bad-socket-health': (options: { videoId: number }) => void
'duration-exceeded': (options: { videoId: number }) => void
private readonly outDirectory: string
private readonly replayDirectory: string
+ private readonly liveSegmentShaStore: LiveSegmentShaStore
+
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private tsWatcher: FSWatcher
private masterWatcher: FSWatcher
+ private m3u8Watcher: FSWatcher
+
+ private masterPlaylistCreated = false
+ private liveReady = false
private aborted = false
this.outDirectory = getLiveDirectory(this.videoLive.Video)
this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
+ this.liveSegmentShaStore = new LiveSegmentShaStore({
+ videoUUID: this.videoLive.Video.uuid,
+ sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
+ streamingPlaylist: this.streamingPlaylist,
+ sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
+ })
+
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
- this.watchTSFiles()
this.watchMasterFile()
+ this.watchTSFiles()
+ this.watchM3U8File()
let ffmpegShellCommand: string
this.ffmpegCommand.on('start', cmdline => {
setTimeout(() => {
// Wait latest segments generation, and close watchers
- Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
+ Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
.then(() => {
// Process remaining segments hash
for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
private watchMasterFile () {
this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
- this.masterWatcher.on('add', () => {
- this.emit('master-playlist-created', { videoId: this.videoId })
+ this.masterWatcher.on('add', async () => {
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
+
+ this.streamingPlaylist.playlistUrl = url
+ await this.streamingPlaylist.save()
+ } catch (err) {
+ logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
+ }
+ }
+
+ this.masterPlaylistCreated = true
this.masterWatcher.close()
.catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
})
}
+ private watchM3U8File () {
+ this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
+
+ const sendQueues = new Map<string, PQueue>()
+
+ const onChangeOrAdd = async (m3u8Path: string) => {
+ if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
+
+ try {
+ if (!sendQueues.has(m3u8Path)) {
+ sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+ }
+
+ const queue = sendQueues.get(m3u8Path)
+ await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
+ } catch (err) {
+ logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+ }
+ }
+
+ this.m3u8Watcher.on('change', onChangeOrAdd)
+ }
+
private watchTSFiles () {
const startStreamDateTime = new Date().getTime()
}
}
- const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
+ const deleteHandler = async (segmentPath: string) => {
+ try {
+ await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
+ } catch (err) {
+ logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
+ }
+
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
+ } catch (err) {
+ logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
+ }
+ }
+ }
this.tsWatcher.on('add', p => addHandler(p))
this.tsWatcher.on('unlink', p => deleteHandler(p))
extname: '.ts',
infoHash: null,
fps: this.fps,
+ storage: this.streamingPlaylist.storage,
videoStreamingPlaylistId: this.streamingPlaylist.id
})
}
private processSegments (segmentPaths: string[]) {
- mapSeries(segmentPaths, async previousSegment => {
- // Add sha hash of previous segments, because ffmpeg should have finished generating them
- await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)
+ mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
+ .catch(err => {
+ if (this.aborted) return
+
+ logger.error('Cannot process segments', { err, ...this.lTags() })
+ })
+ }
+
+ private async processSegment (segmentPath: string) {
+ // Add sha hash of previous segments, because ffmpeg should have finished generating them
+ await this.liveSegmentShaStore.addSegmentSha(segmentPath)
+
+ if (this.saveReplay) {
+ await this.addSegmentToReplay(segmentPath)
+ }
- if (this.saveReplay) {
- await this.addSegmentToReplay(previousSegment)
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+ } catch (err) {
+ logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
- }).catch(err => {
- if (this.aborted) return
+ }
- logger.error('Cannot process segments', { err, ...this.lTags() })
- })
+ // Master playlist and segment JSON file are created, live is ready
+ if (this.masterPlaylistCreated && !this.liveReady) {
+ this.liveReady = true
+
+ this.emit('live-ready', { videoId: this.videoId })
+ }
}
private hasClientSocketInBadHealth (sessionId: string) {