From 26e3e98ff0e222a9fb9226938ac6902af77921bd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 3 May 2022 11:38:07 +0200 Subject: Support live session in server --- server/lib/live/live-manager.ts | 64 ++++++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) (limited to 'server/lib/live/live-manager.ts') diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index da09aa05c..df2804a0e 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/ import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' +import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' import { wait } from '@shared/core-utils' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' @@ -174,10 +175,13 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number) { + stopSessionOf (videoId: number, error: LiveVideoError | null) { const sessionId = this.videoSessions.get(videoId) if (!sessionId) return + this.saveEndingSession(videoId, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + this.videoSessions.delete(videoId) this.abortSession(sessionId) } @@ -274,6 +278,8 @@ class LiveManager { const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) + const liveSession = await this.saveStartingSession(videoLive) + const user = await UserModel.loadByLiveId(videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) @@ -299,24 +305,27 @@ class LiveManager { localLTags ) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) }) muxingSession.on('duration-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) }) muxingSession.on('quota-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + }) + + muxingSession.on('ffmpeg-error', ({ videoId }) => { + this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId) + this.onMuxingFFmpegEnd(videoId, sessionId) }) muxingSession.on('after-cleanup', ({ videoId }) => { @@ -324,7 +333,7 @@ class LiveManager { muxingSession.destroy() - return this.onAfterMuxingCleanup({ videoId }) + return this.onAfterMuxingCleanup({ videoId, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -365,15 +374,19 @@ class LiveManager { } } - private onMuxingFFmpegEnd (videoId: number) { + private onMuxingFFmpegEnd (videoId: number, sessionId: string) { this.videoSessions.delete(videoId) + + this.saveEndingSession(videoId, null) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } private async onAfterMuxingCleanup (options: { videoId: number | string + liveSession?: MVideoLiveSession cleanupNow?: boolean // Default false }) { - const { videoId, cleanupNow = false } = options + const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options try { const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) @@ -381,13 +394,25 @@ class LiveManager { const live = await VideoLiveModel.loadByVideoId(fullVideo.id) + const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id) + + // On server restart during a live + if (!liveSession.endDate) { + liveSession.endDate = new Date() + await liveSession.save() + } + JobQueue.Instance.createJob({ type: 'video-live-ending', payload: { videoId: fullVideo.id, + replayDirectory: live.saveReplay ? await this.findReplayDirectory(fullVideo) : undefined, + + liveSessionId: liveSession.id, + publishedAt: fullVideo.publishedAt.toISOString() } }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) @@ -445,6 +470,23 @@ class LiveManager { return playlist.save() } + private saveStartingSession (videoLive: MVideoLiveVideo) { + const liveSession = new VideoLiveSessionModel({ + startDate: new Date(), + liveVideoId: videoLive.videoId + }) + + return liveSession.save() + } + + private async saveEndingSession (videoId: number, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + liveSession.endDate = new Date() + liveSession.error = error + + return liveSession.save() + } + static get Instance () { return this.instance || (this.instance = new this()) } -- cgit v1.2.3