import { isTestInstance } from '@server/helpers/core-utils'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
import { isTestInstance } from '@server/helpers/core-utils'
import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
import { UserModel } from '@server/models/user/user'
init () {
const events = this.getContext().nodeEvent
events.on('postPublish', (sessionId: string, streamPath: string) => {
init () {
const events = this.getContext().nodeEvent
events.on('postPublish', (sessionId: string, streamPath: string) => {
- logger.debug('RTMP received stream', { id: sessionId, streamPath })
+ logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
const splittedPath = streamPath.split('/')
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
const splittedPath = streamPath.split('/')
if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
return this.abortSession(sessionId)
}
this.handleSession(sessionId, streamPath, splittedPath[2])
return this.abortSession(sessionId)
}
this.handleSession(sessionId, streamPath, splittedPath[2])
this.rtmpServer = createServer(socket => {
const session = new NodeRtmpSession(config, socket)
this.rtmpServer = createServer(socket => {
const session = new NodeRtmpSession(config, socket)
return readFile(segmentPath)
.then(data => appendFile(dest, data))
return readFile(segmentPath)
.then(data => appendFile(dest, data))
- .catch(err => logger.error('Cannot copy segment %s to repay directory.', segmentPath, { err }))
+ .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() }))
if (videoLive.saveReplay) {
await this.addSegmentToReplay(hlsVideoPath, previousSegment)
}
if (videoLive.saveReplay) {
await this.addSegmentToReplay(hlsVideoPath, previousSegment)
}
- }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
+ }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) }))
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
if (!videoLive) {
private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
if (!videoLive) {
return this.abortSession(sessionId)
}
const video = videoLive.Video
if (video.isBlacklisted()) {
return this.abortSession(sessionId)
}
const video = videoLive.Video
if (video.isBlacklisted()) {
- logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })
+ logger.info(
+ 'Will mux/transcode live video of original resolution %d.', session.videoHeight,
+ { allResolutions, ...lTags(sessionId, video.uuid) }
+ )
- .catch(err => logger.error('Cannot create file for live streaming.', { err }))
+ .catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) }))
this.transSessions.set(sessionId, ffmpegExec)
const tsWatcher = chokidar.watch(outPath + '/*.ts')
this.transSessions.set(sessionId, ffmpegExec)
const tsWatcher = chokidar.watch(outPath + '/*.ts')
if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
logger.error(
'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
- ' Stopping session of video %s.', videoUUID)
+ ' Stopping session of video %s.', videoUUID,
+ lTags(sessionId, videoUUID)
+ )
.then(() => this.isQuotaConstraintValid(user, videoLive))
.then(quotaValid => {
if (quotaValid !== true) {
.then(() => this.isQuotaConstraintValid(user, videoLive))
.then(quotaValid => {
if (quotaValid !== true) {
- .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
+ .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) }))
- .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))
+ .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) }))
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
} catch (err) {
PeerTubeSocket.Instance.sendVideoLiveNewState(video)
}, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
} catch (err) {
- logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
+ logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) })
- .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
+ .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) }))
this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
}
})
this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
}
})
- .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))
+ .catch(err => {
+ logger.error(
+ 'Cannot close watchers of %s or process remaining hash segments.', outPath,
+ { err, ...lTags(sessionId, videoUUID) }
+ )
+ })
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
// Don't care that we killed the ffmpeg process
if (err?.message?.includes('Exiting normally')) return
- logger.error('Live transcoding error.', { err, stdout, stderr })
+ logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) })
- logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err })
+ logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
}
}
private async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
}
}
private async addSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
private removeSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
private removeSegmentSha (videoUUID: string, segmentPath: string) {
const segmentName = basename(segmentPath)
- logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
+ logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
for (const videoId of this.watchersPerVideo.keys()) {
const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
const newWatchers = watchers.filter(w => w > notBefore)
this.watchersPerVideo.set(videoId, newWatchers)
- logger.debug('New live video views for %s is %d.', video.url, numWatchers)
+ logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
- for (const id of videoIds) {
- await this.onEndTransmuxing(id, true)
+ for (const uuid of videoUUIDs) {
+ await this.onEndTransmuxing(uuid, true)