diff options
author | Chocobozzz <me@florianbigard.com> | 2022-04-21 09:06:52 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-04-21 11:47:57 +0200 |
commit | 4ec52d04dcc5d664612331f8e08d7d90da990415 (patch) | |
tree | 4b193f9f8f210caaf2dbe05ef3e37fa3a6fc28f0 /server/lib/live | |
parent | 2024a3b9338d667640aa115da6071ea83d088c50 (diff) | |
download | PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.gz PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.tar.zst PeerTube-4ec52d04dcc5d664612331f8e08d7d90da990415.zip |
Add ability to save replay of permanent lives
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/live-manager.ts | 69 | ||||
-rw-r--r-- | server/lib/live/live-utils.ts | 4 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 33 |
3 files changed, 64 insertions, 42 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 920d3a5ec..5ffe41ee3 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | 1 | ||
2 | import { readFile } from 'fs-extra' | 2 | import { readdir, readFile } from 'fs-extra' |
3 | import { createServer, Server } from 'net' | 3 | import { createServer, Server } from 'net' |
4 | import { join } from 'path' | ||
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 5 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { | 6 | import { |
6 | computeLowerResolutionsToTranscode, | 7 | computeLowerResolutionsToTranscode, |
@@ -18,10 +19,11 @@ import { VideoModel } from '@server/models/video/video' | |||
18 | import { VideoLiveModel } from '@server/models/video/video-live' | 19 | import { VideoLiveModel } from '@server/models/video/video-live' |
19 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 20 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
20 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' | 21 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' |
22 | import { wait } from '@shared/core-utils' | ||
21 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | 23 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' |
22 | import { federateVideoIfNeeded } from '../activitypub/videos' | 24 | import { federateVideoIfNeeded } from '../activitypub/videos' |
23 | import { JobQueue } from '../job-queue' | 25 | import { JobQueue } from '../job-queue' |
24 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths' | 26 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' |
25 | import { PeerTubeSocket } from '../peertube-socket' | 27 | import { PeerTubeSocket } from '../peertube-socket' |
26 | import { LiveQuotaStore } from './live-quota-store' | 28 | import { LiveQuotaStore } from './live-quota-store' |
27 | import { LiveSegmentShaStore } from './live-segment-sha-store' | 29 | import { LiveSegmentShaStore } from './live-segment-sha-store' |
@@ -322,7 +324,7 @@ class LiveManager { | |||
322 | 324 | ||
323 | muxingSession.destroy() | 325 | muxingSession.destroy() |
324 | 326 | ||
325 | return this.onAfterMuxingCleanup(videoId) | 327 | return this.onAfterMuxingCleanup({ videoId }) |
326 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) | 328 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) |
327 | }) | 329 | }) |
328 | 330 | ||
@@ -349,12 +351,15 @@ class LiveManager { | |||
349 | 351 | ||
350 | live.Video = video | 352 | live.Video = video |
351 | 353 | ||
352 | setTimeout(() => { | 354 | await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) |
353 | federateVideoIfNeeded(video, false) | ||
354 | .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })) | ||
355 | 355 | ||
356 | PeerTubeSocket.Instance.sendVideoLiveNewState(video) | 356 | try { |
357 | }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) | 357 | await federateVideoIfNeeded(video, false) |
358 | } catch (err) { | ||
359 | logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }) | ||
360 | } | ||
361 | |||
362 | PeerTubeSocket.Instance.sendVideoLiveNewState(video) | ||
358 | } catch (err) { | 363 | } catch (err) { |
359 | logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) | 364 | logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags }) |
360 | } | 365 | } |
@@ -364,25 +369,32 @@ class LiveManager { | |||
364 | this.videoSessions.delete(videoId) | 369 | this.videoSessions.delete(videoId) |
365 | } | 370 | } |
366 | 371 | ||
367 | private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) { | 372 | private async onAfterMuxingCleanup (options: { |
373 | videoId: number | string | ||
374 | cleanupNow?: boolean // Default false | ||
375 | }) { | ||
376 | const { videoId, cleanupNow = false } = options | ||
377 | |||
368 | try { | 378 | try { |
369 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID) | 379 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) |
370 | if (!fullVideo) return | 380 | if (!fullVideo) return |
371 | 381 | ||
372 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) | 382 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) |
373 | 383 | ||
374 | if (!live.permanentLive) { | 384 | JobQueue.Instance.createJob({ |
375 | JobQueue.Instance.createJob({ | 385 | type: 'video-live-ending', |
376 | type: 'video-live-ending', | 386 | payload: { |
377 | payload: { | 387 | videoId: fullVideo.id, |
378 | videoId: fullVideo.id | 388 | replayDirectory: live.saveReplay |
379 | } | 389 | ? await this.findReplayDirectory(fullVideo) |
380 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) | 390 | : undefined, |
381 | 391 | publishedAt: fullVideo.publishedAt.toISOString() | |
382 | fullVideo.state = VideoState.LIVE_ENDED | 392 | } |
383 | } else { | 393 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) |
384 | fullVideo.state = VideoState.WAITING_FOR_LIVE | 394 | |
385 | } | 395 | fullVideo.state = live.permanentLive |
396 | ? VideoState.WAITING_FOR_LIVE | ||
397 | : VideoState.LIVE_ENDED | ||
386 | 398 | ||
387 | await fullVideo.save() | 399 | await fullVideo.save() |
388 | 400 | ||
@@ -390,7 +402,7 @@ class LiveManager { | |||
390 | 402 | ||
391 | await federateVideoIfNeeded(fullVideo, false) | 403 | await federateVideoIfNeeded(fullVideo, false) |
392 | } catch (err) { | 404 | } catch (err) { |
393 | logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) }) | 405 | logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) |
394 | } | 406 | } |
395 | } | 407 | } |
396 | 408 | ||
@@ -398,10 +410,19 @@ class LiveManager { | |||
398 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 410 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
399 | 411 | ||
400 | for (const uuid of videoUUIDs) { | 412 | for (const uuid of videoUUIDs) { |
401 | await this.onAfterMuxingCleanup(uuid, true) | 413 | await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) |
402 | } | 414 | } |
403 | } | 415 | } |
404 | 416 | ||
417 | private async findReplayDirectory (video: MVideo) { | ||
418 | const directory = getLiveReplayBaseDirectory(video) | ||
419 | const files = await readdir(directory) | ||
420 | |||
421 | if (files.length === 0) return undefined | ||
422 | |||
423 | return join(directory, files.sort().reverse()[0]) | ||
424 | } | ||
425 | |||
405 | private buildAllResolutionsToTranscode (originResolution: number) { | 426 | private buildAllResolutionsToTranscode (originResolution: number) { |
406 | const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED | 427 | const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED |
407 | ? computeLowerResolutionsToTranscode(originResolution, 'live') | 428 | ? computeLowerResolutionsToTranscode(originResolution, 'live') |
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts index 3bf723b98..46c7fd2f8 100644 --- a/server/lib/live/live-utils.ts +++ b/server/lib/live/live-utils.ts | |||
@@ -9,12 +9,12 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) { | |||
9 | return 'concat-' + num[1] + '.ts' | 9 | return 'concat-' + num[1] + '.ts' |
10 | } | 10 | } |
11 | 11 | ||
12 | async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) { | 12 | async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { |
13 | const hlsDirectory = getLiveDirectory(video) | 13 | const hlsDirectory = getLiveDirectory(video) |
14 | 14 | ||
15 | await remove(hlsDirectory) | 15 | await remove(hlsDirectory) |
16 | 16 | ||
17 | await streamingPlaylist.destroy() | 17 | if (streamingPlaylist) await streamingPlaylist.destroy() |
18 | } | 18 | } |
19 | 19 | ||
20 | export { | 20 | export { |
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index a703f5b5f..588ee8749 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -11,7 +11,7 @@ import { CONFIG } from '@server/initializers/config' | |||
11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' | 11 | import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants' |
12 | import { VideoFileModel } from '@server/models/video/video-file' | 12 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | 13 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' |
14 | import { getLiveDirectory } from '../../paths' | 14 | import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths' |
15 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' | 15 | import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles' |
16 | import { isAbleToUploadVideo } from '../../user' | 16 | import { isAbleToUploadVideo } from '../../user' |
17 | import { LiveQuotaStore } from '../live-quota-store' | 17 | import { LiveQuotaStore } from '../live-quota-store' |
@@ -63,6 +63,9 @@ class MuxingSession extends EventEmitter { | |||
63 | private readonly videoUUID: string | 63 | private readonly videoUUID: string |
64 | private readonly saveReplay: boolean | 64 | private readonly saveReplay: boolean |
65 | 65 | ||
66 | private readonly outDirectory: string | ||
67 | private readonly replayDirectory: string | ||
68 | |||
66 | private readonly lTags: LoggerTagsFn | 69 | private readonly lTags: LoggerTagsFn |
67 | 70 | ||
68 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | 71 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} |
@@ -110,19 +113,22 @@ class MuxingSession extends EventEmitter { | |||
110 | 113 | ||
111 | this.saveReplay = this.videoLive.saveReplay | 114 | this.saveReplay = this.videoLive.saveReplay |
112 | 115 | ||
116 | this.outDirectory = getLiveDirectory(this.videoLive.Video) | ||
117 | this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString()) | ||
118 | |||
113 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) | 119 | this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID) |
114 | } | 120 | } |
115 | 121 | ||
116 | async runMuxing () { | 122 | async runMuxing () { |
117 | this.createFiles() | 123 | this.createFiles() |
118 | 124 | ||
119 | const outPath = await this.prepareDirectories() | 125 | await this.prepareDirectories() |
120 | 126 | ||
121 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED | 127 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED |
122 | ? await getLiveTranscodingCommand({ | 128 | ? await getLiveTranscodingCommand({ |
123 | inputUrl: this.inputUrl, | 129 | inputUrl: this.inputUrl, |
124 | 130 | ||
125 | outPath, | 131 | outPath: this.outDirectory, |
126 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | 132 | masterPlaylistName: this.streamingPlaylist.playlistFilename, |
127 | 133 | ||
128 | latencyMode: this.videoLive.latencyMode, | 134 | latencyMode: this.videoLive.latencyMode, |
@@ -137,15 +143,15 @@ class MuxingSession extends EventEmitter { | |||
137 | }) | 143 | }) |
138 | : getLiveMuxingCommand({ | 144 | : getLiveMuxingCommand({ |
139 | inputUrl: this.inputUrl, | 145 | inputUrl: this.inputUrl, |
140 | outPath, | 146 | outPath: this.outDirectory, |
141 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | 147 | masterPlaylistName: this.streamingPlaylist.playlistFilename, |
142 | latencyMode: this.videoLive.latencyMode | 148 | latencyMode: this.videoLive.latencyMode |
143 | }) | 149 | }) |
144 | 150 | ||
145 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | 151 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) |
146 | 152 | ||
147 | this.watchTSFiles(outPath) | 153 | this.watchTSFiles(this.outDirectory) |
148 | this.watchMasterFile(outPath) | 154 | this.watchMasterFile(this.outDirectory) |
149 | 155 | ||
150 | let ffmpegShellCommand: string | 156 | let ffmpegShellCommand: string |
151 | this.ffmpegCommand.on('start', cmdline => { | 157 | this.ffmpegCommand.on('start', cmdline => { |
@@ -155,10 +161,10 @@ class MuxingSession extends EventEmitter { | |||
155 | }) | 161 | }) |
156 | 162 | ||
157 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | 163 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { |
158 | this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand }) | 164 | this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) |
159 | }) | 165 | }) |
160 | 166 | ||
161 | this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath)) | 167 | this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) |
162 | 168 | ||
163 | this.ffmpegCommand.run() | 169 | this.ffmpegCommand.run() |
164 | } | 170 | } |
@@ -304,16 +310,11 @@ class MuxingSession extends EventEmitter { | |||
304 | } | 310 | } |
305 | 311 | ||
306 | private async prepareDirectories () { | 312 | private async prepareDirectories () { |
307 | const outPath = getLiveDirectory(this.videoLive.Video) | 313 | await ensureDir(this.outDirectory) |
308 | await ensureDir(outPath) | ||
309 | |||
310 | const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY) | ||
311 | 314 | ||
312 | if (this.videoLive.saveReplay === true) { | 315 | if (this.videoLive.saveReplay === true) { |
313 | await ensureDir(replayDirectory) | 316 | await ensureDir(this.replayDirectory) |
314 | } | 317 | } |
315 | |||
316 | return outPath | ||
317 | } | 318 | } |
318 | 319 | ||
319 | private isDurationConstraintValid (streamingStartTime: number) { | 320 | private isDurationConstraintValid (streamingStartTime: number) { |
@@ -364,7 +365,7 @@ class MuxingSession extends EventEmitter { | |||
364 | 365 | ||
365 | private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { | 366 | private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { |
366 | const segmentName = basename(segmentPath) | 367 | const segmentName = basename(segmentPath) |
367 | const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName)) | 368 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) |
368 | 369 | ||
369 | try { | 370 | try { |
370 | const data = await readFile(segmentPath) | 371 | const data = await readFile(segmentPath) |