import { FSWatcher, watch } from 'chokidar'
import { FfmpegCommand } from 'fluent-ffmpeg'
import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
+import PQueue from 'p-queue'
import { basename, join } from 'path'
import { EventEmitter } from 'stream'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
import { CONFIG } from '@server/initializers/config'
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
+import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
-import { getLiveDirectory } from '../../paths'
+import { VideoStorage } from '@shared/models'
+import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
import { LiveQuotaStore } from '../live-quota-store'
import { buildConcatenatedName } from '../live-utils'
import memoizee = require('memoizee')
-
interface MuxingSessionEvents {
- 'master-playlist-created': ({ videoId: number }) => void
+ 'live-ready': (options: { videoId: number }) => void
- 'bad-socket-health': ({ videoId: number }) => void
- 'duration-exceeded': ({ videoId: number }) => void
- 'quota-exceeded': ({ videoId: number }) => void
+ 'bad-socket-health': (options: { videoId: number }) => void
+ 'duration-exceeded': (options: { videoId: number }) => void
+ 'quota-exceeded': (options: { videoId: number }) => void
- 'ffmpeg-end': ({ videoId: number }) => void
- 'ffmpeg-error': ({ sessionId: string }) => void
+ 'ffmpeg-end': (options: { videoId: number }) => void
+ 'ffmpeg-error': (options: { videoId: number }) => void
- 'after-cleanup': ({ videoId: number }) => void
+ 'after-cleanup': (options: { videoId: number }) => void
}
declare interface MuxingSession {
private readonly bitrate: number
private readonly ratio: number
+ private readonly hasAudio: boolean
+
private readonly videoId: number
private readonly videoUUID: string
private readonly saveReplay: boolean
+ private readonly outDirectory: string
+ private readonly replayDirectory: string
+
+ private readonly liveSegmentShaStore: LiveSegmentShaStore
+
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
private tsWatcher: FSWatcher
private masterWatcher: FSWatcher
+ private m3u8Watcher: FSWatcher
+
+ private masterPlaylistCreated = false
+ private liveReady = false
+
+ private aborted = false
private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
return isAbleToUploadVideo(userId, 1000)
bitrate: number
ratio: number
allResolutions: number[]
+ hasAudio: boolean
}) {
super()
this.bitrate = options.bitrate
this.ratio = options.ratio
+ this.hasAudio = options.hasAudio
+
this.allResolutions = options.allResolutions
this.videoId = this.videoLive.Video.id
this.saveReplay = this.videoLive.saveReplay
+ this.outDirectory = getLiveDirectory(this.videoLive.Video)
+ this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
+
+ this.liveSegmentShaStore = new LiveSegmentShaStore({
+ videoUUID: this.videoLive.Video.uuid,
+ sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
+ streamingPlaylist: this.streamingPlaylist,
+ sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
+ })
+
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
async runMuxing () {
this.createFiles()
- const outPath = await this.prepareDirectories()
+ await this.prepareDirectories()
this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
? await getLiveTranscodingCommand({
inputUrl: this.inputUrl,
- outPath,
+ outPath: this.outDirectory,
masterPlaylistName: this.streamingPlaylist.playlistFilename,
+ latencyMode: this.videoLive.latencyMode,
+
resolutions: this.allResolutions,
fps: this.fps,
bitrate: this.bitrate,
ratio: this.ratio,
+ hasAudio: this.hasAudio,
+
availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
profile: CONFIG.LIVE.TRANSCODING.PROFILE
})
- : getLiveMuxingCommand(this.inputUrl, outPath, this.streamingPlaylist.playlistFilename)
+ : getLiveMuxingCommand({
+ inputUrl: this.inputUrl,
+ outPath: this.outDirectory,
+ masterPlaylistName: this.streamingPlaylist.playlistFilename,
+ latencyMode: this.videoLive.latencyMode
+ })
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
- this.watchTSFiles(outPath)
- this.watchMasterFile(outPath)
+ this.watchMasterFile()
+ this.watchTSFiles()
+ this.watchM3U8File()
let ffmpegShellCommand: string
this.ffmpegCommand.on('start', cmdline => {
})
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
- this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand })
+ this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand })
})
- this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath))
+ this.ffmpegCommand.on('end', () => {
+ this.emit('ffmpeg-end', ({ videoId: this.videoId }))
+
+ this.onFFmpegEnded()
+ })
this.ffmpegCommand.run()
}
abort () {
if (!this.ffmpegCommand) return
+ this.aborted = true
this.ffmpegCommand.kill('SIGINT')
}
err: any
stdout: string
stderr: string
- outPath: string
ffmpegShellCommand: string
}) {
- const { err, stdout, stderr, outPath, ffmpegShellCommand } = options
+ const { err, stdout, stderr, ffmpegShellCommand } = options
- this.onFFmpegEnded(outPath)
+ this.onFFmpegEnded()
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
- this.emit('ffmpeg-error', ({ sessionId: this.sessionId }))
+ this.emit('ffmpeg-error', ({ videoId: this.videoId }))
}
- private onFFmpegEnded (outPath: string) {
+ private onFFmpegEnded () {
logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
setTimeout(() => {
// Wait latest segments generation, and close watchers
- Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
+ Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
.then(() => {
// Process remaining segments hash
for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
- this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key])
+ this.processSegments(this.segmentsToProcessPerPlaylist[key])
}
})
.catch(err => {
logger.error(
- 'Cannot close watchers of %s or process remaining hash segments.', outPath,
+ 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
{ err, ...this.lTags() }
)
})
}, 1000)
}
- private watchMasterFile (outPath: string) {
- this.masterWatcher = watch(outPath + '/' + this.streamingPlaylist.playlistFilename)
+ private watchMasterFile () {
+ this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
- this.masterWatcher.on('add', () => {
- this.emit('master-playlist-created', { videoId: this.videoId })
+ this.masterWatcher.on('add', async () => {
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
+
+ this.streamingPlaylist.playlistUrl = url
+ await this.streamingPlaylist.save()
+ } catch (err) {
+ logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
+ }
+ }
+
+ this.masterPlaylistCreated = true
this.masterWatcher.close()
- .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags() }))
+ .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
})
}
- private watchTSFiles (outPath: string) {
+ private watchM3U8File () {
+ this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
+
+ const sendQueues = new Map<string, PQueue>()
+
+ const onChangeOrAdd = async (m3u8Path: string) => {
+ if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
+
+ try {
+ if (!sendQueues.has(m3u8Path)) {
+ sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+ }
+
+ const queue = sendQueues.get(m3u8Path)
+ await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
+ } catch (err) {
+ logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+ }
+ }
+
+ this.m3u8Watcher.on('change', onChangeOrAdd)
+ }
+
+ private watchTSFiles () {
const startStreamDateTime = new Date().getTime()
- this.tsWatcher = watch(outPath + '/*.ts')
+ this.tsWatcher = watch(this.outDirectory + '/*.ts')
const playlistIdMatcher = /^([\d+])-/
const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
- this.processSegments(outPath, segmentsToProcess)
+ this.processSegments(segmentsToProcess)
this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
}
}
- const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
+ const deleteHandler = async (segmentPath: string) => {
+ try {
+ await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
+ } catch (err) {
+ logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
+ }
+
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
+ } catch (err) {
+ logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
+ }
+ }
+ }
this.tsWatcher.on('add', p => addHandler(p))
this.tsWatcher.on('unlink', p => deleteHandler(p))
private async isQuotaExceeded (segmentPath: string) {
if (this.saveReplay !== true) return false
+ if (this.aborted) return false
try {
const segmentStat = await stat(segmentPath)
extname: '.ts',
infoHash: null,
fps: this.fps,
+ storage: this.streamingPlaylist.storage,
videoStreamingPlaylistId: this.streamingPlaylist.id
})
}
private async prepareDirectories () {
- const outPath = getLiveDirectory(this.videoLive.Video)
- await ensureDir(outPath)
-
- const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
+ await ensureDir(this.outDirectory)
if (this.videoLive.saveReplay === true) {
- await ensureDir(replayDirectory)
+ await ensureDir(this.replayDirectory)
}
-
- return outPath
}
private isDurationConstraintValid (streamingStartTime: number) {
return now <= max
}
- private processSegments (hlsVideoPath: string, segmentPaths: string[]) {
- mapSeries(segmentPaths, async previousSegment => {
- // Add sha hash of previous segments, because ffmpeg should have finished generating them
- await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)
+ private processSegments (segmentPaths: string[]) {
+ mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
+ .catch(err => {
+ if (this.aborted) return
+
+ logger.error('Cannot process segments', { err, ...this.lTags() })
+ })
+ }
+
+ private async processSegment (segmentPath: string) {
+ // Add sha hash of previous segments, because ffmpeg should have finished generating them
+ await this.liveSegmentShaStore.addSegmentSha(segmentPath)
- if (this.saveReplay) {
- await this.addSegmentToReplay(hlsVideoPath, previousSegment)
+ if (this.saveReplay) {
+ await this.addSegmentToReplay(segmentPath)
+ }
+
+ if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+ try {
+ await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+ } catch (err) {
+ logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
}
- }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags() }))
+ }
+
+ // Master playlist and segment JSON file are created, live is ready
+ if (this.masterPlaylistCreated && !this.liveReady) {
+ this.liveReady = true
+
+ this.emit('live-ready', { videoId: this.videoId })
+ }
}
private hasClientSocketInBadHealth (sessionId: string) {
return false
}
- private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
+ private async addSegmentToReplay (segmentPath: string) {
const segmentName = basename(segmentPath)
- const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName))
+ const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
try {
const data = await readFile(segmentPath)