X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Flive%2Flive-manager.ts;h=d499b4b1af3e245d593fc4af21cb3fa5ab7aa928;hb=b6e2b5df73d3b67e275000f612907859c39d90d1;hp=1b7b9dd4d6f3cc20287030818d7085b7b886e600;hpb=5037e0e474044d7fc04092158784395a001e5c25;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1b7b9dd4d..d499b4b1a 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -1,31 +1,33 @@ -import { readFile } from 'fs-extra' +import { readdir, readFile } from 'fs-extra' import { createServer, Server } from 'net' +import { join } from 'path' import { createServer as createServerTLS, Server as ServerTLS } from 'tls' -import { isTestInstance } from '@server/helpers/core-utils' import { - computeResolutionsToTranscode, + computeLowerResolutionsToTranscode, ffprobePromise, - getVideoFileBitrate, - getVideoFileFPS, - getVideoFileResolution -} from '@server/helpers/ffprobe-utils' + getLiveSegmentTime, + getVideoStreamBitrate, + getVideoStreamDimensionsInfo, + getVideoStreamFPS +} from '@server/helpers/ffmpeg' import { logger, loggerTagsFactory } from '@server/helpers/logger' import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' -import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME } from '@server/initializers/constants' +import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoLiveModel } from '@server/models/video/video-live' +import { VideoLiveSessionModel } from '@server/models/video/video-live-session' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' -import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' -import { VideoState, VideoStreamingPlaylistType } from '@shared/models' +import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' +import { wait } from '@shared/core-utils' +import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' import { federateVideoIfNeeded } from '../activitypub/videos' import { JobQueue } from '../job-queue' -import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' +import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' import { PeerTubeSocket } from '../peertube-socket' import { LiveQuotaStore } from './live-quota-store' -import { LiveSegmentShaStore } from './live-segment-sha-store' -import { cleanupLive } from './live-utils' +import { cleanupPermanentLive } from './live-utils' import { MuxingSession } from './shared' const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') @@ -53,8 +55,6 @@ class LiveManager { private readonly muxingSessions = new Map() private readonly videoSessions = new Map() - // Values are Date().getTime() - private readonly watchersPerVideo = new Map() private rtmpServer: Server private rtmpsServer: ServerTLS @@ -99,8 +99,6 @@ class LiveManager { // Cleanup broken lives, that were terminated by a server restart for example this.handleBrokenLives() .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() })) - - setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) } async run () { @@ -120,7 +118,7 @@ class LiveManager { logger.error('Cannot run RTMP server.', { err, ...lTags() }) }) - this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) + this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME) } if (CONFIG.LIVE.RTMPS.ENABLED) { @@ -143,7 +141,7 @@ class LiveManager { logger.error('Cannot run RTMPS server.', { err, ...lTags() }) }) - this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT) + this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME) } } @@ -176,27 +174,17 @@ class LiveManager { return !!this.rtmpServer } - stopSessionOf (videoId: number) { + stopSessionOf (videoId: number, error: LiveVideoError | null) { const sessionId = this.videoSessions.get(videoId) if (!sessionId) return + this.saveEndingSession(videoId, error) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) + this.videoSessions.delete(videoId) this.abortSession(sessionId) } - addViewTo (videoId: number) { - if (this.videoSessions.has(videoId) === false) return - - let watchers = this.watchersPerVideo.get(videoId) - - if (!watchers) { - watchers = [] - this.watchersPerVideo.set(videoId, watchers) - } - - watchers.push(new Date().getTime()) - } - private getContext () { return context } @@ -230,12 +218,12 @@ class LiveManager { return this.abortSession(sessionId) } - // Cleanup old potential live files (could happen with a permanent live) - LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid) - + // Cleanup old potential live (could happen with a permanent live) const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) if (oldStreamingPlaylist) { - await cleanupLive(video, oldStreamingPlaylist) + if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid) + + await cleanupPermanentLive(video, oldStreamingPlaylist) } this.videoSessions.set(video.id, sessionId) @@ -244,9 +232,9 @@ class LiveManager { const probe = await ffprobePromise(inputUrl) const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([ - getVideoFileResolution(inputUrl, probe), - getVideoFileFPS(inputUrl, probe), - getVideoFileBitrate(inputUrl, probe) + getVideoStreamDimensionsInfo(inputUrl, probe), + getVideoStreamFPS(inputUrl, probe), + getVideoStreamBitrate(inputUrl, probe) ]) logger.info( @@ -289,6 +277,8 @@ class LiveManager { const videoUUID = videoLive.Video.uuid const localLTags = lTags(sessionId, videoUUID) + const liveSession = await this.saveStartingSession(videoLive) + const user = await UserModel.loadByLiveId(videoLive.id) LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) @@ -314,32 +304,37 @@ class LiveManager { localLTags ) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) }) muxingSession.on('duration-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) }) muxingSession.on('quota-exceeded', ({ videoId }) => { logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) - this.stopSessionOf(videoId) + this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) + }) + + muxingSession.on('ffmpeg-error', ({ videoId }) => { + this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) }) - muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) muxingSession.on('ffmpeg-end', ({ videoId }) => { - this.onMuxingFFmpegEnd(videoId) + this.onMuxingFFmpegEnd(videoId, sessionId) }) muxingSession.on('after-cleanup', ({ videoId }) => { this.muxingSessions.delete(sessionId) + LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) + muxingSession.destroy() - return this.onAfterMuxingCleanup(videoId) + return this.onAfterMuxingCleanup({ videoId, liveSession }) .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) }) @@ -361,81 +356,80 @@ class LiveManager { logger.info('Will publish and federate live %s.', video.url, localLTags) video.state = VideoState.PUBLISHED + video.publishedAt = new Date() await video.save() live.Video = video - setTimeout(() => { - federateVideoIfNeeded(video, false) - .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) + await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + + try { + await federateVideoIfNeeded(video, false) + } catch (err) { + logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) + } - PeerTubeSocket.Instance.sendVideoLiveNewState(video) - }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) + PeerTubeSocket.Instance.sendVideoLiveNewState(video) } catch (err) { logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) } } - private onMuxingFFmpegEnd (videoId: number) { - this.watchersPerVideo.delete(videoId) + private onMuxingFFmpegEnd (videoId: number, sessionId: string) { this.videoSessions.delete(videoId) + + this.saveEndingSession(videoId, null) + .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) } - private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { + private async onAfterMuxingCleanup (options: { + videoId: number | string + liveSession?: MVideoLiveSession + cleanupNow?: boolean // Default false + }) { + const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options + try { - const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) + const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) if (!fullVideo) return const live = await VideoLiveModel.loadByVideoId(fullVideo.id) - if (!live.permanentLive) { - JobQueue.Instance.createJob({ - type: 'video-live-ending', - payload: { - videoId: fullVideo.id - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - - fullVideo.state = VideoState.LIVE_ENDED - } else { - fullVideo.state = VideoState.WAITING_FOR_LIVE - } - - await fullVideo.save() - - PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - - await federateVideoIfNeeded(fullVideo, false) - } catch (err) { - logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) - } - } - - private async updateLiveViews () { - if (!this.isRunning()) return + const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id) - if (!isTestInstance()) logger.info('Updating live video views.', lTags()) + // On server restart during a live + if (!liveSession.endDate) { + liveSession.endDate = new Date() + await liveSession.save() + } - for (const videoId of this.watchersPerVideo.keys()) { - const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE + JobQueue.Instance.createJob({ + type: 'video-live-ending', + payload: { + videoId: fullVideo.id, - const watchers = this.watchersPerVideo.get(videoId) + replayDirectory: live.saveReplay + ? await this.findReplayDirectory(fullVideo) + : undefined, - const numWatchers = watchers.length + liveSessionId: liveSession.id, + streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) - video.views = numWatchers - await video.save() + publishedAt: fullVideo.publishedAt.toISOString() + } + }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) - await federateVideoIfNeeded(video, false) + fullVideo.state = live.permanentLive + ? VideoState.WAITING_FOR_LIVE + : VideoState.LIVE_ENDED - PeerTubeSocket.Instance.sendVideoViewsUpdate(video) + await fullVideo.save() - // Only keep not expired watchers - const newWatchers = watchers.filter(w => w > notBefore) - this.watchersPerVideo.set(videoId, newWatchers) + PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo) - logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags()) + await federateVideoIfNeeded(fullVideo, false) + } catch (err) { + logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) } } @@ -443,13 +437,22 @@ class LiveManager { const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() for (const uuid of videoUUIDs) { - await this.onAfterMuxingCleanup(uuid, true) + await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) } } + private async findReplayDirectory (video: MVideo) { + const directory = getLiveReplayBaseDirectory(video) + const files = await readdir(directory) + + if (files.length === 0) return undefined + + return join(directory, files.sort().reverse()[0]) + } + private buildAllResolutionsToTranscode (originResolution: number) { const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED - ? computeResolutionsToTranscode(originResolution, 'live') + ? computeLowerResolutionsToTranscode(originResolution, 'live') : [] return resolutionsEnabled.concat([ originResolution ]) @@ -469,6 +472,23 @@ class LiveManager { return playlist.save() } + private saveStartingSession (videoLive: MVideoLiveVideo) { + const liveSession = new VideoLiveSessionModel({ + startDate: new Date(), + liveVideoId: videoLive.videoId + }) + + return liveSession.save() + } + + private async saveEndingSession (videoId: number, error: LiveVideoError | null) { + const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) + liveSession.endDate = new Date() + liveSession.error = error + + return liveSession.save() + } + static get Instance () { return this.instance || (this.instance = new this()) }