X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Fshared%2Fmuxing-session.ts;h=eccaefcfaf5ddbc498228b959e82966ad3ad3100;hb=636d73c58866ed235c207719e41fdde3c2d6c969;hp=a80abc843e31eb53631614fc4b3927962667640f;hpb=679c12e69c9f3a2d003ee3abe8b8da49f25b2bd3;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index a80abc843..eccaefcfa 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -1,6 +1,6 @@ -import * as Bluebird from 'bluebird' -import * as chokidar from 'chokidar' +import { mapSeries } from 'bluebird' +import { FSWatcher, watch } from 'chokidar' import { FfmpegCommand } from 'fluent-ffmpeg' import { appendFile, ensureDir, readFile, stat } from 'fs-extra' import { basename, join } from 'path' @@ -11,9 +11,9 @@ import { CONFIG } from '@server/initializers/config' import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' import { VideoFileModel } from '@server/models/video/video-file' import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' +import { getLiveDirectory } from '../../paths' import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles' import { isAbleToUploadVideo } from '../../user' -import { getHLSDirectory } from '../../video-paths' import { LiveQuotaStore } from '../live-quota-store' import { LiveSegmentShaStore } from '../live-segment-sha-store' import { buildConcatenatedName } from '../live-utils' @@ -52,7 +52,7 @@ class MuxingSession extends EventEmitter { private readonly sessionId: string private readonly videoLive: MVideoLiveVideo private readonly streamingPlaylist: MStreamingPlaylistVideo - private readonly rtmpUrl: string + private readonly inputUrl: string private readonly fps: number private readonly allResolutions: number[] @@ -67,8 +67,8 @@ class MuxingSession extends EventEmitter { private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} - private tsWatcher: chokidar.FSWatcher - private masterWatcher: chokidar.FSWatcher + private tsWatcher: FSWatcher + private masterWatcher: FSWatcher private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => { return isAbleToUploadVideo(userId, 1000) @@ -84,7 +84,7 @@ class MuxingSession extends EventEmitter { sessionId: string videoLive: MVideoLiveVideo streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + inputUrl: string fps: number bitrate: number ratio: number @@ -97,11 +97,11 @@ class MuxingSession extends EventEmitter { this.sessionId = options.sessionId this.videoLive = options.videoLive this.streamingPlaylist = options.streamingPlaylist - this.rtmpUrl = options.rtmpUrl + this.inputUrl = options.inputUrl this.fps = options.fps this.bitrate = options.bitrate - this.ratio = options.bitrate + this.ratio = options.ratio this.allResolutions = options.allResolutions @@ -120,7 +120,7 @@ class MuxingSession extends EventEmitter { this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED ? await getLiveTranscodingCommand({ - rtmpUrl: this.rtmpUrl, + inputUrl: this.inputUrl, outPath, masterPlaylistName: this.streamingPlaylist.playlistFilename, @@ -133,15 +133,22 @@ class MuxingSession extends EventEmitter { availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), profile: CONFIG.LIVE.TRANSCODING.PROFILE }) - : getLiveMuxingCommand(this.rtmpUrl, outPath, this.streamingPlaylist.playlistFilename) + : getLiveMuxingCommand(this.inputUrl, outPath, this.streamingPlaylist.playlistFilename) - logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags) + logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) this.watchTSFiles(outPath) this.watchMasterFile(outPath) + let ffmpegShellCommand: string + this.ffmpegCommand.on('start', cmdline => { + ffmpegShellCommand = cmdline + + logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) + }) + this.ffmpegCommand.on('error', (err, stdout, stderr) => { - this.onFFmpegError(err, stdout, stderr, outPath) + this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand }) }) this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) @@ -161,19 +168,27 @@ class MuxingSession extends EventEmitter { this.hasClientSocketInBadHealthWithCache.clear() } - private onFFmpegError (err: any, stdout: string, stderr: string, outPath: string) { + private onFFmpegError (options: { + err: any + stdout: string + stderr: string + outPath: string + ffmpegShellCommand: string + }) { + const { err, stdout, stderr, outPath, ffmpegShellCommand } = options + this.onFFmpegEnded(outPath) // Don't care that we killed the ffmpeg process if (err?.message?.includes('Exiting normally')) return - logger.error('Live transcoding error.', { err, stdout, stderr, ...this.lTags }) + logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) } private onFFmpegEnded (outPath: string) { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags) + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) setTimeout(() => { // Wait latest segments generation, and close watchers @@ -188,7 +203,7 @@ class MuxingSession extends EventEmitter { .catch(err => { logger.error( 'Cannot close watchers of %s or process remaining hash segments.', outPath, - { err, ...this.lTags } + { err, ...this.lTags() } ) }) @@ -197,25 +212,25 @@ class MuxingSession extends EventEmitter { } private watchMasterFile (outPath: string) { - this.masterWatcher = chokidar.watch(outPath + '/' + this.streamingPlaylist.playlistFilename) + this.masterWatcher = watch(outPath + '/' + this.streamingPlaylist.playlistFilename) - this.masterWatcher.on('add', async () => { + this.masterWatcher.on('add', () => { this.emit('master-playlist-created', { videoId: this.videoId }) this.masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags })) + .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags() })) }) } private watchTSFiles (outPath: string) { const startStreamDateTime = new Date().getTime() - this.tsWatcher = chokidar.watch(outPath + '/*.ts') + this.tsWatcher = watch(outPath + '/*.ts') const playlistIdMatcher = /^([\d+])-/ const addHandler = async segmentPath => { - logger.debug('Live add handler of %s.', segmentPath, this.lTags) + logger.debug('Live add handler of %s.', segmentPath, this.lTags()) const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] @@ -259,7 +274,7 @@ class MuxingSession extends EventEmitter { return canUpload !== true } catch (err) { - logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags }) + logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() }) } } @@ -277,12 +292,12 @@ class MuxingSession extends EventEmitter { }) VideoFileModel.customUpsert(file, 'streaming-playlist', null) - .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags })) + .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() })) } } private async prepareDirectories () { - const outPath = getHLSDirectory(this.videoLive.Video) + const outPath = getLiveDirectory(this.videoLive.Video) await ensureDir(outPath) const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) @@ -306,21 +321,21 @@ class MuxingSession extends EventEmitter { } private processSegments (hlsVideoPath: string, segmentPaths: string[]) { - Bluebird.mapSeries(segmentPaths, async previousSegment => { + mapSeries(segmentPaths, async previousSegment => { // Add sha hash of previous segments, because ffmpeg should have finished generating them await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) if (this.saveReplay) { await this.addSegmentToReplay(hlsVideoPath, previousSegment) } - }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags })) + }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags() })) } private hasClientSocketInBadHealth (sessionId: string) { const rtmpSession = this.context.sessions.get(sessionId) if (!rtmpSession) { - logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags) + logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags()) return } @@ -328,7 +343,7 @@ class MuxingSession extends EventEmitter { const playerSession = this.context.sessions.get(playerSessionId) if (!playerSession) { - logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags) + logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags()) continue } @@ -349,7 +364,7 @@ class MuxingSession extends EventEmitter { await appendFile(dest, data) } catch (err) { - logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags }) + logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() }) } } }