From 26e3e98ff0e222a9fb9226938ac6902af77921bd Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 3 May 2022 11:38:07 +0200 Subject: Support live session in server --- server/lib/job-queue/handlers/video-live-ending.ts | 87 +++++++++++++++------- server/lib/live/live-manager.ts | 64 +++++++++++++--- server/lib/live/shared/muxing-session.ts | 10 ++- server/lib/video-blacklist.ts | 4 +- server/lib/video-state.ts | 4 +- 5 files changed, 123 insertions(+), 46 deletions(-) (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 1e290338c..55fd09344 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -15,13 +15,14 @@ import { generateVideoMiniature } from '@server/lib/thumbnail' import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' import { moveToNextState } from '@server/lib/video-state' import { VideoModel } from '@server/models/video/video' +import { VideoBlacklistModel } from '@server/models/video/video-blacklist' import { VideoFileModel } from '@server/models/video/video-file' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MVideo, MVideoLive, MVideoWithAllFiles } from '@server/types/models' +import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' -import { VideoBlacklistModel } from '@server/models/video/video-blacklist' async function processVideoLiveEnding (job: Job) { const payload = job.data as VideoLiveEndingPayload @@ -32,27 +33,28 @@ async function processVideoLiveEnding (job: Job) { logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) } - const video = await VideoModel.load(payload.videoId) + const liveVideo = await VideoModel.load(payload.videoId) const live = await VideoLiveModel.loadByVideoId(payload.videoId) + const liveSession = await VideoLiveSessionModel.load(payload.liveSessionId) - if (!video || !live) { + if (!liveVideo || !live || !liveSession) { logError() return } - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) + LiveSegmentShaStore.Instance.cleanupShaSegments(liveVideo.uuid) if (live.saveReplay !== true) { - return cleanupLiveAndFederate(video) + return cleanupLiveAndFederate({ liveVideo }) } if (live.permanentLive) { - await saveReplayToExternalVideo(video, payload.publishedAt, payload.replayDirectory) + await saveReplayToExternalVideo({ liveVideo, liveSession, publishedAt: payload.publishedAt, replayDirectory: payload.replayDirectory }) - return cleanupLiveAndFederate(video) + return cleanupLiveAndFederate({ liveVideo }) } - return replaceLiveByReplay(video, live, payload.replayDirectory) + return replaceLiveByReplay({ liveVideo, live, liveSession, replayDirectory: payload.replayDirectory }) } // --------------------------------------------------------------------------- @@ -63,7 +65,14 @@ export { // --------------------------------------------------------------------------- -async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string, replayDirectory: string) { +async function saveReplayToExternalVideo (options: { + liveVideo: MVideo + liveSession: MVideoLiveSession + publishedAt: string + replayDirectory: string +}) { + const { liveVideo, liveSession, publishedAt, replayDirectory } = options + await cleanupTMPLiveFiles(getLiveDirectory(liveVideo)) const video = new VideoModel({ @@ -78,7 +87,7 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string language: liveVideo.language, commentsEnabled: liveVideo.commentsEnabled, downloadEnabled: liveVideo.downloadEnabled, - waitTranscoding: liveVideo.waitTranscoding, + waitTranscoding: true, nsfw: liveVideo.nsfw, description: liveVideo.description, support: liveVideo.support, @@ -94,6 +103,9 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string await video.save() + liveSession.replayVideoId = video.id + await liveSession.save() + // If live is blacklisted, also blacklist the replay const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id) if (blacklist) { @@ -105,7 +117,7 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string }) } - await assignReplaysToVideo(video, replayDirectory) + await assignReplayFilesToVideo({ video, replayDirectory }) await remove(replayDirectory) @@ -117,18 +129,29 @@ async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string await moveToNextState({ video, isNewVideo: true }) } -async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirectory: string) { - await cleanupTMPLiveFiles(getLiveDirectory(video)) +async function replaceLiveByReplay (options: { + liveVideo: MVideo + liveSession: MVideoLiveSession + live: MVideoLive + replayDirectory: string +}) { + const { liveVideo, liveSession, live, replayDirectory } = options + + await cleanupTMPLiveFiles(getLiveDirectory(liveVideo)) await live.destroy() - video.isLive = false - video.state = VideoState.TO_TRANSCODE + liveVideo.isLive = false + liveVideo.waitTranscoding = true + liveVideo.state = VideoState.TO_TRANSCODE - await video.save() + await liveVideo.save() + + liveSession.replayVideoId = liveVideo.id + await liveSession.save() // Remove old HLS playlist video files - const videoWithFiles = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) + const videoWithFiles = await VideoModel.loadAndPopulateAccountAndServerAndTags(liveVideo.id) const hlsPlaylist = videoWithFiles.getHLSPlaylist() await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id) @@ -139,7 +162,7 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename() await hlsPlaylist.save() - await assignReplaysToVideo(videoWithFiles, replayDirectory) + await assignReplayFilesToVideo({ video: videoWithFiles, replayDirectory }) await remove(getLiveReplayBaseDirectory(videoWithFiles)) @@ -150,7 +173,7 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec videoFile: videoWithFiles.getMaxQualityFile(), type: ThumbnailType.MINIATURE }) - await video.addAndSaveThumbnail(miniature) + await videoWithFiles.addAndSaveThumbnail(miniature) } if (videoWithFiles.getPreview().automaticallyGenerated === true) { @@ -159,13 +182,19 @@ async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirec videoFile: videoWithFiles.getMaxQualityFile(), type: ThumbnailType.PREVIEW }) - await video.addAndSaveThumbnail(preview) + await videoWithFiles.addAndSaveThumbnail(preview) } - await moveToNextState({ video: videoWithFiles, isNewVideo: false }) + // We consider this is a new video + await moveToNextState({ video: videoWithFiles, isNewVideo: true }) } -async function assignReplaysToVideo (video: MVideo, replayDirectory: string) { +async function assignReplayFilesToVideo (options: { + video: MVideo + replayDirectory: string +}) { + const { video, replayDirectory } = options + let durationDone = false const concatenatedTsFiles = await readdir(replayDirectory) @@ -197,11 +226,15 @@ async function assignReplaysToVideo (video: MVideo, replayDirectory: string) { return video } -async function cleanupLiveAndFederate (video: MVideo) { - const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) - await cleanupLive(video, streamingPlaylist) +async function cleanupLiveAndFederate (options: { + liveVideo: MVideo +}) { + const { liveVideo } = options + + const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(liveVideo.id) + await cleanupLive(liveVideo, streamingPlaylist) - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(liveVideo.id) return federateVideoIfNeeded(fullVideo, false, undefined) } diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index da09aa05c..df2804a0e 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/ import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' +import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' import { wait } from '@shared/core-utils' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' @@ -174,10 +175,13 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number) { + stopSessionOf (videoId: number, error: LiveVideoError | null) { const sessionId = this.videoSessions.get(videoId) if (!sessionId) return + this.saveEndingSession(videoId, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + this.videoSessions.delete(videoId) this.abortSession(sessionId) } @@ -274,6 +278,8 @@ class LiveManager { const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) + const liveSession = await this.saveStartingSession(videoLive) + const user = await UserModel.loadByLiveId(videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) @@ -299,24 +305,27 @@ class LiveManager { localLTags ) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) }) muxingSession.on('duration-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) }) muxingSession.on('quota-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + }) + + muxingSession.on('ffmpeg-error', ({ videoId }) => { + this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId) + this.onMuxingFFmpegEnd(videoId, sessionId) }) muxingSession.on('after-cleanup', ({ videoId }) => { @@ -324,7 +333,7 @@ class LiveManager { muxingSession.destroy() - return this.onAfterMuxingCleanup({ videoId }) + return this.onAfterMuxingCleanup({ videoId, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -365,15 +374,19 @@ class LiveManager { } } - private onMuxingFFmpegEnd (videoId: number) { + private onMuxingFFmpegEnd (videoId: number, sessionId: string) { this.videoSessions.delete(videoId) + + this.saveEndingSession(videoId, null) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } private async onAfterMuxingCleanup (options: { videoId: number | string + liveSession?: MVideoLiveSession cleanupNow?: boolean // Default false }) { - const { videoId, cleanupNow = false } = options + const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options try { const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) @@ -381,13 +394,25 @@ class LiveManager { const live = await VideoLiveModel.loadByVideoId(fullVideo.id) + const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id) + + // On server restart during a live + if (!liveSession.endDate) { + liveSession.endDate = new Date() + await liveSession.save() + } + JobQueue.Instance.createJob({ type: 'video-live-ending', payload: { videoId: fullVideo.id, + replayDirectory: live.saveReplay ? await this.findReplayDirectory(fullVideo) : undefined, + + liveSessionId: liveSession.id, + publishedAt: fullVideo.publishedAt.toISOString() } }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) @@ -445,6 +470,23 @@ class LiveManager { return playlist.save() } + private saveStartingSession (videoLive: MVideoLiveVideo) { + const liveSession = new VideoLiveSessionModel({ + startDate: new Date(), + liveVideoId: videoLive.videoId + }) + + return liveSession.save() + } + + private async saveEndingSession (videoId: number, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + liveSession.endDate = new Date() + liveSession.error = error + + return liveSession.save() + } + static get Instance () { return this.instance || (this.instance = new this()) } diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 588ee8749..1ee9b430f 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts @@ -28,7 +28,7 @@ interface MuxingSessionEvents { 'quota-exceeded': ({ videoId: number }) => void 'ffmpeg-end': ({ videoId: number }) => void - 'ffmpeg-error': ({ sessionId: string }) => void + 'ffmpeg-error': ({ videoId: string }) => void 'after-cleanup': ({ videoId: number }) => void } @@ -164,7 +164,11 @@ class MuxingSession extends EventEmitter { this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) }) - this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) + this.ffmpegCommand.on('end', () => { + this.emit('ffmpeg-end', ({ videoId: this.videoId })) + + this.onFFmpegEnded(this.outDirectory) + }) this.ffmpegCommand.run() } @@ -197,7 +201,7 @@ class MuxingSession extends EventEmitter { logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) - this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) + this.emit('ffmpeg-error', ({ videoId: this.videoId })) } private onFFmpegEnded (outPath: string) { diff --git a/server/lib/video-blacklist.ts b/server/lib/video-blacklist.ts index 91f44cb11..fd5837a3a 100644 --- a/server/lib/video-blacklist.ts +++ b/server/lib/video-blacklist.ts @@ -9,7 +9,7 @@ import { MVideoFullLight, MVideoWithBlacklistLight } from '@server/types/models' -import { UserRight, VideoBlacklistCreate, VideoBlacklistType } from '../../shared/models' +import { LiveVideoError, UserRight, VideoBlacklistCreate, VideoBlacklistType } from '../../shared/models' import { UserAdminFlag } from '../../shared/models/users/user-flag.model' import { logger, loggerTagsFactory } from '../helpers/logger' import { CONFIG } from '../initializers/config' @@ -81,7 +81,7 @@ async function blacklistVideo (videoInstance: MVideoAccountLight, options: Video } if (videoInstance.isLive) { - LiveManager.Instance.stopSessionOf(videoInstance.id) + LiveManager.Instance.stopSessionOf(videoInstance.id, LiveVideoError.BLACKLISTED) } Notifier.Instance.notifyOnVideoBlacklist(blacklist) diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index 7b207eb87..ae2725d65 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts @@ -126,12 +126,10 @@ async function moveToPublishedState (options: { const { video, isNewVideo, transaction, previousVideoState } = options const previousState = previousVideoState ?? video.state - logger.info('Publishing video %s.', video.uuid, { previousState, tags: [ video.uuid ] }) + logger.info('Publishing video %s.', video.uuid, { isNewVideo, previousState, tags: [ video.uuid ] }) await video.setNewState(VideoState.PUBLISHED, isNewVideo, transaction) - // If the video was not published, we consider it is a new one for other instances - // Live videos are always federated, so it's not a new video await federateVideoIfNeeded(video, isNewVideo, transaction) if (previousState === VideoState.TO_EDIT) { -- cgit v1.2.3