X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Fshared%2Fmuxing-session.ts;h=f3f8fc8863fc9c65ef1f8ed8c6f959159aef9223;hb=0c9668f77901e7540e2c7045eb0f2974a4842a69;hp=62708b14b504dde626cc0c2d7130358265a234db;hpb=c826f34a45757b324a20f71665b44ed10e6953b5;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 62708b14b..f3f8fc886 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -1,36 +1,42 @@ - -import * as Bluebird from 'bluebird' -import * as chokidar from 'chokidar' -import { FfmpegCommand } from 'fluent-ffmpeg' +import { mapSeries } from 'bluebird' +import { FSWatcher, watch } from 'chokidar' +import { EventEmitter } from 'events' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' +import PQueue from 'p-queue' import { basename, join } from 'path' -import { EventEmitter } from 'stream' -import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' +import { computeOutputFPS } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' -import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' +import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' +import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' import { VideoFileModel } from '@server/models/video/video-file' +import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' -import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles' +import { VideoStorage, VideoStreamingPlaylistType } from '@shared/models' +import { + generateHLSMasterPlaylistFilename, + generateHlsSha256SegmentsFilename, + getLiveDirectory, + getLiveReplayBaseDirectory +} from '../../paths' import { isAbleToUploadVideo } from '../../user' -import { getHLSDirectory } from '../../video-paths' import { LiveQuotaStore } from '../live-quota-store' import { LiveSegmentShaStore } from '../live-segment-sha-store' -import { buildConcatenatedName } from '../live-utils' +import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils' +import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper' import memoizee = require('memoizee') - interface MuxingSessionEvents { - 'master-playlist-created': ({ videoId: number }) => void + 'live-ready': (options: { videoUUID: string }) => void - 'bad-socket-health': ({ videoId: number }) => void - 'duration-exceeded': ({ videoId: number }) => void - 'quota-exceeded': ({ videoId: number }) => void + 'bad-socket-health': (options: { videoUUID: string }) => void + 'duration-exceeded': (options: { videoUUID: string }) => void + 'quota-exceeded': (options: { videoUUID: string }) => void - 'ffmpeg-end': ({ videoId: number }) => void - 'ffmpeg-error': ({ sessionId: string }) => void + 'transcoding-end': (options: { videoUUID: string }) => void + 'transcoding-error': (options: { videoUUID: string }) => void - 'after-cleanup': ({ videoId: number }) => void + 'after-cleanup': (options: { videoUUID: string }) => void } declare interface MuxingSession { @@ -45,28 +51,42 @@ declare interface MuxingSession { class MuxingSession extends EventEmitter { - private ffmpegCommand: FfmpegCommand + private transcodingWrapper: AbstractTranscodingWrapper private readonly context: any private readonly user: MUserId private readonly sessionId: string private readonly videoLive: MVideoLiveVideo - private readonly streamingPlaylist: MStreamingPlaylistVideo - private readonly rtmpUrl: string + private readonly inputUrl: string private readonly fps: number - private readonly bitrate: number private readonly allResolutions: number[] - private readonly videoId: number + private readonly bitrate: number + private readonly ratio: number + + private readonly hasAudio: boolean + private readonly videoUUID: string private readonly saveReplay: boolean + private readonly outDirectory: string + private readonly replayDirectory: string + private readonly lTags: LoggerTagsFn private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} - private tsWatcher: chokidar.FSWatcher - private masterWatcher: chokidar.FSWatcher + private streamingPlaylist: MStreamingPlaylistVideo + private liveSegmentShaStore: LiveSegmentShaStore + + private tsWatcher: FSWatcher + private masterWatcher: FSWatcher + private m3u8Watcher: FSWatcher + + private masterPlaylistCreated = false + private liveReady = false + + private aborted = false private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => { return isAbleToUploadVideo(userId, 1000) @@ -81,11 +101,12 @@ class MuxingSession extends EventEmitter { user: MUserId sessionId: string videoLive: MVideoLiveVideo - streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + inputUrl: string fps: number bitrate: number + ratio: number allResolutions: number[] + hasAudio: boolean }) { super() @@ -93,59 +114,51 @@ class MuxingSession extends EventEmitter { this.user = options.user this.sessionId = options.sessionId this.videoLive = options.videoLive - this.streamingPlaylist = options.streamingPlaylist - this.rtmpUrl = options.rtmpUrl + this.inputUrl = options.inputUrl this.fps = options.fps + this.bitrate = options.bitrate + this.ratio = options.ratio + + this.hasAudio = options.hasAudio + this.allResolutions = options.allResolutions - this.videoId = this.videoLive.Video.id this.videoUUID = this.videoLive.Video.uuid this.saveReplay = this.videoLive.saveReplay + this.outDirectory = getLiveDirectory(this.videoLive.Video) + this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) + this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) } async runMuxing () { - this.createFiles() - - const outPath = await this.prepareDirectories() - - this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED - ? await getLiveTranscodingCommand({ - rtmpUrl: this.rtmpUrl, - - outPath, - masterPlaylistName: this.streamingPlaylist.playlistFilename, - - resolutions: this.allResolutions, - fps: this.fps, - bitrate: this.bitrate, + this.streamingPlaylist = await this.createLivePlaylist() - availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), - profile: CONFIG.LIVE.TRANSCODING.PROFILE - }) - : getLiveMuxingCommand(this.rtmpUrl, outPath, this.streamingPlaylist.playlistFilename) + this.createLiveShaStore() + this.createFiles() - logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags) + await this.prepareDirectories() - this.watchTSFiles(outPath) - this.watchMasterFile(outPath) + this.transcodingWrapper = this.buildTranscodingWrapper() - this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError(err, stdout, stderr, outPath) - }) + this.transcodingWrapper.on('end', () => this.onTranscodedEnded()) + this.transcodingWrapper.on('error', () => this.onTranscodingError()) - this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) + await this.transcodingWrapper.run() - this.ffmpegCommand.run() + this.watchMasterFile() + this.watchTSFiles() + this.watchM3U8File() } abort () { - if (!this.ffmpegCommand) return + if (!this.transcodingWrapper) return - this.ffmpegCommand.kill('SIGINT') + this.aborted = true + this.transcodingWrapper.abort() } destroy () { @@ -154,87 +167,105 @@ class MuxingSession extends EventEmitter { this.hasClientSocketInBadHealthWithCache.clear() } - private onFFmpegError (err: any, stdout: string, stderr: string, outPath: string) { - this.onFFmpegEnded(outPath) + private watchMasterFile () { + this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) - // Don't care that we killed the ffmpeg process - if (err?.message?.includes('Exiting normally')) return + this.masterWatcher.on('add', async () => { + try { + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename) - logger.error('Live transcoding error.', { err, stdout, stderr, ...this.lTags }) + this.streamingPlaylist.playlistUrl = url + } - this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) - } + this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions) - private onFFmpegEnded (outPath: string) { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags) + await this.streamingPlaylist.save() + } catch (err) { + logger.error('Cannot update streaming playlist.', { err, ...this.lTags() }) + } - setTimeout(() => { - // Wait latest segments generation, and close watchers + this.masterPlaylistCreated = true - Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ]) - .then(() => { - // Process remaining segments hash - for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { - this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key]) - } - }) - .catch(err => { - logger.error( - 'Cannot close watchers of %s or process remaining hash segments.', outPath, - { err, ...this.lTags } - ) - }) + logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags()) - this.emit('after-cleanup', { videoId: this.videoId }) - }, 1000) + this.masterWatcher.close() + .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) + }) } - private watchMasterFile (outPath: string) { - this.masterWatcher = chokidar.watch(outPath + '/' + this.streamingPlaylist.playlistFilename) + private watchM3U8File () { + this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8') - this.masterWatcher.on('add', async () => { - this.emit('master-playlist-created', { videoId: this.videoId }) + const sendQueues = new Map() - this.masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags })) - }) + const onChangeOrAdd = async (m3u8Path: string) => { + if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return + + try { + if (!sendQueues.has(m3u8Path)) { + sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) + } + + const queue = sendQueues.get(m3u8Path) + await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) + } catch (err) { + logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) + } + } + + this.m3u8Watcher.on('change', onChangeOrAdd) } - private watchTSFiles (outPath: string) { + private watchTSFiles () { const startStreamDateTime = new Date().getTime() - this.tsWatcher = chokidar.watch(outPath + '/*.ts') + this.tsWatcher = watch(this.outDirectory + '/*.ts') const playlistIdMatcher = /^([\d+])-/ - const addHandler = async segmentPath => { - logger.debug('Live add handler of %s.', segmentPath, this.lTags) + const addHandler = async (segmentPath: string) => { + logger.debug('Live add handler of %s.', segmentPath, this.lTags()) const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] - this.processSegments(outPath, segmentsToProcess) + this.processSegments(segmentsToProcess) this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) { - this.emit('bad-socket-health', { videoId: this.videoId }) + this.emit('bad-socket-health', { videoUUID: this.videoUUID }) return } // Duration constraint check if (this.isDurationConstraintValid(startStreamDateTime) !== true) { - this.emit('duration-exceeded', { videoId: this.videoId }) + this.emit('duration-exceeded', { videoUUID: this.videoUUID }) return } // Check user quota if the user enabled replay saving if (await this.isQuotaExceeded(segmentPath) === true) { - this.emit('quota-exceeded', { videoId: this.videoId }) + this.emit('quota-exceeded', { videoUUID: this.videoUUID }) } } - const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) + const deleteHandler = async (segmentPath: string) => { + try { + await this.liveSegmentShaStore.removeSegmentSha(segmentPath) + } catch (err) { + logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() }) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() }) + } + } + } this.tsWatcher.on('add', p => addHandler(p)) this.tsWatcher.on('unlink', p => deleteHandler(p)) @@ -242,6 +273,7 @@ class MuxingSession extends EventEmitter { private async isQuotaExceeded (segmentPath: string) { if (this.saveReplay !== true) return false + if (this.aborted) return false try { const segmentStat = await stat(segmentPath) @@ -252,7 +284,7 @@ class MuxingSession extends EventEmitter { return canUpload !== true } catch (err) { - logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags }) + logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() }) } } @@ -266,25 +298,21 @@ class MuxingSession extends EventEmitter { extname: '.ts', infoHash: null, fps: this.fps, + storage: this.streamingPlaylist.storage, videoStreamingPlaylistId: this.streamingPlaylist.id }) VideoFileModel.customUpsert(file, 'streaming-playlist', null) - .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags })) + .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() })) } } private async prepareDirectories () { - const outPath = getHLSDirectory(this.videoLive.Video) - await ensureDir(outPath) - - const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) + await ensureDir(this.outDirectory) if (this.videoLive.saveReplay === true) { - await ensureDir(replayDirectory) + await ensureDir(this.replayDirectory) } - - return outPath } private isDurationConstraintValid (streamingStartTime: number) { @@ -298,22 +326,74 @@ class MuxingSession extends EventEmitter { return now <= max } - private processSegments (hlsVideoPath: string, segmentPaths: string[]) { - Bluebird.mapSeries(segmentPaths, async previousSegment => { - // Add sha hash of previous segments, because ffmpeg should have finished generating them - await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) + private processSegments (segmentPaths: string[]) { + mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment)) + .catch(err => { + if (this.aborted) return - if (this.saveReplay) { - await this.addSegmentToReplay(hlsVideoPath, previousSegment) + logger.error('Cannot process segments', { err, ...this.lTags() }) + }) + } + + private async processSegment (segmentPath: string) { + // Add sha hash of previous segments, because ffmpeg should have finished generating them + await this.liveSegmentShaStore.addSegmentSha(segmentPath) + + if (this.saveReplay) { + await this.addSegmentToReplay(segmentPath) + } + + if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { + try { + await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) + } catch (err) { + logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) } - }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags })) + } + + // Master playlist and segment JSON file are created, live is ready + if (this.masterPlaylistCreated && !this.liveReady) { + this.liveReady = true + + this.emit('live-ready', { videoUUID: this.videoUUID }) + } + } + + private onTranscodingError () { + this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) + } + + private onTranscodedEnded () { + this.emit('transcoding-end', ({ videoUUID: this.videoUUID })) + + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) + + setTimeout(() => { + // Wait latest segments generation, and close watchers + + Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ]) + .then(() => { + // Process remaining segments hash + for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { + this.processSegments(this.segmentsToProcessPerPlaylist[key]) + } + }) + .catch(err => { + logger.error( + 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, + { err, ...this.lTags() } + ) + }) + + this.emit('after-cleanup', { videoUUID: this.videoUUID }) + }, 1000) } private hasClientSocketInBadHealth (sessionId: string) { const rtmpSession = this.context.sessions.get(sessionId) if (!rtmpSession) { - logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags) + logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags()) return } @@ -321,7 +401,7 @@ class MuxingSession extends EventEmitter { const playerSession = this.context.sessions.get(playerSessionId) if (!playerSession) { - logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags) + logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags()) continue } @@ -333,17 +413,72 @@ class MuxingSession extends EventEmitter { return false } - private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { + private async addSegmentToReplay (segmentPath: string) { const segmentName = basename(segmentPath) - const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) + const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) try { const data = await readFile(segmentPath) await appendFile(dest, data) } catch (err) { - logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags }) + logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() }) + } + } + + private async createLivePlaylist (): Promise { + const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video) + + playlist.playlistFilename = generateHLSMasterPlaylistFilename(true) + playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true) + + playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION + playlist.type = VideoStreamingPlaylistType.HLS + + playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED + ? VideoStorage.OBJECT_STORAGE + : VideoStorage.FILE_SYSTEM + + return playlist.save() + } + + private createLiveShaStore () { + this.liveSegmentShaStore = new LiveSegmentShaStore({ + videoUUID: this.videoLive.Video.uuid, + sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename), + streamingPlaylist: this.streamingPlaylist, + sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED + }) + } + + private buildTranscodingWrapper () { + const options = { + streamingPlaylist: this.streamingPlaylist, + videoLive: this.videoLive, + + lTags: this.lTags, + + inputUrl: this.inputUrl, + + toTranscode: this.allResolutions.map(resolution => ({ + resolution, + fps: computeOutputFPS({ inputFPS: this.fps, resolution }) + })), + + fps: this.fps, + bitrate: this.bitrate, + ratio: this.ratio, + hasAudio: this.hasAudio, + + segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, + segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode), + + outDirectory: this.outDirectory } + + return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED + ? new RemoteTranscodingWrapper(options) + : new FFmpegTranscodingWrapper(options) } }