X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-manager.ts;h=68aa184431e3c96e21b8d986cad26321d935f00e;hb=a5cf9c98678962640cde7ed595aa59e993677abf;hp=f106d69fb119180504e12114e15e64e791c3ce59;hpb=421ff4618da64f0849353383f690a014024c40da;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index f106d69fb..68aa18443 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,23 +1,30 @@ - +import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' -import { isTestInstance } from '@server/helpers/core-utils' -import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' +import { join } from 'path' +import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' +import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants' +import { sequelizeTypescript } from '@server/initializers/database' +import { RunnerJobModel } from '@server/models/runner/runner-job' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' +import { pick, wait } from '@shared/core-utils' +import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' +import { LiveVideoError, VideoState } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' +import { getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../video-paths' +import { Hooks } from '../plugins/hooks' +import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' import { LiveQuotaStore } from './live-quota-store' -import { LiveSegmentShaStore } from './live-segment-sha-store' -import { cleanupLive } from './live-utils' +import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' import { MuxingSession } from './shared' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') @@ -34,9 +41,6 @@ const config = { gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, ping: VIDEO_LIVE.RTMP.PING, ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT - }, - transcoding: { - ffmpeg: 'ffmpeg' } } @@ -47,11 +51,12 @@ class LiveManager { private static instance: LiveManager private readonly muxingSessions = new Map() - private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() + private readonly videoSessions = new Map() private rtmpServer: Server + private rtmpsServer: ServerTLS + + private running = false private constructor () { } @@ -67,21 +72,28 @@ class LiveManager { return this.abortSession(sessionId) } - this.handleSession(sessionId, streamPath, splittedPath[2]) + const session = this.getContext().sessions.get(sessionId) + const inputLocalUrl = session.inputOriginLocalUrl + streamPath + const inputPublicUrl = session.inputOriginPublicUrl + streamPath + + this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] }) .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) }) events.on('donePublish', sessionId => { logger.info('Live session ended.', { sessionId, ...lTags(sessionId) }) + + // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU) + setTimeout(() => this.abortSession(sessionId), 2000) }) registerConfigChangedHandler(() => { - if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { - this.run() + if (!this.running && CONFIG.LIVE.ENABLED === true) { + this.run().catch(err => logger.error('Cannot run live server.', { err })) return } - if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { + if (this.running && CONFIG.LIVE.ENABLED === false) { this.stop() } }) @@ -89,31 +101,70 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } - run () { - logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) + async run () { + this.running = true - this.rtmpServer = createServer(socket => { - const session = new NodeRtmpSession(config, socket) + if (CONFIG.LIVE.RTMP.ENABLED) { + logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags()) - session.run() - }) + this.rtmpServer = createServer(socket => { + const session = new NodeRtmpSession(config, socket) - this.rtmpServer.on('error', err => { - logger.error('Cannot run RTMP server.', { err, ...lTags() }) - }) + session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT + session.inputOriginPublicUrl = WEBSERVER.RTMP_URL + session.run() + }) + + this.rtmpServer.on('error', err => { + logger.error('Cannot run RTMP server.', { err, ...lTags() }) + }) + + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME) + } - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + if (CONFIG.LIVE.RTMPS.ENABLED) { + logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags()) + + const [ key, cert ] = await Promise.all([ + readFile(CONFIG.LIVE.RTMPS.KEY_FILE), + readFile(CONFIG.LIVE.RTMPS.CERT_FILE) + ]) + const serverOptions = { key, cert } + + this.rtmpsServer = createServerTLS(serverOptions, socket => { + const session = new NodeRtmpSession(config, socket) + + session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT + session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL + session.run() + }) + + this.rtmpsServer.on('error', err => { + logger.error('Cannot run RTMPS server.', { err, ...lTags() }) + }) + + this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME) + } } stop () { - logger.info('Stopping RTMP server.', lTags()) + this.running = false + + if (this.rtmpServer) { + logger.info('Stopping RTMP server.', lTags()) + + this.rtmpServer.close() + this.rtmpServer = undefined + } - this.rtmpServer.close() - this.rtmpServer = undefined + if (this.rtmpsServer) { + logger.info('Stopping RTMPS server.', lTags()) + + this.rtmpsServer.close() + this.rtmpsServer = undefined + } // Sessions is an object this.getContext().sessions.forEach((session: any) => { @@ -127,25 +178,20 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number) { - const sessionId = this.videoSessions.get(videoId) - if (!sessionId) return - - this.videoSessions.delete(videoId) - this.abortSession(sessionId) - } - - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return + stopSessionOf (videoUUID: string, error: LiveVideoError | null) { + const sessionId = this.videoSessions.get(videoUUID) + if (!sessionId) { + logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) + return + } - let watchers = this.watchersPerVideo.get(videoId) + logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } + this.saveEndingSession(videoUUID, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) - watchers.push(new Date().getTime()) + this.videoSessions.delete(videoUUID) + this.abortSession(sessionId) } private getContext () { @@ -168,7 +214,14 @@ class LiveManager { } } - private async handleSession (sessionId: string, streamPath: string, streamKey: string) { + private async handleSession (options: { + sessionId: string + inputLocalUrl: string + inputPublicUrl: string + streamKey: string + }) { + const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options + const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) if (!videoLive) { logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) @@ -181,103 +234,132 @@ class LiveManager { return this.abortSession(sessionId) } - // Cleanup old potential live files (could happen with a permanent live) - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) + if (this.videoSessions.has(video.uuid)) { + logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) + return this.abortSession(sessionId) + } + // Cleanup old potential live (could happen with a permanent live) const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { - await cleanupLive(video, oldStreamingPlaylist) + if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) + + await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } - this.videoSessions.set(video.id, sessionId) + this.videoSessions.set(video.uuid, sessionId) - const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath + const now = Date.now() + const probe = await ffprobePromise(inputLocalUrl) - const [ { videoFileResolution }, fps ] = await Promise.all([ - getVideoFileResolution(rtmpUrl), - getVideoFileFPS(rtmpUrl) + const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([ + getVideoStreamDimensionsInfo(inputLocalUrl, probe), + getVideoStreamFPS(inputLocalUrl, probe), + getVideoStreamBitrate(inputLocalUrl, probe), + hasAudioStream(inputLocalUrl, probe) ]) - const allResolutions = this.buildAllResolutionsToTranscode(videoFileResolution) + logger.info( + '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', + inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) + ) + + const allResolutions = await Hooks.wrapObject( + this.buildAllResolutionsToTranscode(resolution, hasAudio), + 'filter:transcoding.auto.resolutions-to-transcode.result', + { video } + ) logger.info( - 'Will mux/transcode live video of original resolution %d.', videoFileResolution, + 'Handling live video of original resolution %d.', resolution, { allResolutions, ...lTags(sessionId, video.uuid) } ) - const streamingPlaylist = await this.createLivePlaylist(video, allResolutions) - return this.runMuxingSession({ sessionId, videoLive, - streamingPlaylist, - rtmpUrl, + + inputLocalUrl, + inputPublicUrl, fps, - allResolutions + bitrate, + ratio, + allResolutions, + hasAudio }) } private async runMuxingSession (options: { sessionId: string - videoLive: MVideoLiveVideo - streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + videoLive: MVideoLiveVideoWithSetting + + inputLocalUrl: string + inputPublicUrl: string + fps: number + bitrate: number + ratio: number allResolutions: number[] + hasAudio: boolean }) { - const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, rtmpUrl } = options + const { sessionId, videoLive } = options const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) + const liveSession = await this.saveStartingSession(videoLive) + const user = await UserModel.loadByLiveId(videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) const muxingSession = new MuxingSession({ context: this.getContext(), - user, sessionId, videoLive, - streamingPlaylist, - rtmpUrl, - fps, - allResolutions + user, + + ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) }) - muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) + muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) - muxingSession.on('bad-socket-health', ({ videoId }) => { + muxingSession.on('bad-socket-health', ({ videoUUID }) => { logger.error( 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + ' Stopping session of video %s.', videoUUID, localLTags ) - this.stopSessionOf(videoId) + this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) }) - muxingSession.on('duration-exceeded', ({ videoId }) => { + muxingSession.on('duration-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) }) - muxingSession.on('quota-exceeded', ({ videoId }) => { + muxingSession.on('quota-exceeded', ({ videoUUID }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) + }) + + muxingSession.on('transcoding-error', ({ videoUUID }) => { + this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) - muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId) + muxingSession.on('transcoding-end', ({ videoUUID }) => { + this.onMuxingFFmpegEnd(videoUUID, sessionId) }) - muxingSession.on('after-cleanup', ({ videoId }) => { + muxingSession.on('after-cleanup', ({ videoUUID }) => { this.muxingSessions.delete(sessionId) + LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) + muxingSession.destroy() - return this.onAfterMuxingCleanup(videoId) + return this.onAfterMuxingCleanup({ videoUUID, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -294,117 +376,160 @@ class LiveManager { const videoId = live.videoId try { - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + const video = await VideoModel.loadFull(videoId) logger.info('Will publish and federate live %s.', video.url, localLTags) video.state = VideoState.PUBLISHED + video.publishedAt = new Date() await video.save() live.Video = video - setTimeout(() => { - federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) + await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + + try { + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) + } - PeerTubeSocket.Instance.sendVideoLiveNewState(video) - }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + PeerTubeSocket.Instance.sendVideoLiveNewState(video) } catch (err) { logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) } } - private onMuxingFFmpegEnd (videoId: number) { - this.watchersPerVideo.delete(videoId) - this.videoSessions.delete(videoId) + private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { + // Session already cleaned up + if (!this.videoSessions.has(videoUUID)) return + + this.videoSessions.delete(videoUUID) + + this.saveEndingSession(videoUUID, null) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } - private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { + private async onAfterMuxingCleanup (options: { + videoUUID: string + liveSession?: MVideoLiveSession + cleanupNow?: boolean // Default false + }) { + const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options + + logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) + try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) + const fullVideo = await VideoModel.loadFull(videoUUID) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - if (!live.permanentLive) { - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - - fullVideo.state = VideoState.LIVE_ENDED - } else { - fullVideo.state = VideoState.WAITING_FOR_LIVE - } - - await fullVideo.save() + const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id) - PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - - await federateVideoIfNeeded(fullVideo, false) - } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) - } - } - - private async updateLiveViews () { - if (!this.isRunning()) return + // On server restart during a live + if (!liveSession.endDate) { + liveSession.endDate = new Date() + await liveSession.save() + } - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) + JobQueue.Instance.createJobAsync({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id, - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE + replayDirectory: live.saveReplay + ? await this.findReplayDirectory(fullVideo) + : undefined, - const watchers = this.watchersPerVideo.get(videoId) + liveSessionId: liveSession.id, + streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, - const numWatchers = watchers.length + publishedAt: fullVideo.publishedAt.toISOString() + }, - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() + delay: cleanupNow + ? 0 + : VIDEO_LIVE.CLEANUP_DELAY + }) - await federateVideoIfNeeded(video, false) + fullVideo.state = live.permanentLive + ? VideoState.WAITING_FOR_LIVE + : VideoState.LIVE_ENDED - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) + await fullVideo.save() - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) + PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) + await federateVideoIfNeeded(fullVideo, false) + } catch (err) { + logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) } } private async handleBrokenLives () { + await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) + const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup(uuid, true) + await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) } } - private buildAllResolutionsToTranscode (originResolution: number) { + private async findReplayDirectory (video: MVideo) { + const directory = getLiveReplayBaseDirectory(video) + const files = await readdir(directory) + + if (files.length === 0) return undefined + + return join(directory, files.sort().reverse()[0]) + } + + private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) { + const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION + const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeResolutionsToTranscode(originResolution, 'live') + ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio }) : [] - return resolutionsEnabled.concat([ originResolution ]) + if (resolutionsEnabled.length === 0) { + return [ originResolution ] + } + + return resolutionsEnabled } - private async createLivePlaylist (video: MVideo, allResolutions: number[]): Promise { - const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(video) + private async saveStartingSession (videoLive: MVideoLiveVideoWithSetting) { + const replaySettings = videoLive.saveReplay + ? new VideoLiveReplaySettingModel({ + privacy: videoLive.ReplaySetting.privacy + }) + : null + + return sequelizeTypescript.transaction(async t => { + if (videoLive.saveReplay) { + await replaySettings.save({ transaction: t }) + } - playlist.playlistFilename = generateHLSMasterPlaylistFilename(true) - playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true) + return VideoLiveSessionModel.create({ + startDate: new Date(), + liveVideoId: videoLive.videoId, + saveReplay: videoLive.saveReplay, + replaySettingId: videoLive.saveReplay ? replaySettings.id : null, + endingProcessed: false + }, { transaction: t }) + }) + } - playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION - playlist.type = VideoStreamingPlaylistType.HLS + private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) + if (!liveSession) return - playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) + liveSession.endDate = new Date() + liveSession.error = error - return playlist.save() + return liveSession.save() } static get Instance () {