X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-manager.ts;h=5e459f3c39733a8ca21541c6d98d4093cf4fa036;hb=77239b425a8e00822a53c9907415832a473c3eb6;hp=5ffe41ee33d7b17d82e487bc796b0742f13d8223;hpb=4ec52d04dcc5d664612331f8e08d7d90da990415;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 5ffe41ee3..5e459f3c3 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,15 +1,15 @@ - import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' import { - computeLowerResolutionsToTranscode, + computeResolutionsToTranscode, ffprobePromise, getLiveSegmentTime, getVideoStreamBitrate, getVideoStreamDimensionsInfo, - getVideoStreamFPS + getVideoStreamFPS, + hasAudioStream } from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' @@ -17,17 +17,18 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/ import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' -import { wait } from '@shared/core-utils' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' +import { pick, wait } from '@shared/core-utils' +import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' +import { Hooks } from '../plugins/hooks' import { LiveQuotaStore } from './live-quota-store' -import { LiveSegmentShaStore } from './live-segment-sha-store' -import { cleanupLive } from './live-utils' +import { cleanupAndDestroyPermanentLive } from './live-utils' import { MuxingSession } from './shared' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') @@ -118,7 +119,7 @@ class LiveManager { logger.error('Cannot run RTMP server.', { err, ...lTags() }) }) - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME) } if (CONFIG.LIVE.RTMPS.ENABLED) { @@ -141,7 +142,7 @@ class LiveManager { logger.error('Cannot run RTMPS server.', { err, ...lTags() }) }) - this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT) + this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME) } } @@ -174,10 +175,13 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number) { + stopSessionOf (videoId: number, error: LiveVideoError | null) { const sessionId = this.videoSessions.get(videoId) if (!sessionId) return + this.saveEndingSession(videoId, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + this.videoSessions.delete(videoId) this.abortSession(sessionId) } @@ -215,12 +219,12 @@ class LiveManager { return this.abortSession(sessionId) } - // Cleanup old potential live files (could happen with a permanent live) - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) - + // Cleanup old potential live (could happen with a permanent live) const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { - await cleanupLive(video, oldStreamingPlaylist) + if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) + + await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) } this.videoSessions.set(video.id, sessionId) @@ -228,10 +232,11 @@ class LiveManager { const now = Date.now() const probe = await ffprobePromise(inputUrl) - const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([ + const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([ getVideoStreamDimensionsInfo(inputUrl, probe), getVideoStreamFPS(inputUrl, probe), - getVideoStreamBitrate(inputUrl, probe) + getVideoStreamBitrate(inputUrl, probe), + hasAudioStream(inputUrl, probe) ]) logger.info( @@ -239,7 +244,11 @@ class LiveManager { inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid) ) - const allResolutions = this.buildAllResolutionsToTranscode(resolution) + const allResolutions = await Hooks.wrapObject( + this.buildAllResolutionsToTranscode(resolution, hasAudio), + 'filter:transcoding.auto.resolutions-to-transcode.result', + { video } + ) logger.info( 'Will mux/transcode live video of original resolution %d.', resolution, @@ -251,46 +260,48 @@ class LiveManager { return this.runMuxingSession({ sessionId, videoLive, + streamingPlaylist, inputUrl, fps, bitrate, ratio, - allResolutions + allResolutions, + hasAudio }) } private async runMuxingSession (options: { sessionId: string videoLive: MVideoLiveVideo + streamingPlaylist: MStreamingPlaylistVideo inputUrl: string fps: number bitrate: number ratio: number allResolutions: number[] + hasAudio: boolean }) { - const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, inputUrl } = options + const { sessionId, videoLive } = options const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) + const liveSession = await this.saveStartingSession(videoLive) + const user = await UserModel.loadByLiveId(videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) const muxingSession = new MuxingSession({ context: this.getContext(), - user, sessionId, videoLive, - streamingPlaylist, - inputUrl, - bitrate, - ratio, - fps, - allResolutions + user, + + ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ]) }) - muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags)) + muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) muxingSession.on('bad-socket-health', ({ videoId }) => { logger.error( @@ -299,32 +310,37 @@ class LiveManager { localLTags ) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) }) muxingSession.on('duration-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) }) muxingSession.on('quota-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + }) + + muxingSession.on('ffmpeg-error', ({ videoId }) => { + this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId) + this.onMuxingFFmpegEnd(videoId, sessionId) }) muxingSession.on('after-cleanup', ({ videoId }) => { this.muxingSessions.delete(sessionId) + LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) + muxingSession.destroy() - return this.onAfterMuxingCleanup({ videoId }) + return this.onAfterMuxingCleanup({ videoId, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -341,7 +357,7 @@ class LiveManager { const videoId = live.videoId try { - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + const video = await VideoModel.loadFull(videoId) logger.info('Will publish and federate live %s.', video.url, localLTags) @@ -365,32 +381,53 @@ class LiveManager { } } - private onMuxingFFmpegEnd (videoId: number) { + private onMuxingFFmpegEnd (videoId: number, sessionId: string) { this.videoSessions.delete(videoId) + + this.saveEndingSession(videoId, null) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } private async onAfterMuxingCleanup (options: { videoId: number | string + liveSession?: MVideoLiveSession cleanupNow?: boolean // Default false }) { - const { videoId, cleanupNow = false } = options + const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) + const fullVideo = await VideoModel.loadFull(videoId) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - JobQueue.Instance.createJob({ + const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id) + + // On server restart during a live + if (!liveSession.endDate) { + liveSession.endDate = new Date() + await liveSession.save() + } + + JobQueue.Instance.createJobAsync({ type: 'video-live-ending', payload: { videoId: fullVideo.id, + replayDirectory: live.saveReplay ? await this.findReplayDirectory(fullVideo) : undefined, + + liveSessionId: liveSession.id, + streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, + publishedAt: fullVideo.publishedAt.toISOString() - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + }, + + delay: cleanupNow + ? 0 + : VIDEO_LIVE.CLEANUP_DELAY + }) fullVideo.state = live.permanentLive ? VideoState.WAITING_FOR_LIVE @@ -423,12 +460,18 @@ class LiveManager { return join(directory, files.sort().reverse()[0]) } - private buildAllResolutionsToTranscode (originResolution: number) { + private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) { + const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION + const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeLowerResolutionsToTranscode(originResolution, 'live') + ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio }) : [] - return resolutionsEnabled.concat([ originResolution ]) + if (resolutionsEnabled.length === 0) { + return [ originResolution ] + } + + return resolutionsEnabled } private async createLivePlaylist (video: MVideo, allResolutions: number[]): Promise { @@ -442,9 +485,34 @@ class LiveManager { playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions) + playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED + ? VideoStorage.OBJECT_STORAGE + : VideoStorage.FILE_SYSTEM + return playlist.save() } + private saveStartingSession (videoLive: MVideoLiveVideo) { + const liveSession = new VideoLiveSessionModel({ + startDate: new Date(), + liveVideoId: videoLive.videoId, + saveReplay: videoLive.saveReplay, + endingProcessed: false + }) + + return liveSession.save() + } + + private async saveEndingSession (videoId: number, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + if (!liveSession) return + + liveSession.endDate = new Date() + liveSession.error = error + + return liveSession.save() + } + static get Instance () { return this.instance || (this.instance = new this()) }