From bb4ba6d94c5051fdd665ebe63fffcc105778b8be Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 3 Dec 2020 14:10:54 +0100 Subject: Add permanent live support --- server/lib/activitypub/videos.ts | 2 ++ server/lib/job-queue/handlers/video-live-ending.ts | 27 +++++++++------ server/lib/live-manager.ts | 40 ++++++++++++++++++---- 3 files changed, 51 insertions(+), 18 deletions(-) (limited to 'server/lib') diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 4053f487c..04f0bfc23 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -429,6 +429,7 @@ async function updateVideoFromAP (options: { if (video.isLive) { const [ videoLive ] = await VideoLiveModel.upsert({ saveReplay: videoObject.liveSaveReplay, + permanentLive: videoObject.permanentLive, videoId: video.id }, { transaction: t, returning: true }) @@ -631,6 +632,7 @@ async function createVideo (videoObject: VideoObject, channel: MChannelAccountLi const videoLive = new VideoLiveModel({ streamKey: null, saveReplay: videoObject.liveSaveReplay, + permanentLive: videoObject.permanentLive, videoId: videoCreated.id }) diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index d3c84ce75..e3c11caa2 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -1,5 +1,5 @@ import * as Bull from 'bull' -import { copy, readdir, remove } from 'fs-extra' +import { copy, pathExists, readdir, remove } from 'fs-extra' import { join } from 'path' import { getDurationFromVideoFile, getVideoFileResolution } from '@server/helpers/ffprobe-utils' import { VIDEO_LIVE } from '@server/initializers/constants' @@ -14,6 +14,7 @@ import { VideoStreamingPlaylistModel } from '@server/models/video/video-streamin import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models' import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' import { logger } from '../../../helpers/logger' +import { LiveManager } from '@server/lib/live-manager' async function processVideoLiveEnding (job: Bull.Job) { const payload = job.data as VideoLiveEndingPayload @@ -36,6 +37,8 @@ async function processVideoLiveEnding (job: Bull.Job) { return } + LiveManager.Instance.cleanupShaSegments(video.uuid) + if (live.saveReplay !== true) { return cleanupLive(video, streamingPlaylist) } @@ -43,10 +46,19 @@ async function processVideoLiveEnding (job: Bull.Job) { return saveLive(video, live) } +async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { + const hlsDirectory = getHLSDirectory(video) + + await remove(hlsDirectory) + + await streamingPlaylist.destroy() +} + // --------------------------------------------------------------------------- export { - processVideoLiveEnding + processVideoLiveEnding, + cleanupLive } // --------------------------------------------------------------------------- @@ -131,16 +143,9 @@ async function saveLive (video: MVideo, live: MVideoLive) { await publishAndFederateIfNeeded(videoWithFiles, true) } -async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { - const hlsDirectory = getHLSDirectory(video) - - await remove(hlsDirectory) - - streamingPlaylist.destroy() - .catch(err => logger.error('Cannot remove live streaming playlist.', { err })) -} - async function cleanupLiveFiles (hlsDirectory: string) { + if (!await pathExists(hlsDirectory)) return + const files = await readdir(hlsDirectory) for (const filename of files) { diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts index 4f45ce530..dcf016169 100644 --- a/server/lib/live-manager.ts +++ b/server/lib/live-manager.ts @@ -19,6 +19,7 @@ import { VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from './activitypub/videos' import { buildSha256Segment } from './hls' import { JobQueue } from './job-queue' +import { cleanupLive } from './job-queue/handlers/video-live-ending' import { PeerTubeSocket } from './peertube-socket' import { isAbleToUploadVideo } from './user' import { getHLSDirectory } from './video-paths' @@ -153,6 +154,10 @@ class LiveManager { watchers.push(new Date().getTime()) } + cleanupShaSegments (videoUUID: string) { + this.segmentsSha256.delete(videoUUID) + } + private getContext () { return context } @@ -184,6 +189,14 @@ class LiveManager { return this.abortSession(sessionId) } + // Cleanup old potential live files (could happen with a permanent live) + this.cleanupShaSegments(video.uuid) + + const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) + if (oldStreamingPlaylist) { + await cleanupLive(video, oldStreamingPlaylist) + } + this.videoSessions.set(video.id, sessionId) const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid) @@ -372,7 +385,13 @@ class LiveManager { logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) this.transSessions.delete(sessionId) + this.watchersPerVideo.delete(videoLive.videoId) + this.videoSessions.delete(videoLive.videoId) + + const newLivesPerUser = this.livesPerUser.get(user.id) + .filter(o => o.liveId !== videoLive.id) + this.livesPerUser.set(user.id, newLivesPerUser) setTimeout(() => { // Wait latest segments generation, and close watchers @@ -412,14 +431,21 @@ class LiveManager { const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) if (!fullVideo) return - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + const live = await VideoLiveModel.loadByVideoId(videoId) + + if (!live.permanentLive) { + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id + } + }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + + fullVideo.state = VideoState.LIVE_ENDED + } else { + fullVideo.state = VideoState.WAITING_FOR_LIVE + } - fullVideo.state = VideoState.LIVE_ENDED await fullVideo.save() PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) -- cgit v1.2.3