X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive-manager.ts;h=563ba2578bbf34d0e3f783e853957f8ff51002f6;hb=868fce62f86812759ccedccf7634236ac3701d9a;hp=8e7fd551171dbb9fda4b1254293512637b15e738;hpb=20213fbd2a366dffc35aa7dddad71323893f8d62;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 8e7fd5511..563ba2578 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -8,7 +8,7 @@ import { basename, join } from 'path' import { isTestInstance } from '@server/helpers/core-utils' import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' -import { logger } from '@server/helpers/logger' +import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' @@ -48,6 +48,8 @@ const config = { } } +const lTags = loggerTagsFactory('live') + class LiveManager { private static instance: LiveManager @@ -75,20 +77,20 @@ class LiveManager { init () { const events = this.getContext().nodeEvent events.on('postPublish', (sessionId: string, streamPath: string) => { - logger.debug('RTMP received stream', { id: sessionId, streamPath }) + logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) }) const splittedPath = streamPath.split('/') if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) { - logger.warn('Live path is incorrect.', { streamPath }) + logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) }) return this.abortSession(sessionId) } this.handleSession(sessionId, streamPath, splittedPath[2]) - .catch(err => logger.error('Cannot handle sessions.', { err })) + .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) }) events.on('donePublish', sessionId => { - logger.info('Live session ended.', { sessionId }) + logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) }) registerConfigChangedHandler(() => { @@ -104,13 +106,13 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() - .catch(err => logger.error('Cannot handle broken lives.', { err })) + .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } run () { - logger.info('Running RTMP server on port %d', config.rtmp.port) + logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) this.rtmpServer = createServer(socket => { const session = new NodeRtmpSession(config, socket) @@ -119,14 +121,14 @@ class LiveManager { }) this.rtmpServer.on('error', err => { - logger.error('Cannot run RTMP server.', { err }) + logger.error('Cannot run RTMP server.', { err, ...lTags() }) }) this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) } stop () { - logger.info('Stopping RTMP server.') + logger.info('Stopping RTMP server.', lTags()) this.rtmpServer.close() this.rtmpServer = undefined @@ -185,7 +187,7 @@ class LiveManager { return readFile(segmentPath) .then(data => appendFile(dest, data)) - .catch(err => logger.error('Cannot copy segment %s to repay directory.', segmentPath, { err })) + .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() })) } buildConcatenatedName (segmentOrPlaylistPath: string) { @@ -202,7 +204,7 @@ class LiveManager { if (videoLive.saveReplay) { await this.addSegmentToReplay(hlsVideoPath, previousSegment) } - }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err })) + }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) })) } private getContext () { @@ -226,13 +228,13 @@ class LiveManager { private async handleSession (sessionId: string, streamPath: string, streamKey: string) { const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) if (!videoLive) { - logger.warn('Unknown live video with stream key %s.', streamKey) + logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) return this.abortSession(sessionId) } const video = videoLive.Video if (video.isBlacklisted()) { - logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) + logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid)) return this.abortSession(sessionId) } @@ -262,7 +264,10 @@ class LiveManager { const allResolutions = resolutionsEnabled.concat([ session.videoHeight ]) - logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions }) + logger.info( + 'Will mux/transcode live video of original resolution %d.', session.videoHeight, + { allResolutions, ...lTags(sessionId, video.uuid) } + ) const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({ videoId: video.id, @@ -317,7 +322,7 @@ class LiveManager { }) VideoFileModel.customUpsert(file, 'streaming-playlist', null) - .catch(err => logger.error('Cannot create file for live streaming.', { err })) + .catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) })) } const outPath = getHLSDirectory(videoLive.Video) @@ -342,7 +347,7 @@ class LiveManager { }) : getLiveMuxingCommand(rtmpUrl, outPath) - logger.info('Running live muxing/transcoding for %s.', videoUUID) + logger.info('Running live muxing/transcoding for %s.', videoUUID, lTags(sessionId, videoUUID)) this.transSessions.set(sessionId, ffmpegExec) const tsWatcher = chokidar.watch(outPath + '/*.ts') @@ -351,7 +356,7 @@ class LiveManager { const playlistIdMatcher = /^([\d+])-/ const addHandler = segmentPath => { - logger.debug('Live add handler of %s.', segmentPath) + logger.debug('Live add handler of %s.', segmentPath, lTags(sessionId, videoUUID)) const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] @@ -363,7 +368,9 @@ class LiveManager { if (this.hasClientSocketsInBadHealthWithCache(sessionId)) { logger.error( 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + - ' Stopping session of video %s.', videoUUID) + ' Stopping session of video %s.', videoUUID, + lTags(sessionId, videoUUID) + ) this.stopSessionOf(videoLive.videoId) return @@ -371,7 +378,7 @@ class LiveManager { // Duration constraint check if (this.isDurationConstraintValid(startStreamDateTime) !== true) { - logger.info('Stopping session of %s: max duration exceeded.', videoUUID) + logger.info('Stopping session of %s: max duration exceeded.', videoUUID, lTags(sessionId, videoUUID)) this.stopSessionOf(videoLive.videoId) return @@ -386,12 +393,12 @@ class LiveManager { .then(() => this.isQuotaConstraintValid(user, videoLive)) .then(quotaValid => { if (quotaValid !== true) { - logger.info('Stopping session of %s: user quota exceeded.', videoUUID) + logger.info('Stopping session of %s: user quota exceeded.', videoUUID, lTags(sessionId, videoUUID)) this.stopSessionOf(videoLive.videoId) } }) - .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err })) + .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) })) } } @@ -411,21 +418,21 @@ class LiveManager { setTimeout(() => { federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err })) + .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) })) PeerTubeSocket.Instance.sendVideoLiveNewState(video) }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) } catch (err) { - logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err }) + logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) }) } finally { masterWatcher.close() - .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) + .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) })) } }) const onFFmpegEnded = () => { - logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) + logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl, lTags(sessionId, videoUUID)) this.transSessions.delete(sessionId) @@ -446,10 +453,15 @@ class LiveManager { this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key]) } }) - .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err })) + .catch(err => { + logger.error( + 'Cannot close watchers of %s or process remaining hash segments.', outPath, + { err, ...lTags(sessionId, videoUUID) } + ) + }) this.onEndTransmuxing(videoLive.Video.id) - .catch(err => logger.error('Error in closed transmuxing.', { err })) + .catch(err => logger.error('Error in closed transmuxing.', { err, ...lTags(sessionId, videoUUID) })) }, 1000) } @@ -459,7 +471,7 @@ class LiveManager { // Don't care that we killed the ffmpeg process if (err?.message?.includes('Exiting normally')) return - logger.error('Live transcoding error.', { err, stdout, stderr }) + logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) }) this.abortSession(sessionId) }) @@ -469,12 +481,12 @@ class LiveManager { ffmpegExec.run() } - private async onEndTransmuxing (videoId: number, cleanupNow = false) { + private async onEndTransmuxing (videoUUID: string, cleanupNow = false) { try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) if (!fullVideo) return - const live = await VideoLiveModel.loadByVideoId(videoId) + const live = await VideoLiveModel.loadByVideoId(fullVideo.id) if (!live.permanentLive) { JobQueue.Instance.createJob({ @@ -495,13 +507,13 @@ class LiveManager { await federateVideoIfNeeded(fullVideo, false) } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err }) + logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) } } private async addSegmentSha (videoUUID: string, segmentPath: string) { const segmentName = basename(segmentPath) - logger.debug('Adding live sha segment %s.', segmentPath) + logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID)) const shaResult = await buildSha256Segment(segmentPath) @@ -516,16 +528,16 @@ class LiveManager { private removeSegmentSha (videoUUID: string, segmentPath: string) { const segmentName = basename(segmentPath) - logger.debug('Removing live sha segment %s.', segmentPath) + logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID)) const filesMap = this.segmentsSha256.get(videoUUID) if (!filesMap) { - logger.warn('Unknown files map to remove sha for %s.', videoUUID) + logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID)) return } if (!filesMap.has(segmentName)) { - logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath) + logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID)) return } @@ -547,7 +559,7 @@ class LiveManager { const rtmpSession = this.getContext().sessions.get(sessionId) if (!rtmpSession) { - logger.warn('Cannot get session %s to check players socket health.', sessionId) + logger.warn('Cannot get session %s to check players socket health.', sessionId, lTags(sessionId)) return } @@ -555,7 +567,7 @@ class LiveManager { const playerSession = this.getContext().sessions.get(playerSessionId) if (!playerSession) { - logger.error('Cannot get player session %s to check socket health.', playerSession) + logger.error('Cannot get player session %s to check socket health.', playerSession, lTags(sessionId)) continue } @@ -576,7 +588,7 @@ class LiveManager { private async updateLiveViews () { if (!this.isRunning()) return - if (!isTestInstance()) logger.info('Updating live video views.') + if (!isTestInstance()) logger.info('Updating live video views.', lTags()) for (const videoId of this.watchersPerVideo.keys()) { const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE @@ -597,15 +609,15 @@ class LiveManager { const newWatchers = watchers.filter(w => w > notBefore) this.watchersPerVideo.set(videoId, newWatchers) - logger.debug('New live video views for %s is %d.', video.url, numWatchers) + logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) } } private async handleBrokenLives () { - const videoIds = await VideoModel.listPublishedLiveIds() + const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() - for (const id of videoIds) { - await this.onEndTransmuxing(id, true) + for (const uuid of videoUUIDs) { + await this.onEndTransmuxing(uuid, true) } }