X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-manager.ts;h=acb7af2745fc6d551a67011a2b0bf5f1284ee11c;hb=cb0eda5602a21d1626a7face32de6153ed07b5f9;hp=05274955d9f4d9408d7322ffeb21868d4394cc1a;hpb=05a60d85997c108d39bcfb14f1ffd4c74f8b1e93;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 05274955d..acb7af274 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' -import { - computeResolutionsToTranscode, - ffprobePromise, - getLiveSegmentTime, - getVideoStreamBitrate, - getVideoStreamDimensionsInfo, - getVideoStreamFPS, - hasAudioStream -} from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { VIDEO_LIVE } from '@server/initializers/constants' +import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { RunnerJobModel } from '@server/models/runner/runner-job' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' import { pick, wait } from '@shared/core-utils' +import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' import { LiveVideoError, VideoState } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { Hooks } from '../plugins/hooks' +import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' import { LiveQuotaStore } from './live-quota-store' -import { cleanupAndDestroyPermanentLive } from './live-utils' +import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' import { MuxingSession } from './shared' -import { sequelizeTypescript } from '@server/initializers/database' -import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') const context = require('node-media-server/src/node_core_ctx') @@ -57,7 +51,7 @@ class LiveManager { private static instance: LiveManager private readonly muxingSessions = new Map() - private readonly videoSessions = new Map() + private readonly videoSessions = new Map() private rtmpServer: Server private rtmpsServer: ServerTLS @@ -79,13 +73,18 @@ class LiveManager { } const session = this.getContext().sessions.get(sessionId) + const inputLocalUrl = session.inputOriginLocalUrl + streamPath + const inputPublicUrl = session.inputOriginPublicUrl + streamPath - this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2]) + this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] }) .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) }) events.on('donePublish', sessionId => { logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) + + // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU) + setTimeout(() => this.abortSession(sessionId), 2000) }) registerConfigChangedHandler(() => { @@ -113,7 +112,8 @@ class LiveManager { this.rtmpServer = createServer(socket => { const session = new NodeRtmpSession(config, socket) - session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT + session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT + session.inputOriginPublicUrl = WEBSERVER.RTMP_URL session.run() }) @@ -136,7 +136,8 @@ class LiveManager { this.rtmpsServer = createServerTLS(serverOptions, socket => { const session = new NodeRtmpSession(config, socket) - session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT + session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT + session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL session.run() }) @@ -177,14 +178,23 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number, error: LiveVideoError | null) { - const sessionId = this.videoSessions.get(videoId) - if (!sessionId) return + hasSession (sessionId: string) { + return this.getContext().sessions.has(sessionId) + } - this.saveEndingSession(videoId, error) - .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + stopSessionOf (videoUUID: string, error: LiveVideoError | null) { + const sessionId = this.videoSessions.get(videoUUID) + if (!sessionId) { + logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) + return + } + + logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) - this.videoSessions.delete(videoId) + this.saveEndingSession(videoUUID, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) + + this.videoSessions.delete(videoUUID) this.abortSession(sessionId) } @@ -208,7 +218,14 @@ class LiveManager { } } - private async handleSession (sessionId: string, inputUrl: string, streamKey: string) { + private async handleSession (options: { + sessionId: string + inputLocalUrl: string + inputPublicUrl: string + streamKey: string + }) { + const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options + const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) if (!videoLive) { logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) @@ -221,6 +238,11 @@ class LiveManager { return this.abortSession(sessionId) } + if (this.videoSessions.has(video.uuid)) { + logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) + return this.abortSession(sessionId) + } + // Cleanup old potential live (could happen with a permanent live) const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { @@ -229,21 +251,21 @@ class LiveManager { await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } - this.videoSessions.set(video.id, sessionId) + this.videoSessions.set(video.uuid, sessionId) const now = Date.now() - const probe = await ffprobePromise(inputUrl) + const probe = await ffprobePromise(inputLocalUrl) const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([ - getVideoStreamDimensionsInfo(inputUrl, probe), - getVideoStreamFPS(inputUrl, probe), - getVideoStreamBitrate(inputUrl, probe), - hasAudioStream(inputUrl, probe) + getVideoStreamDimensionsInfo(inputLocalUrl, probe), + getVideoStreamFPS(inputLocalUrl, probe), + getVideoStreamBitrate(inputLocalUrl, probe), + hasAudioStream(inputLocalUrl, probe) ]) logger.info( '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', - inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) + inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) ) const allResolutions = await Hooks.wrapObject( @@ -253,7 +275,7 @@ class LiveManager { ) logger.info( - 'Will mux/transcode live video of original resolution %d.', resolution, + 'Handling live video of original resolution %d.', resolution, { allResolutions, ...lTags(sessionId, video.uuid) } ) @@ -261,7 +283,8 @@ class LiveManager { sessionId, videoLive, - inputUrl, + inputLocalUrl, + inputPublicUrl, fps, bitrate, ratio, @@ -274,7 +297,9 @@ class LiveManager { sessionId: string videoLive: MVideoLiveVideoWithSetting - inputUrl: string + inputLocalUrl: string + inputPublicUrl: string + fps: number bitrate: number ratio: number @@ -288,7 +313,7 @@ class LiveManager { const liveSession = await this.saveStartingSession(videoLive) const user = await UserModel.loadByLiveId(videoLive.id) - LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) + LiveQuotaStore.Instance.addNewLive(user.id, sessionId) const muxingSession = new MuxingSession({ context: this.getContext(), @@ -296,49 +321,49 @@ class LiveManager { videoLive, user, - ...pick(options, [ 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) + ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) }) muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) - muxingSession.on('bad-socket-health', ({ videoId }) => { + muxingSession.on('bad-socket-health', ({ videoUUID }) => { logger.error( 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + ' Stopping session of video %s.', videoUUID, localLTags ) - this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) + this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) }) - muxingSession.on('duration-exceeded', ({ videoId }) => { + muxingSession.on('duration-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) + this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) }) - muxingSession.on('quota-exceeded', ({ videoId }) => { + muxingSession.on('quota-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) }) - muxingSession.on('ffmpeg-error', ({ videoId }) => { - this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) + muxingSession.on('transcoding-error', ({ videoUUID }) => { + this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId, sessionId) + muxingSession.on('transcoding-end', ({ videoUUID }) => { + this.onMuxingFFmpegEnd(videoUUID, sessionId) }) - muxingSession.on('after-cleanup', ({ videoId }) => { + muxingSession.on('after-cleanup', ({ videoUUID }) => { this.muxingSessions.delete(sessionId) - LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) + LiveQuotaStore.Instance.removeLive(user.id, sessionId) muxingSession.destroy() - return this.onAfterMuxingCleanup({ videoId, liveSession }) + return this.onAfterMuxingCleanup({ videoUUID, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -374,27 +399,34 @@ class LiveManager { } PeerTubeSocket.Instance.sendVideoLiveNewState(video) + + Hooks.runAction('action:live.video.state.updated', { video }) } catch (err) { logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) } } - private onMuxingFFmpegEnd (videoId: number, sessionId: string) { - this.videoSessions.delete(videoId) + private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { + // Session already cleaned up + if (!this.videoSessions.has(videoUUID)) return + + this.videoSessions.delete(videoUUID) - this.saveEndingSession(videoId, null) + this.saveEndingSession(videoUUID, null) .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } private async onAfterMuxingCleanup (options: { - videoId: number | string + videoUUID: string liveSession?: MVideoLiveSession cleanupNow?: boolean // Default false }) { - const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options + const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options + + logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) try { - const fullVideo = await VideoModel.loadFull(videoId) + const fullVideo = await VideoModel.loadFull(videoUUID) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) @@ -436,16 +468,20 @@ class LiveManager { PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) await federateVideoIfNeeded(fullVideo, false) + + Hooks.runAction('action:live.video.state.updated', { video: fullVideo }) } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) + logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) } } private async handleBrokenLives () { + await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) + const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) + await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) } } @@ -494,8 +530,8 @@ class LiveManager { }) } - private async saveEndingSession (videoId: number, error: LiveVideoError | null) { - const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) if (!liveSession) return liveSession.endDate = new Date()