aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/live-manager.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-04-21 09:06:52 +0200
committerChocobozzz <me@florianbigard.com>2022-04-21 11:47:57 +0200
commit4ec52d04dcc5d664612331f8e08d7d90da990415 (patch)
tree4b193f9f8f210caaf2dbe05ef3e37fa3a6fc28f0 /server/lib/live/live-manager.ts
parent2024a3b9338d667640aa115da6071ea83d088c50 (diff)
downloadPeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.gz
PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.zst
PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.zip
Add ability to save replay of permanent lives
Diffstat (limited to 'server/lib/live/live-manager.ts')
-rw-r--r--server/lib/live/live-manager.ts69
1 files changed, 45 insertions, 24 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 920d3a5ec..5ffe41ee3 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -1,6 +1,7 @@
1 1
2import { readFile } from 'fs-extra' 2import { readdir, readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 5import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { 6import {
6 computeLowerResolutionsToTranscode, 7 computeLowerResolutionsToTranscode,
@@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 19import { VideoLiveModel } from '@server/models/video/video-live'
19import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 20import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
20import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' 21import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models'
22import { wait } from '@shared/core-utils'
21import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 23import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
22import { federateVideoIfNeeded } from '../activitypub/videos' 24import { federateVideoIfNeeded } from '../activitypub/videos'
23import { JobQueue } from '../job-queue' 25import { JobQueue } from '../job-queue'
24import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' 26import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
25import { PeerTubeSocket } from '../peertube-socket' 27import { PeerTubeSocket } from '../peertube-socket'
26import { LiveQuotaStore } from './live-quota-store' 28import { LiveQuotaStore } from './live-quota-store'
27import { LiveSegmentShaStore } from './live-segment-sha-store' 29import { LiveSegmentShaStore } from './live-segment-sha-store'
@@ -322,7 +324,7 @@ class LiveManager {
322 324
323 muxingSession.destroy() 325 muxingSession.destroy()
324 326
325 return this.onAfterMuxingCleanup(videoId) 327 return this.onAfterMuxingCleanup({ videoId })
326 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 328 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
327 }) 329 })
328 330
@@ -349,12 +351,15 @@ class LiveManager {
349 351
350 live.Video = video 352 live.Video = video
351 353
352 setTimeout(() => { 354 await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
353 federateVideoIfNeeded(video, false)
354 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }))
355 355
356 PeerTubeSocket.Instance.sendVideoLiveNewState(video) 356 try {
357 }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) 357 await federateVideoIfNeeded(video, false)
358 } catch (err) {
359 logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
360 }
361
362 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
358 } catch (err) { 363 } catch (err) {
359 logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) 364 logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
360 } 365 }
@@ -364,25 +369,32 @@ class LiveManager {
364 this.videoSessions.delete(videoId) 369 this.videoSessions.delete(videoId)
365 } 370 }
366 371
367 private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { 372 private async onAfterMuxingCleanup (options: {
373 videoId: number | string
374 cleanupNow?: boolean // Default false
375 }) {
376 const { videoId, cleanupNow = false } = options
377
368 try { 378 try {
369 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) 379 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
370 if (!fullVideo) return 380 if (!fullVideo) return
371 381
372 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 382 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
373 383
374 if (!live.permanentLive) { 384 JobQueue.Instance.createJob({
375 JobQueue.Instance.createJob({ 385 type: 'video-live-ending',
376 type: 'video-live-ending', 386 payload: {
377 payload: { 387 videoId: fullVideo.id,
378 videoId: fullVideo.id 388 replayDirectory: live.saveReplay
379 } 389 ? await this.findReplayDirectory(fullVideo)
380 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 390 : undefined,
381 391 publishedAt: fullVideo.publishedAt.toISOString()
382 fullVideo.state = VideoState.LIVE_ENDED 392 }
383 } else { 393 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
384 fullVideo.state = VideoState.WAITING_FOR_LIVE 394
385 } 395 fullVideo.state = live.permanentLive
396 ? VideoState.WAITING_FOR_LIVE
397 : VideoState.LIVE_ENDED
386 398
387 await fullVideo.save() 399 await fullVideo.save()
388 400
@@ -390,7 +402,7 @@ class LiveManager {
390 402
391 await federateVideoIfNeeded(fullVideo, false) 403 await federateVideoIfNeeded(fullVideo, false)
392 } catch (err) { 404 } catch (err) {
393 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) 405 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') })
394 } 406 }
395 } 407 }
396 408
@@ -398,10 +410,19 @@ class LiveManager {
398 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 410 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
399 411
400 for (const uuid of videoUUIDs) { 412 for (const uuid of videoUUIDs) {
401 await this.onAfterMuxingCleanup(uuid, true) 413 await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true })
402 } 414 }
403 } 415 }
404 416
417 private async findReplayDirectory (video: MVideo) {
418 const directory = getLiveReplayBaseDirectory(video)
419 const files = await readdir(directory)
420
421 if (files.length === 0) return undefined
422
423 return join(directory, files.sort().reverse()[0])
424 }
425
405 private buildAllResolutionsToTranscode (originResolution: number) { 426 private buildAllResolutionsToTranscode (originResolution: number) {
406 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED 427 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
407 ? computeLowerResolutionsToTranscode(originResolution, 'live') 428 ? computeLowerResolutionsToTranscode(originResolution, 'live')