X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-manager.ts;h=33e49acc1b9a9c4d94573f9c88db3d3a5c599c7a;hb=2e9c7877eb3a3c5d64cc5c3383f0a7c0b51f5481;hp=2a429fb337841e4dc03a528759d7c46bed7db561;hpb=679c12e69c9f3a2d003ee3abe8b8da49f25b2bd3;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 2a429fb33..33e49acc1 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,8 +1,9 @@ +import { readFile } from 'fs-extra' import { createServer, Server } from 'net' -import { isTestInstance } from '@server/helpers/core-utils' +import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { - computeResolutionsToTranscode, + computeLowerResolutionsToTranscode, ffprobePromise, getVideoFileBitrate, getVideoFileFPS, @@ -10,7 +11,7 @@ import { } from '@server/helpers/ffprobe-utils' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' +import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' @@ -19,8 +20,8 @@ import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/ import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' +import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' import { PeerTubeSocket } from '../peertube-socket' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../video-paths' import { LiveQuotaStore } from './live-quota-store' import { LiveSegmentShaStore } from './live-segment-sha-store' import { cleanupLive } from './live-utils' @@ -40,9 +41,6 @@ const config = { gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE, ping: VIDEO_LIVE.RTMP.PING, ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT - }, - transcoding: { - ffmpeg: 'ffmpeg' } } @@ -54,10 +52,11 @@ class LiveManager { private readonly muxingSessions = new Map() private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() private rtmpServer: Server + private rtmpsServer: ServerTLS + + private running = false private constructor () { } @@ -73,7 +72,9 @@ class LiveManager { return this.abortSession(sessionId) } - this.handleSession(sessionId, streamPath, splittedPath[2]) + const session = this.getContext().sessions.get(sessionId) + + this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2]) .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) })) }) @@ -82,12 +83,12 @@ class LiveManager { }) registerConfigChangedHandler(() => { - if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { - this.run() + if (!this.running && CONFIG.LIVE.ENABLED === true) { + this.run().catch(err => logger.error('Cannot run live server.', { err })) return } - if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) { + if (this.running && CONFIG.LIVE.ENABLED === false) { this.stop() } }) @@ -95,31 +96,68 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } - run () { - logger.info('Running RTMP server on port %d', config.rtmp.port, lTags()) + async run () { + this.running = true - this.rtmpServer = createServer(socket => { - const session = new NodeRtmpSession(config, socket) + if (CONFIG.LIVE.RTMP.ENABLED) { + logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags()) - session.run() - }) + this.rtmpServer = createServer(socket => { + const session = new NodeRtmpSession(config, socket) - this.rtmpServer.on('error', err => { - logger.error('Cannot run RTMP server.', { err, ...lTags() }) - }) + session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT + session.run() + }) + + this.rtmpServer.on('error', err => { + logger.error('Cannot run RTMP server.', { err, ...lTags() }) + }) + + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + } + + if (CONFIG.LIVE.RTMPS.ENABLED) { + logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags()) - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + const [ key, cert ] = await Promise.all([ + readFile(CONFIG.LIVE.RTMPS.KEY_FILE), + readFile(CONFIG.LIVE.RTMPS.CERT_FILE) + ]) + const serverOptions = { key, cert } + + this.rtmpsServer = createServerTLS(serverOptions, socket => { + const session = new NodeRtmpSession(config, socket) + + session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT + session.run() + }) + + this.rtmpsServer.on('error', err => { + logger.error('Cannot run RTMPS server.', { err, ...lTags() }) + }) + + this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT) + } } stop () { - logger.info('Stopping RTMP server.', lTags()) + this.running = false + + if (this.rtmpServer) { + logger.info('Stopping RTMP server.', lTags()) + + this.rtmpServer.close() + this.rtmpServer = undefined + } + + if (this.rtmpsServer) { + logger.info('Stopping RTMPS server.', lTags()) - this.rtmpServer.close() - this.rtmpServer = undefined + this.rtmpsServer.close() + this.rtmpsServer = undefined + } // Sessions is an object this.getContext().sessions.forEach((session: any) => { @@ -141,19 +179,6 @@ class LiveManager { this.abortSession(sessionId) } - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return - - let watchers = this.watchersPerVideo.get(videoId) - - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } - - watchers.push(new Date().getTime()) - } - private getContext () { return context } @@ -174,7 +199,7 @@ class LiveManager { } } - private async handleSession (sessionId: string, streamPath: string, streamKey: string) { + private async handleSession (sessionId: string, inputUrl: string, streamKey: string) { const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) if (!videoLive) { logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId)) @@ -197,20 +222,18 @@ class LiveManager { this.videoSessions.set(video.id, sessionId) - const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath - const now = Date.now() - const probe = await ffprobePromise(rtmpUrl) + const probe = await ffprobePromise(inputUrl) const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([ - getVideoFileResolution(rtmpUrl, probe), - getVideoFileFPS(rtmpUrl, probe), - getVideoFileBitrate(rtmpUrl, probe) + getVideoFileResolution(inputUrl, probe), + getVideoFileFPS(inputUrl, probe), + getVideoFileBitrate(inputUrl, probe) ]) logger.info( '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)', - rtmpUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) + inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) ) const allResolutions = this.buildAllResolutionsToTranscode(resolution) @@ -226,7 +249,7 @@ class LiveManager { sessionId, videoLive, streamingPlaylist, - rtmpUrl, + inputUrl, fps, bitrate, ratio, @@ -238,13 +261,13 @@ class LiveManager { sessionId: string videoLive: MVideoLiveVideo streamingPlaylist: MStreamingPlaylistVideo - rtmpUrl: string + inputUrl: string fps: number bitrate: number ratio: number allResolutions: number[] }) { - const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, rtmpUrl } = options + const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, inputUrl } = options const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) @@ -257,7 +280,7 @@ class LiveManager { sessionId, videoLive, streamingPlaylist, - rtmpUrl, + inputUrl, bitrate, ratio, fps, @@ -320,6 +343,7 @@ class LiveManager { logger.info('Will publish and federate live %s.', video.url, localLTags) video.state = VideoState.PUBLISHED + video.publishedAt = new Date() await video.save() live.Video = video @@ -336,7 +360,6 @@ class LiveManager { } private onMuxingFFmpegEnd (videoId: number) { - this.watchersPerVideo.delete(videoId) this.videoSessions.delete(videoId) } @@ -370,34 +393,6 @@ class LiveManager { } } - private async updateLiveViews () { - if (!this.isRunning()) return - - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) - - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE - - const watchers = this.watchersPerVideo.get(videoId) - - const numWatchers = watchers.length - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() - - await federateVideoIfNeeded(video, false) - - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) - - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) - - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) - } - } - private async handleBrokenLives () { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() @@ -408,7 +403,7 @@ class LiveManager { private buildAllResolutionsToTranscode (originResolution: number) { const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeResolutionsToTranscode(originResolution, 'live') + ? computeLowerResolutionsToTranscode(originResolution, 'live') : [] return resolutionsEnabled.concat([ originResolution ])