aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-06-23 10:29:43 +0200
committerChocobozzz <me@florianbigard.com>2022-06-23 10:30:48 +0200
commit53023be33af420675d0060eb95c99a8038457564 (patch)
tree699a13694b0c1c8589d10de95184708a451fcab4
parent50341c8fe988ca2a3d7c700f9aa918673dc979c2 (diff)
downloadPeerTube-53023be33af420675d0060eb95c99a8038457564.tar.gz
PeerTube-53023be33af420675d0060eb95c99a8038457564.tar.zst
PeerTube-53023be33af420675d0060eb95c99a8038457564.zip
Fix fast restream in saved permanent live
-rw-r--r--server/initializers/constants.ts8
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts14
-rw-r--r--server/lib/live/live-utils.ts10
-rw-r--r--server/tests/api/live/index.ts1
-rw-r--r--server/tests/api/live/live-fast-restream.ts128
-rw-r--r--server/tests/api/live/live-save-replay.ts41
-rw-r--r--server/tests/api/live/live.ts8
-rw-r--r--shared/server-commands/server/config-command.ts9
-rw-r--r--shared/server-commands/videos/live-command.ts28
9 files changed, 182 insertions, 65 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 9201f95b3..6c6628d28 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -865,7 +865,7 @@ if (isTestInstance() === true) {
865 865
866 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000 866 PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
867 867
868 VIDEO_LIVE.CLEANUP_DELAY = 5000 868 VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000
869 VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2 869 VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2
870 VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1 870 VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1
871 VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1 871 VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1
@@ -1174,3 +1174,9 @@ function buildLanguages () {
1174function generateContentHash () { 1174function generateContentHash () {
1175 return randomBytes(20).toString('hex') 1175 return randomBytes(20).toString('hex')
1176} 1176}
1177
1178function getIntEnv (path: string) {
1179 if (process.env[path]) return parseInt(process.env[path])
1180
1181 return undefined
1182}
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index feec257fc..450bda2fd 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -4,7 +4,7 @@ import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' 5import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' 6import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
7import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live' 7import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
8import { 8import {
9 generateHLSMasterPlaylistFilename, 9 generateHLSMasterPlaylistFilename,
10 generateHlsSha256SegmentsFilename, 10 generateHlsSha256SegmentsFilename,
@@ -22,15 +22,17 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
22import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 22import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
23import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' 23import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
24import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' 24import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
25import { logger } from '../../../helpers/logger' 25import { logger, loggerTagsFactory } from '../../../helpers/logger'
26
27const lTags = loggerTagsFactory('live', 'job')
26 28
27async function processVideoLiveEnding (job: Job) { 29async function processVideoLiveEnding (job: Job) {
28 const payload = job.data as VideoLiveEndingPayload 30 const payload = job.data as VideoLiveEndingPayload
29 31
30 logger.info('Processing video live ending for %s.', payload.videoId, { payload }) 32 logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
31 33
32 function logError () { 34 function logError () {
33 logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId) 35 logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
34 } 36 }
35 37
36 const liveVideo = await VideoModel.load(payload.videoId) 38 const liveVideo = await VideoModel.load(payload.videoId)
@@ -73,8 +75,6 @@ async function saveReplayToExternalVideo (options: {
73}) { 75}) {
74 const { liveVideo, liveSession, publishedAt, replayDirectory } = options 76 const { liveVideo, liveSession, publishedAt, replayDirectory } = options
75 77
76 await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
77
78 const video = new VideoModel({ 78 const video = new VideoModel({
79 name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`, 79 name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
80 isLive: false, 80 isLive: false,
@@ -243,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
243 if (live.permanentLive) { 243 if (live.permanentLive) {
244 await cleanupPermanentLive(video, streamingPlaylist) 244 await cleanupPermanentLive(video, streamingPlaylist)
245 } else { 245 } else {
246 await cleanupNormalLive(video, streamingPlaylist) 246 await cleanupUnsavedNormalLive(video, streamingPlaylist)
247 } 247 }
248 } 248 }
249 249
diff --git a/server/lib/live/live-utils.ts b/server/lib/live/live-utils.ts
index 6365e23db..6305a97a8 100644
--- a/server/lib/live/live-utils.ts
+++ b/server/lib/live/live-utils.ts
@@ -10,20 +10,20 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
10 return 'concat-' + num[1] + '.ts' 10 return 'concat-' + num[1] + '.ts'
11} 11}
12 12
13async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { 13async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
14 const hlsDirectory = getLiveDirectory(video) 14 const hlsDirectory = getLiveDirectory(video)
15 15
16 await cleanupTMPLiveFiles(hlsDirectory) 16 await cleanupTMPLiveFiles(hlsDirectory)
17 17
18 if (streamingPlaylist) await streamingPlaylist.destroy() 18 await streamingPlaylist.destroy()
19} 19}
20 20
21async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) { 21async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
22 const hlsDirectory = getLiveDirectory(video) 22 const hlsDirectory = getLiveDirectory(video)
23 23
24 await remove(hlsDirectory) 24 await remove(hlsDirectory)
25 25
26 if (streamingPlaylist) await streamingPlaylist.destroy() 26 await streamingPlaylist.destroy()
27} 27}
28 28
29async function cleanupTMPLiveFiles (hlsDirectory: string) { 29async function cleanupTMPLiveFiles (hlsDirectory: string) {
@@ -49,7 +49,7 @@ async function cleanupTMPLiveFiles (hlsDirectory: string) {
49 49
50export { 50export {
51 cleanupPermanentLive, 51 cleanupPermanentLive,
52 cleanupNormalLive, 52 cleanupUnsavedNormalLive,
53 cleanupTMPLiveFiles, 53 cleanupTMPLiveFiles,
54 buildConcatenatedName 54 buildConcatenatedName
55} 55}
diff --git a/server/tests/api/live/index.ts b/server/tests/api/live/index.ts
index 71bc150d8..c88943f65 100644
--- a/server/tests/api/live/index.ts
+++ b/server/tests/api/live/index.ts
@@ -1,4 +1,5 @@
1import './live-constraints' 1import './live-constraints'
2import './live-fast-restream'
2import './live-socket-messages' 3import './live-socket-messages'
3import './live-permanent' 4import './live-permanent'
4import './live-rtmps' 5import './live-rtmps'
diff --git a/server/tests/api/live/live-fast-restream.ts b/server/tests/api/live/live-fast-restream.ts
new file mode 100644
index 000000000..4b5d041ec
--- /dev/null
+++ b/server/tests/api/live/live-fast-restream.ts
@@ -0,0 +1,128 @@
1/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
2
3import 'mocha'
4import * as chai from 'chai'
5import { wait } from '@shared/core-utils'
6import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
7import {
8 cleanupTests,
9 createSingleServer,
10 makeRawRequest,
11 PeerTubeServer,
12 setAccessTokensToServers,
13 setDefaultVideoChannel,
14 stopFfmpeg,
15 waitJobs
16} from '@shared/server-commands'
17
18const expect = chai.expect
19
20describe('Fast restream in live', function () {
21 let server: PeerTubeServer
22
23 async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
24 const attributes: LiveVideoCreate = {
25 channelId: server.store.channel.id,
26 privacy: VideoPrivacy.PUBLIC,
27 name: 'my super live',
28 saveReplay: options.replay,
29 permanentLive: options.permanent
30 }
31
32 const { uuid } = await server.live.create({ fields: attributes })
33 return uuid
34 }
35
36 async function fastRestreamWrapper ({ replay }: { replay: boolean }) {
37 const liveVideoUUID = await createLiveWrapper({ permanent: true, replay })
38 await waitJobs([ server ])
39
40 const rtmpOptions = {
41 videoId: liveVideoUUID,
42 copyCodecs: true,
43 fixtureName: 'video_short.mp4'
44 }
45
46 // Streaming session #1
47 let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
48 await server.live.waitUntilPublished({ videoId: liveVideoUUID })
49 await stopFfmpeg(ffmpegCommand)
50 await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
51
52 // Streaming session #2
53 ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
54 await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 })
55
56 return { ffmpegCommand, liveVideoUUID }
57 }
58
59 async function ensureLastLiveWorks (liveId: string) {
60 // Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY
61 for (let i = 0; i < 100; i++) {
62 const video = await server.videos.get({ id: liveId })
63 expect(video.streamingPlaylists).to.have.lengthOf(1)
64
65 await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
66 await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
67
68 await wait(100)
69 }
70 }
71
72 async function runTest (replay: boolean) {
73 const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay })
74
75 await ensureLastLiveWorks(liveVideoUUID)
76
77 await stopFfmpeg(ffmpegCommand)
78 await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
79
80 // Wait for replays
81 await waitJobs([ server ])
82
83 const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID })
84
85 expect(total).to.equal(2)
86 expect(sessions).to.have.lengthOf(2)
87
88 for (const session of sessions) {
89 expect(session.error).to.be.null
90
91 if (replay) {
92 expect(session.replayVideo).to.exist
93
94 await server.videos.get({ id: session.replayVideo.uuid })
95 } else {
96 expect(session.replayVideo).to.not.exist
97 }
98 }
99 }
100
101 before(async function () {
102 this.timeout(120000)
103
104 const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' }
105 server = await createSingleServer(1, {}, { env })
106
107 // Get the access tokens
108 await setAccessTokensToServers([ server ])
109 await setDefaultVideoChannel([ server ])
110
111 await server.config.enableMinimumTranscoding(false, true)
112 await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' })
113 })
114
115 it('Should correctly fast reastream in a permanent live with and without save replay', async function () {
116 this.timeout(240000)
117
118 // A test can take a long time, so prefer to run them in parallel
119 await Promise.all([
120 runTest(true),
121 runTest(false)
122 ])
123 })
124
125 after(async function () {
126 await cleanupTests([ server ])
127 })
128})
diff --git a/server/tests/api/live/live-save-replay.ts b/server/tests/api/live/live-save-replay.ts
index 99d500711..7ddcb04ef 100644
--- a/server/tests/api/live/live-save-replay.ts
+++ b/server/tests/api/live/live-save-replay.ts
@@ -12,7 +12,6 @@ import {
12 createMultipleServers, 12 createMultipleServers,
13 doubleFollow, 13 doubleFollow,
14 findExternalSavedVideo, 14 findExternalSavedVideo,
15 makeRawRequest,
16 PeerTubeServer, 15 PeerTubeServer,
17 setAccessTokensToServers, 16 setAccessTokensToServers,
18 setDefaultVideoChannel, 17 setDefaultVideoChannel,
@@ -442,46 +441,6 @@ describe('Save replay setting', function () {
442 await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404) 441 await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
443 await checkLiveCleanup(servers[0], liveVideoUUID, []) 442 await checkLiveCleanup(servers[0], liveVideoUUID, [])
444 }) 443 })
445
446 it('Should correctly save replays with multiple sessions', async function () {
447 this.timeout(120000)
448
449 liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
450 await waitJobs(servers)
451
452 // Streaming session #1
453 ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
454 await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
455 await stopFfmpeg(ffmpegCommand)
456 await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
457
458 // Streaming session #2
459 ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
460 await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
461
462 await wait(5000)
463 const video = await servers[0].videos.get({ id: liveVideoUUID })
464 expect(video.streamingPlaylists).to.have.lengthOf(1)
465 await makeRawRequest(video.streamingPlaylists[0].playlistUrl)
466
467 await stopFfmpeg(ffmpegCommand)
468 await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
469
470 // Wait for replays
471 await waitJobs(servers)
472
473 const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
474
475 expect(total).to.equal(2)
476 expect(sessions).to.have.lengthOf(2)
477
478 for (const session of sessions) {
479 expect(session.error).to.be.null
480 expect(session.replayVideo).to.exist
481
482 await servers[0].videos.get({ id: session.replayVideo.uuid })
483 }
484 })
485 }) 444 })
486 445
487 after(async function () { 446 after(async function () {
diff --git a/server/tests/api/live/live.ts b/server/tests/api/live/live.ts
index 9b8fbe3e2..5d354aad1 100644
--- a/server/tests/api/live/live.ts
+++ b/server/tests/api/live/live.ts
@@ -395,7 +395,7 @@ describe('Test live', function () {
395 for (let i = 0; i < resolutions.length; i++) { 395 for (let i = 0; i < resolutions.length; i++) {
396 const segmentNum = 3 396 const segmentNum = 3
397 const segmentName = `${i}-00000${segmentNum}.ts` 397 const segmentName = `${i}-00000${segmentNum}.ts`
398 await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum }) 398 await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
399 399
400 const subPlaylist = await servers[0].streamingPlaylists.get({ 400 const subPlaylist = await servers[0].streamingPlaylists.get({
401 url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8` 401 url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
@@ -628,9 +628,9 @@ describe('Test live', function () {
628 commands[0].waitUntilPublished({ videoId: liveVideoReplayId }) 628 commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
629 ]) 629 ])
630 630
631 await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 }) 631 await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 })
632 await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 }) 632 await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 })
633 await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 }) 633 await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 })
634 634
635 { 635 {
636 const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId }) 636 const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId })
diff --git a/shared/server-commands/server/config-command.ts b/shared/server-commands/server/config-command.ts
index 5320dead4..3803aaf95 100644
--- a/shared/server-commands/server/config-command.ts
+++ b/shared/server-commands/server/config-command.ts
@@ -39,15 +39,18 @@ export class ConfigCommand extends AbstractCommand {
39 enableLive (options: { 39 enableLive (options: {
40 allowReplay?: boolean 40 allowReplay?: boolean
41 transcoding?: boolean 41 transcoding?: boolean
42 resolutions?: 'min' | 'max' // Default max
42 } = {}) { 43 } = {}) {
44 const { allowReplay, transcoding, resolutions = 'max' } = options
45
43 return this.updateExistingSubConfig({ 46 return this.updateExistingSubConfig({
44 newConfig: { 47 newConfig: {
45 live: { 48 live: {
46 enabled: true, 49 enabled: true,
47 allowReplay: options.allowReplay ?? true, 50 allowReplay: allowReplay ?? true,
48 transcoding: { 51 transcoding: {
49 enabled: options.transcoding ?? true, 52 enabled: transcoding ?? true,
50 resolutions: ConfigCommand.getCustomConfigResolutions(true) 53 resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max')
51 } 54 }
52 } 55 }
53 } 56 }
diff --git a/shared/server-commands/videos/live-command.ts b/shared/server-commands/videos/live-command.ts
index 2ff65881b..3df47ed4d 100644
--- a/shared/server-commands/videos/live-command.ts
+++ b/shared/server-commands/videos/live-command.ts
@@ -154,13 +154,33 @@ export class LiveCommand extends AbstractCommand {
154 154
155 waitUntilSegmentGeneration (options: OverrideCommandOptions & { 155 waitUntilSegmentGeneration (options: OverrideCommandOptions & {
156 videoUUID: string 156 videoUUID: string
157 resolution: number 157 playlistNumber: number
158 segment: number 158 segment: number
159 totalSessions?: number
159 }) { 160 }) {
160 const { resolution, segment, videoUUID } = options 161 const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options
161 const segmentName = `${resolution}-00000${segment}.ts` 162 const segmentName = `${playlistNumber}-00000${segment}.ts`
162 163
163 return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false) 164 return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
165 }
166
167 getSegment (options: OverrideCommandOptions & {
168 videoUUID: string
169 playlistNumber: number
170 segment: number
171 }) {
172 const { playlistNumber, segment, videoUUID } = options
173
174 const segmentName = `${playlistNumber}-00000${segment}.ts`
175 const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
176
177 return this.getRawRequest({
178 ...options,
179
180 url,
181 implicitToken: false,
182 defaultExpectedStatus: HttpStatusCode.OK_200
183 })
164 } 184 }
165 185
166 async waitUntilReplacedByReplay (options: OverrideCommandOptions & { 186 async waitUntilReplacedByReplay (options: OverrideCommandOptions & {