aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-04-21 09:06:52 +0200
committerChocobozzz <me@florianbigard.com>2022-04-21 11:47:57 +0200
commit4ec52d04dcc5d664612331f8e08d7d90da990415 (patch)
tree4b193f9f8f210caaf2dbe05ef3e37fa3a6fc28f0 /server/lib/live
parent2024a3b9338d667640aa115da6071ea83d088c50 (diff)
downloadPeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.gz
PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.zst
PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.zip
Add ability to save replay of permanent lives
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/live-manager.ts69
-rw-r--r--server/lib/live/live-utils.ts4
-rw-r--r--server/lib/live/shared/muxing-session.ts33
3 files changed, 64 insertions, 42 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 920d3a5ec..5ffe41ee3 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -1,6 +1,7 @@
1 1
2import { readFile } from 'fs-extra' 2import { readdir, readFile } from 'fs-extra'
3import { createServer, Server } from 'net' 3import { createServer, Server } from 'net'
4import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 5import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import { 6import {
6 computeLowerResolutionsToTranscode, 7 computeLowerResolutionsToTranscode,
@@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video'
18import { VideoLiveModel } from '@server/models/video/video-live' 19import { VideoLiveModel } from '@server/models/video/video-live'
19import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 20import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
20import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' 21import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models'
22import { wait } from '@shared/core-utils'
21import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 23import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
22import { federateVideoIfNeeded } from '../activitypub/videos' 24import { federateVideoIfNeeded } from '../activitypub/videos'
23import { JobQueue } from '../job-queue' 25import { JobQueue } from '../job-queue'
24import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' 26import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
25import { PeerTubeSocket } from '../peertube-socket' 27import { PeerTubeSocket } from '../peertube-socket'
26import { LiveQuotaStore } from './live-quota-store' 28import { LiveQuotaStore } from './live-quota-store'
27import { LiveSegmentShaStore } from './live-segment-sha-store' 29import { LiveSegmentShaStore } from './live-segment-sha-store'
@@ -322,7 +324,7 @@ class LiveManager {
322 324
323 muxingSession.destroy() 325 muxingSession.destroy()
324 326
325 return this.onAfterMuxingCleanup(videoId) 327 return this.onAfterMuxingCleanup({ videoId })
326 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 328 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
327 }) 329 })
328 330
@@ -349,12 +351,15 @@ class LiveManager {
349 351
350 live.Video = video 352 live.Video = video
351 353
352 setTimeout(() => { 354 await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
353 federateVideoIfNeeded(video, false)
354 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }))
355 355
356 PeerTubeSocket.Instance.sendVideoLiveNewState(video) 356 try {
357 }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) 357 await federateVideoIfNeeded(video, false)
358 } catch (err) {
359 logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
360 }
361
362 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
358 } catch (err) { 363 } catch (err) {
359 logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) 364 logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
360 } 365 }
@@ -364,25 +369,32 @@ class LiveManager {
364 this.videoSessions.delete(videoId) 369 this.videoSessions.delete(videoId)
365 } 370 }
366 371
367 private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { 372 private async onAfterMuxingCleanup (options: {
373 videoId: number | string
374 cleanupNow?: boolean // Default false
375 }) {
376 const { videoId, cleanupNow = false } = options
377
368 try { 378 try {
369 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) 379 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
370 if (!fullVideo) return 380 if (!fullVideo) return
371 381
372 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 382 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
373 383
374 if (!live.permanentLive) { 384 JobQueue.Instance.createJob({
375 JobQueue.Instance.createJob({ 385 type: 'video-live-ending',
376 type: 'video-live-ending', 386 payload: {
377 payload: { 387 videoId: fullVideo.id,
378 videoId: fullVideo.id 388 replayDirectory: live.saveReplay
379 } 389 ? await this.findReplayDirectory(fullVideo)
380 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 390 : undefined,
381 391 publishedAt: fullVideo.publishedAt.toISOString()
382 fullVideo.state = VideoState.LIVE_ENDED 392 }
383 } else { 393 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
384 fullVideo.state = VideoState.WAITING_FOR_LIVE 394
385 } 395 fullVideo.state = live.permanentLive
396 ? VideoState.WAITING_FOR_LIVE
397 : VideoState.LIVE_ENDED
386 398
387 await fullVideo.save() 399 await fullVideo.save()
388 400
@@ -390,7 +402,7 @@ class LiveManager {
390 402
391 await federateVideoIfNeeded(fullVideo, false) 403 await federateVideoIfNeeded(fullVideo, false)
392 } catch (err) { 404 } catch (err) {
393 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) 405 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') })
394 } 406 }
395 } 407 }
396 408
@@ -398,10 +410,19 @@ class LiveManager {
398 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 410 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
399 411
400 for (const uuid of videoUUIDs) { 412 for (const uuid of videoUUIDs) {
401 await this.onAfterMuxingCleanup(uuid, true) 413 await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true })
402 } 414 }
403 } 415 }
404 416
417 private async findReplayDirectory (video: MVideo) {
418 const directory = getLiveReplayBaseDirectory(video)
419 const files = await readdir(directory)
420
421 if (files.length === 0) return undefined
422
423 return join(directory, files.sort().reverse()[0])
424 }
425
405 private buildAllResolutionsToTranscode (originResolution: number) { 426 private buildAllResolutionsToTranscode (originResolution: number) {
406 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED 427 const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
407 ? computeLowerResolutionsToTranscode(originResolution, 'live') 428 ? computeLowerResolutionsToTranscode(originResolution, 'live')
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index 3bf723b98..46c7fd2f8 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -9,12 +9,12 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
9 return 'concat-' + num[1] + '.ts' 9 return 'concat-' + num[1] + '.ts'
10} 10}
11 11
12async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { 12async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
13 const hlsDirectory = getLiveDirectory(video) 13 const hlsDirectory = getLiveDirectory(video)
14 14
15 await remove(hlsDirectory) 15 await remove(hlsDirectory)
16 16
17 await streamingPlaylist.destroy() 17 if (streamingPlaylist) await streamingPlaylist.destroy()
18} 18}
19 19
20export { 20export {
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index a703f5b5f..588ee8749 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -11,7 +11,7 @@ import { CONFIG } from '@server/initializers/config'
11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' 11import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
12import { VideoFileModel } from '@server/models/video/video-file' 12import { VideoFileModel } from '@server/models/video/video-file'
13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 13import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
14import { getLiveDirectory } from '../../paths' 14import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' 15import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
16import { isAbleToUploadVideo } from '../../user' 16import { isAbleToUploadVideo } from '../../user'
17import { LiveQuotaStore } from '../live-quota-store' 17import { LiveQuotaStore } from '../live-quota-store'
@@ -63,6 +63,9 @@ class MuxingSession extends EventEmitter {
63 private readonly videoUUID: string 63 private readonly videoUUID: string
64 private readonly saveReplay: boolean 64 private readonly saveReplay: boolean
65 65
66 private readonly outDirectory: string
67 private readonly replayDirectory: string
68
66 private readonly lTags: LoggerTagsFn 69 private readonly lTags: LoggerTagsFn
67 70
68 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 71 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
@@ -110,19 +113,22 @@ class MuxingSession extends EventEmitter {
110 113
111 this.saveReplay = this.videoLive.saveReplay 114 this.saveReplay = this.videoLive.saveReplay
112 115
116 this.outDirectory = getLiveDirectory(this.videoLive.Video)
117 this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
118
113 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) 119 this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
114 } 120 }
115 121
116 async runMuxing () { 122 async runMuxing () {
117 this.createFiles() 123 this.createFiles()
118 124
119 const outPath = await this.prepareDirectories() 125 await this.prepareDirectories()
120 126
121 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED 127 this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
122 ? await getLiveTranscodingCommand({ 128 ? await getLiveTranscodingCommand({
123 inputUrl: this.inputUrl, 129 inputUrl: this.inputUrl,
124 130
125 outPath, 131 outPath: this.outDirectory,
126 masterPlaylistName: this.streamingPlaylist.playlistFilename, 132 masterPlaylistName: this.streamingPlaylist.playlistFilename,
127 133
128 latencyMode: this.videoLive.latencyMode, 134 latencyMode: this.videoLive.latencyMode,
@@ -137,15 +143,15 @@ class MuxingSession extends EventEmitter {
137 }) 143 })
138 : getLiveMuxingCommand({ 144 : getLiveMuxingCommand({
139 inputUrl: this.inputUrl, 145 inputUrl: this.inputUrl,
140 outPath, 146 outPath: this.outDirectory,
141 masterPlaylistName: this.streamingPlaylist.playlistFilename, 147 masterPlaylistName: this.streamingPlaylist.playlistFilename,
142 latencyMode: this.videoLive.latencyMode 148 latencyMode: this.videoLive.latencyMode
143 }) 149 })
144 150
145 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) 151 logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
146 152
147 this.watchTSFiles(outPath) 153 this.watchTSFiles(this.outDirectory)
148 this.watchMasterFile(outPath) 154 this.watchMasterFile(this.outDirectory)
149 155
150 let ffmpegShellCommand: string 156 let ffmpegShellCommand: string
151 this.ffmpegCommand.on('start', cmdline => { 157 this.ffmpegCommand.on('start', cmdline => {
@@ -155,10 +161,10 @@ class MuxingSession extends EventEmitter {
155 }) 161 })
156 162
157 this.ffmpegCommand.on('error', (err, stdout, stderr) => { 163 this.ffmpegCommand.on('error', (err, stdout, stderr) => {
158 this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand }) 164 this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand })
159 }) 165 })
160 166
161 this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) 167 this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory))
162 168
163 this.ffmpegCommand.run() 169 this.ffmpegCommand.run()
164 } 170 }
@@ -304,16 +310,11 @@ class MuxingSession extends EventEmitter {
304 } 310 }
305 311
306 private async prepareDirectories () { 312 private async prepareDirectories () {
307 const outPath = getLiveDirectory(this.videoLive.Video) 313 await ensureDir(this.outDirectory)
308 await ensureDir(outPath)
309
310 const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
311 314
312 if (this.videoLive.saveReplay === true) { 315 if (this.videoLive.saveReplay === true) {
313 await ensureDir(replayDirectory) 316 await ensureDir(this.replayDirectory)
314 } 317 }
315
316 return outPath
317 } 318 }
318 319
319 private isDurationConstraintValid (streamingStartTime: number) { 320 private isDurationConstraintValid (streamingStartTime: number) {
@@ -364,7 +365,7 @@ class MuxingSession extends EventEmitter {
364 365
365 private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { 366 private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
366 const segmentName = basename(segmentPath) 367 const segmentName = basename(segmentPath)
367 const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) 368 const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
368 369
369 try { 370 try {
370 const data = await readFile(segmentPath) 371 const data = await readFile(segmentPath)