import { isTestInstance } from '@server/helpers/core-utils'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
-import { logger } from '@server/helpers/logger'
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
}
}
+const lTags = loggerTagsFactory('live')
+
class LiveManager {
private static instance: LiveManager
init () {
const events = this.getContext().nodeEvent
events.on('postPublish', (sessionId: string, streamPath: string) => {
- logger.debug('RTMP received stream', { id: sessionId, streamPath })
+ logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
const splittedPath = streamPath.split('/')
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
- logger.warn('Live path is incorrect.', { streamPath })
+ logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
return this.abortSession(sessionId)
}
this.handleSession(sessionId, streamPath, splittedPath[2])
- .catch(err => logger.error('Cannot handle sessions.', { err }))
+ .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
})
events.on('donePublish', sessionId => {
- logger.info('Live session ended.', { sessionId })
+ logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
})
registerConfigChangedHandler(() => {
// Cleanup broken lives, that were terminated by a server restart for example
this.handleBrokenLives()
- .catch(err => logger.error('Cannot handle broken lives.', { err }))
+ .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
}
run () {
- logger.info('Running RTMP server on port %d', config.rtmp.port)
+ logger.info('Running RTMP server on port %d', config.rtmp.port, lTags())
this.rtmpServer = createServer(socket => {
const session = new NodeRtmpSession(config, socket)
})
this.rtmpServer.on('error', err => {
- logger.error('Cannot run RTMP server.', { err })
+ logger.error('Cannot run RTMP server.', { err, ...lTags() })
})
this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
}
stop () {
- logger.info('Stopping RTMP server.')
+ logger.info('Stopping RTMP server.', lTags())
this.rtmpServer.close()
this.rtmpServer = undefined
return readFile(segmentPath)
.then(data => appendFile(dest, data))
- .catch(err => logger.error('Cannot copy segment %s to repay directory.', segmentPath, { err }))
+ .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() }))
}
buildConcatenatedName (segmentOrPlaylistPath: string) {
if (videoLive.saveReplay) {
await this.addSegmentToReplay(hlsVideoPath, previousSegment)
}
- }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
+ }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) }))
}
private getContext () {
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
if (!videoLive) {
- logger.warn('Unknown live video with stream key %s.', streamKey)
+ logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
return this.abortSession(sessionId)
}
const video = videoLive.Video
if (video.isBlacklisted()) {
- logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
+ logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
return this.abortSession(sessionId)
}
const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])
- logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })
+ logger.info(
+ 'Will mux/transcode live video of original resolution %d.', session.videoHeight,
+ { allResolutions, ...lTags(sessionId, video.uuid) }
+ )
const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
videoId: video.id,
})
VideoFileModel.customUpsert(file, 'streaming-playlist', null)
- .catch(err => logger.error('Cannot create file for live streaming.', { err }))
+ .catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) }))
}
const outPath = getHLSDirectory(videoLive.Video)
})
: getLiveMuxingCommand(rtmpUrl, outPath)
- logger.info('Running live muxing/transcoding for %s.', videoUUID)
+ logger.info('Running live muxing/transcoding for %s.', videoUUID, lTags(sessionId, videoUUID))
this.transSessions.set(sessionId, ffmpegExec)
const tsWatcher = chokidar.watch(outPath + '/*.ts')
const playlistIdMatcher = /^([\d+])-/
const addHandler = segmentPath => {
- logger.debug('Live add handler of %s.', segmentPath)
+ logger.debug('Live add handler of %s.', segmentPath, lTags(sessionId, videoUUID))
const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
- ' Stopping session of video %s.', videoUUID)
+ ' Stopping session of video %s.', videoUUID,
+ lTags(sessionId, videoUUID)
+ )
this.stopSessionOf(videoLive.videoId)
return
// Duration constraint check
if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
- logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
+ logger.info('Stopping session of %s: max duration exceeded.', videoUUID, lTags(sessionId, videoUUID))
this.stopSessionOf(videoLive.videoId)
return
.then(() => this.isQuotaConstraintValid(user, videoLive))
.then(quotaValid => {
if (quotaValid !== true) {
- logger.info('Stopping session of %s: user quota exceeded.', videoUUID)
+ logger.info('Stopping session of %s: user quota exceeded.', videoUUID, lTags(sessionId, videoUUID))
this.stopSessionOf(videoLive.videoId)
}
})
- .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+ .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) }))
}
}
setTimeout(() => {
federateVideoIfNeeded(video, false)
- .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))
+ .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) }))
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
} catch (err) {
- logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
+ logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) })
} finally {
masterWatcher.close()
- .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+ .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) }))
}
})
const onFFmpegEnded = () => {
- logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)
+ logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl, lTags(sessionId, videoUUID))
this.transSessions.delete(sessionId)
this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
}
})
- .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))
+ .catch(err => {
+ logger.error(
+ 'Cannot close watchers of %s or process remaining hash segments.', outPath,
+ { err, ...lTags(sessionId, videoUUID) }
+ )
+ })
this.onEndTransmuxing(videoLive.Video.id)
- .catch(err => logger.error('Error in closed transmuxing.', { err }))
+ .catch(err => logger.error('Error in closed transmuxing.', { err, ...lTags(sessionId, videoUUID) }))
}, 1000)
}
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
- logger.error('Live transcoding error.', { err, stdout, stderr })
+ logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) })
this.abortSession(sessionId)
})
ffmpegExec.run()
}
- private async onEndTransmuxing (videoId: number, cleanupNow = false) {
+ private async onEndTransmuxing (videoUUID: string, cleanupNow = false) {
try {
- const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
+ const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
if (!fullVideo) return
- const live = await VideoLiveModel.loadByVideoId(videoId)
+ const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
if (!live.permanentLive) {
JobQueue.Instance.createJob({
await federateVideoIfNeeded(fullVideo, false)
} catch (err) {
- logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err })
+ logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
}
}
private async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
- logger.debug('Adding live sha segment %s.', segmentPath)
+ logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
const shaResult = await buildSha256Segment(segmentPath)
private removeSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
- logger.debug('Removing live sha segment %s.', segmentPath)
+ logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
const filesMap = this.segmentsSha256.get(videoUUID)
if (!filesMap) {
- logger.warn('Unknown files map to remove sha for %s.', videoUUID)
+ logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
return
}
if (!filesMap.has(segmentName)) {
- logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
+ logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
return
}
const rtmpSession = this.getContext().sessions.get(sessionId)
if (!rtmpSession) {
- logger.warn('Cannot get session %s to check players socket health.', sessionId)
+ logger.warn('Cannot get session %s to check players socket health.', sessionId, lTags(sessionId))
return
}
const playerSession = this.getContext().sessions.get(playerSessionId)
if (!playerSession) {
- logger.error('Cannot get player session %s to check socket health.', playerSession)
+ logger.error('Cannot get player session %s to check socket health.', playerSession, lTags(sessionId))
continue
}
private async updateLiveViews () {
if (!this.isRunning()) return
- if (!isTestInstance()) logger.info('Updating live video views.')
+ if (!isTestInstance()) logger.info('Updating live video views.', lTags())
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
- logger.debug('New live video views for %s is %d.', video.url, numWatchers)
+ logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
}
}
private async handleBrokenLives () {
- const videoIds = await VideoModel.listPublishedLiveIds()
+ const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
- for (const id of videoIds) {
- await this.onEndTransmuxing(id, true)
+ for (const uuid of videoUUIDs) {
+ await this.onEndTransmuxing(uuid, true)
}
}