aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/live-manager.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-05-03 11:38:07 +0200
committerChocobozzz <me@florianbigard.com>2022-05-03 14:49:15 +0200
commit26e3e98ff0e222a9fb9226938ac6902af77921bd (patch)
tree73d1c6f2524e380862d3365f12043fc319d40841 /server/lib/live/live-manager.ts
parent86c5229b4d726202378ef46854383bcafca22310 (diff)
downloadPeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.gz
PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.zst
PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.zip
Support live session in server
Diffstat (limited to 'server/lib/live/live-manager.ts')
-rw-r--r--server/lib/live/live-manager.ts64
1 files changed, 53 insertions, 11 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index da09aa05c..df2804a0e 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/
17import { UserModel } from '@server/models/user/user' 17import { UserModel } from '@server/models/user/user'
18import { VideoModel } from '@server/models/video/video' 18import { VideoModel } from '@server/models/video/video'
19import { VideoLiveModel } from '@server/models/video/video-live' 19import { VideoLiveModel } from '@server/models/video/video-live'
20import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
20import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
21import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' 22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
22import { wait } from '@shared/core-utils' 23import { wait } from '@shared/core-utils'
23import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 24import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
24import { federateVideoIfNeeded } from '../activitypub/videos' 25import { federateVideoIfNeeded } from '../activitypub/videos'
25import { JobQueue } from '../job-queue' 26import { JobQueue } from '../job-queue'
26import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' 27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
@@ -174,10 +175,13 @@ class LiveManager {
174 return !!this.rtmpServer 175 return !!this.rtmpServer
175 } 176 }
176 177
177 stopSessionOf (videoId: number) { 178 stopSessionOf (videoId: number, error: LiveVideoError | null) {
178 const sessionId = this.videoSessions.get(videoId) 179 const sessionId = this.videoSessions.get(videoId)
179 if (!sessionId) return 180 if (!sessionId) return
180 181
182 this.saveEndingSession(videoId, error)
183 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
184
181 this.videoSessions.delete(videoId) 185 this.videoSessions.delete(videoId)
182 this.abortSession(sessionId) 186 this.abortSession(sessionId)
183 } 187 }
@@ -274,6 +278,8 @@ class LiveManager {
274 const videoUUID = videoLive.Video.uuid 278 const videoUUID = videoLive.Video.uuid
275 const localLTags = lTags(sessionId, videoUUID) 279 const localLTags = lTags(sessionId, videoUUID)
276 280
281 const liveSession = await this.saveStartingSession(videoLive)
282
277 const user = await UserModel.loadByLiveId(videoLive.id) 283 const user = await UserModel.loadByLiveId(videoLive.id)
278 LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) 284 LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
279 285
@@ -299,24 +305,27 @@ class LiveManager {
299 localLTags 305 localLTags
300 ) 306 )
301 307
302 this.stopSessionOf(videoId) 308 this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH)
303 }) 309 })
304 310
305 muxingSession.on('duration-exceeded', ({ videoId }) => { 311 muxingSession.on('duration-exceeded', ({ videoId }) => {
306 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) 312 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
307 313
308 this.stopSessionOf(videoId) 314 this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED)
309 }) 315 })
310 316
311 muxingSession.on('quota-exceeded', ({ videoId }) => { 317 muxingSession.on('quota-exceeded', ({ videoId }) => {
312 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) 318 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
313 319
314 this.stopSessionOf(videoId) 320 this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED)
321 })
322
323 muxingSession.on('ffmpeg-error', ({ videoId }) => {
324 this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR)
315 }) 325 })
316 326
317 muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId))
318 muxingSession.on('ffmpeg-end', ({ videoId }) => { 327 muxingSession.on('ffmpeg-end', ({ videoId }) => {
319 this.onMuxingFFmpegEnd(videoId) 328 this.onMuxingFFmpegEnd(videoId, sessionId)
320 }) 329 })
321 330
322 muxingSession.on('after-cleanup', ({ videoId }) => { 331 muxingSession.on('after-cleanup', ({ videoId }) => {
@@ -324,7 +333,7 @@ class LiveManager {
324 333
325 muxingSession.destroy() 334 muxingSession.destroy()
326 335
327 return this.onAfterMuxingCleanup({ videoId }) 336 return this.onAfterMuxingCleanup({ videoId, liveSession })
328 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 337 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
329 }) 338 })
330 339
@@ -365,15 +374,19 @@ class LiveManager {
365 } 374 }
366 } 375 }
367 376
368 private onMuxingFFmpegEnd (videoId: number) { 377 private onMuxingFFmpegEnd (videoId: number, sessionId: string) {
369 this.videoSessions.delete(videoId) 378 this.videoSessions.delete(videoId)
379
380 this.saveEndingSession(videoId, null)
381 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
370 } 382 }
371 383
372 private async onAfterMuxingCleanup (options: { 384 private async onAfterMuxingCleanup (options: {
373 videoId: number | string 385 videoId: number | string
386 liveSession?: MVideoLiveSession
374 cleanupNow?: boolean // Default false 387 cleanupNow?: boolean // Default false
375 }) { 388 }) {
376 const { videoId, cleanupNow = false } = options 389 const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options
377 390
378 try { 391 try {
379 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 392 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
@@ -381,13 +394,25 @@ class LiveManager {
381 394
382 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 395 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
383 396
397 const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id)
398
399 // On server restart during a live
400 if (!liveSession.endDate) {
401 liveSession.endDate = new Date()
402 await liveSession.save()
403 }
404
384 JobQueue.Instance.createJob({ 405 JobQueue.Instance.createJob({
385 type: 'video-live-ending', 406 type: 'video-live-ending',
386 payload: { 407 payload: {
387 videoId: fullVideo.id, 408 videoId: fullVideo.id,
409
388 replayDirectory: live.saveReplay 410 replayDirectory: live.saveReplay
389 ? await this.findReplayDirectory(fullVideo) 411 ? await this.findReplayDirectory(fullVideo)
390 : undefined, 412 : undefined,
413
414 liveSessionId: liveSession.id,
415
391 publishedAt: fullVideo.publishedAt.toISOString() 416 publishedAt: fullVideo.publishedAt.toISOString()
392 } 417 }
393 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 418 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
@@ -445,6 +470,23 @@ class LiveManager {
445 return playlist.save() 470 return playlist.save()
446 } 471 }
447 472
473 private saveStartingSession (videoLive: MVideoLiveVideo) {
474 const liveSession = new VideoLiveSessionModel({
475 startDate: new Date(),
476 liveVideoId: videoLive.videoId
477 })
478
479 return liveSession.save()
480 }
481
482 private async saveEndingSession (videoId: number, error: LiveVideoError | null) {
483 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId)
484 liveSession.endDate = new Date()
485 liveSession.error = error
486
487 return liveSession.save()
488 }
489
448 static get Instance () { 490 static get Instance () {
449 return this.instance || (this.instance = new this()) 491 return this.instance || (this.instance = new this())
450 } 492 }