]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Fix fast restream in saved permanent live
authorChocobozzz <me@florianbigard.com>
Thu, 23 Jun 2022 08:29:43 +0000 (10:29 +0200)
committerChocobozzz <me@florianbigard.com>
Thu, 23 Jun 2022 08:30:48 +0000 (10:30 +0200)
server/initializers/constants.ts
server/lib/job-queue/handlers/video-live-ending.ts
server/lib/live/live-utils.ts
server/tests/api/live/index.ts
server/tests/api/live/live-fast-restream.ts [new file with mode: 0644]
server/tests/api/live/live-save-replay.ts
server/tests/api/live/live.ts
shared/server-commands/server/config-command.ts
shared/server-commands/videos/live-command.ts

index 9201f95b39bb98ef09f6de8697171c68d1e986f5..6c6628d2862bc9b67d8d4cec32281a7eac16bc11 100644 (file)
@@ -865,7 +865,7 @@ if (isTestInstance() === true) {
 
   PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
 
-  VIDEO_LIVE.CLEANUP_DELAY = 5000
+  VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000
   VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2
   VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1
   VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1
@@ -1174,3 +1174,9 @@ function buildLanguages () {
 function generateContentHash () {
   return randomBytes(20).toString('hex')
 }
+
+function getIntEnv (path: string) {
+  if (process.env[path]) return parseInt(process.env[path])
+
+  return undefined
+}
index feec257fca2852198a35bfb22faddc2235de2b1f..450bda2fdf97be765150bc4ef8ce8c2dd72bd4ae 100644 (file)
@@ -4,7 +4,7 @@ import { join } from 'path'
 import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
 import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
 import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
-import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
+import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
 import {
   generateHLSMasterPlaylistFilename,
   generateHlsSha256SegmentsFilename,
@@ -22,15 +22,17 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
 import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
 import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
-import { logger } from '../../../helpers/logger'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
+
+const lTags = loggerTagsFactory('live', 'job')
 
 async function processVideoLiveEnding (job: Job) {
   const payload = job.data as VideoLiveEndingPayload
 
-  logger.info('Processing video live ending for %s.', payload.videoId, { payload })
+  logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
 
   function logError () {
-    logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
+    logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
   }
 
   const liveVideo = await VideoModel.load(payload.videoId)
@@ -73,8 +75,6 @@ async function saveReplayToExternalVideo (options: {
 }) {
   const { liveVideo, liveSession, publishedAt, replayDirectory } = options
 
-  await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
-
   const video = new VideoModel({
     name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
     isLive: false,
@@ -243,7 +243,7 @@ async function cleanupLiveAndFederate (options: {
     if (live.permanentLive) {
       await cleanupPermanentLive(video, streamingPlaylist)
     } else {
-      await cleanupNormalLive(video, streamingPlaylist)
+      await cleanupUnsavedNormalLive(video, streamingPlaylist)
     }
   }
 
index 6365e23db371892b4b14601606587bdefe534eb6..6305a97a86fadcabd68dee1cb450a46248db6fc4 100644 (file)
@@ -10,20 +10,20 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
   return 'concat-' + num[1] + '.ts'
 }
 
-async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
   const hlsDirectory = getLiveDirectory(video)
 
   await cleanupTMPLiveFiles(hlsDirectory)
 
-  if (streamingPlaylist) await streamingPlaylist.destroy()
+  await streamingPlaylist.destroy()
 }
 
-async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
   const hlsDirectory = getLiveDirectory(video)
 
   await remove(hlsDirectory)
 
-  if (streamingPlaylist) await streamingPlaylist.destroy()
+  await streamingPlaylist.destroy()
 }
 
 async function cleanupTMPLiveFiles (hlsDirectory: string) {
@@ -49,7 +49,7 @@ async function cleanupTMPLiveFiles (hlsDirectory: string) {
 
 export {
   cleanupPermanentLive,
-  cleanupNormalLive,
+  cleanupUnsavedNormalLive,
   cleanupTMPLiveFiles,
   buildConcatenatedName
 }
index 71bc150d8d66d44bfccea8ad2277dfa0ee027771..c88943f652921f78b223ae333b719a6f70f1c4f7 100644 (file)
@@ -1,4 +1,5 @@
 import './live-constraints'
+import './live-fast-restream'
 import './live-socket-messages'
 import './live-permanent'
 import './live-rtmps'
diff --git a/server/tests/api/live/live-fast-restream.ts b/server/tests/api/live/live-fast-restream.ts
new file mode 100644 (file)
index 0000000..4b5d041
--- /dev/null
@@ -0,0 +1,128 @@
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import { wait } from '@shared/core-utils'
+import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
+import {
+  cleanupTests,
+  createSingleServer,
+  makeRawRequest,
+  PeerTubeServer,
+  setAccessTokensToServers,
+  setDefaultVideoChannel,
+  stopFfmpeg,
+  waitJobs
+} from '@shared/server-commands'
+
+const expect = chai.expect
+
+describe('Fast restream in live', function () {
+  let server: PeerTubeServer
+
+  async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
+    const attributes: LiveVideoCreate = {
+      channelId: server.store.channel.id,
+      privacy: VideoPrivacy.PUBLIC,
+      name: 'my super live',
+      saveReplay: options.replay,
+      permanentLive: options.permanent
+    }
+
+    const { uuid } = await server.live.create({ fields: attributes })
+    return uuid
+  }
+
+  async function fastRestreamWrapper ({ replay }: { replay: boolean }) {
+    const liveVideoUUID = await createLiveWrapper({ permanent: true, replay })
+    await waitJobs([ server ])
+
+    const rtmpOptions = {
+      videoId: liveVideoUUID,
+      copyCodecs: true,
+      fixtureName: 'video_short.mp4'
+    }
+
+    // Streaming session #1
+    let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+    await server.live.waitUntilPublished({ videoId: liveVideoUUID })
+    await stopFfmpeg(ffmpegCommand)
+    await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+    // Streaming session #2
+    ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+    await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 })
+
+    return { ffmpegCommand, liveVideoUUID }
+  }
+
+  async function ensureLastLiveWorks (liveId: string) {
+    // Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY
+    for (let i = 0; i < 100; i++) {
+      const video = await server.videos.get({ id: liveId })
+      expect(video.streamingPlaylists).to.have.lengthOf(1)
+
+      await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
+      await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
+
+      await wait(100)
+    }
+  }
+
+  async function runTest (replay: boolean) {
+    const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay })
+
+    await ensureLastLiveWorks(liveVideoUUID)
+
+    await stopFfmpeg(ffmpegCommand)
+    await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+    // Wait for replays
+    await waitJobs([ server ])
+
+    const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID })
+
+    expect(total).to.equal(2)
+    expect(sessions).to.have.lengthOf(2)
+
+    for (const session of sessions) {
+      expect(session.error).to.be.null
+
+      if (replay) {
+        expect(session.replayVideo).to.exist
+
+        await server.videos.get({ id: session.replayVideo.uuid })
+      } else {
+        expect(session.replayVideo).to.not.exist
+      }
+    }
+  }
+
+  before(async function () {
+    this.timeout(120000)
+
+    const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' }
+    server = await createSingleServer(1, {}, { env })
+
+    // Get the access tokens
+    await setAccessTokensToServers([ server ])
+    await setDefaultVideoChannel([ server ])
+
+    await server.config.enableMinimumTranscoding(false, true)
+    await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' })
+  })
+
+  it('Should correctly fast reastream in a permanent live with and without save replay', async function () {
+    this.timeout(240000)
+
+    // A test can take a long time, so prefer to run them in parallel
+    await Promise.all([
+      runTest(true),
+      runTest(false)
+    ])
+  })
+
+  after(async function () {
+    await cleanupTests([ server ])
+  })
+})
index 99d500711c08ab0883c737a43a7a98df5d54f416..7ddcb04ef6bbee6c5bf4913d94ec815a049d9093 100644 (file)
@@ -12,7 +12,6 @@ import {
   createMultipleServers,
   doubleFollow,
   findExternalSavedVideo,
-  makeRawRequest,
   PeerTubeServer,
   setAccessTokensToServers,
   setDefaultVideoChannel,
@@ -442,46 +441,6 @@ describe('Save replay setting', function () {
       await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
       await checkLiveCleanup(servers[0], liveVideoUUID, [])
     })
-
-    it('Should correctly save replays with multiple sessions', async function () {
-      this.timeout(120000)
-
-      liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
-      await waitJobs(servers)
-
-      // Streaming session #1
-      ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
-      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
-      await stopFfmpeg(ffmpegCommand)
-      await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
-
-      // Streaming session #2
-      ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
-      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
-
-      await wait(5000)
-      const video = await servers[0].videos.get({ id: liveVideoUUID })
-      expect(video.streamingPlaylists).to.have.lengthOf(1)
-      await makeRawRequest(video.streamingPlaylists[0].playlistUrl)
-
-      await stopFfmpeg(ffmpegCommand)
-      await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
-
-      // Wait for replays
-      await waitJobs(servers)
-
-      const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
-
-      expect(total).to.equal(2)
-      expect(sessions).to.have.lengthOf(2)
-
-      for (const session of sessions) {
-        expect(session.error).to.be.null
-        expect(session.replayVideo).to.exist
-
-        await servers[0].videos.get({ id: session.replayVideo.uuid })
-      }
-    })
   })
 
   after(async function () {
index 9b8fbe3e2433ca23b48576e4a45d863de197763f..5d354aad1cbdc77124b4365c86bcfb1e3ac0246c 100644 (file)
@@ -395,7 +395,7 @@ describe('Test live', function () {
         for (let i = 0; i < resolutions.length; i++) {
           const segmentNum = 3
           const segmentName = `${i}-00000${segmentNum}.ts`
-          await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum })
+          await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
 
           const subPlaylist = await servers[0].streamingPlaylists.get({
             url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
@@ -628,9 +628,9 @@ describe('Test live', function () {
         commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
       ])
 
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 })
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 })
-      await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 })
+      await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 })
 
       {
         const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId })
index 5320dead4705bfe42cbba88d944ce33e1a4174f8..3803aaf9527e419eb331653550085f2533708d3b 100644 (file)
@@ -39,15 +39,18 @@ export class ConfigCommand extends AbstractCommand {
   enableLive (options: {
     allowReplay?: boolean
     transcoding?: boolean
+    resolutions?: 'min' | 'max' // Default max
   } = {}) {
+    const { allowReplay, transcoding, resolutions = 'max' } = options
+
     return this.updateExistingSubConfig({
       newConfig: {
         live: {
           enabled: true,
-          allowReplay: options.allowReplay ?? true,
+          allowReplay: allowReplay ?? true,
           transcoding: {
-            enabled: options.transcoding ?? true,
-            resolutions: ConfigCommand.getCustomConfigResolutions(true)
+            enabled: transcoding ?? true,
+            resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max')
           }
         }
       }
index 2ff65881be3a108c95ab87d13937befa8a943428..3df47ed4d7d8f2799ae84293b71668711197541f 100644 (file)
@@ -154,13 +154,33 @@ export class LiveCommand extends AbstractCommand {
 
   waitUntilSegmentGeneration (options: OverrideCommandOptions & {
     videoUUID: string
-    resolution: number
+    playlistNumber: number
     segment: number
+    totalSessions?: number
   }) {
-    const { resolution, segment, videoUUID } = options
-    const segmentName = `${resolution}-00000${segment}.ts`
+    const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options
+    const segmentName = `${playlistNumber}-00000${segment}.ts`
 
-    return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
+    return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
+  }
+
+  getSegment (options: OverrideCommandOptions & {
+    videoUUID: string
+    playlistNumber: number
+    segment: number
+  }) {
+    const { playlistNumber, segment, videoUUID } = options
+
+    const segmentName = `${playlistNumber}-00000${segment}.ts`
+    const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
+
+    return this.getRawRequest({
+      ...options,
+
+      url,
+      implicitToken: false,
+      defaultExpectedStatus: HttpStatusCode.OK_200
+    })
   }
 
   async waitUntilReplacedByReplay (options: OverrideCommandOptions & {