PLUGIN_EXTERNAL_AUTH_TOKEN_LIFETIME = 5000
- VIDEO_LIVE.CLEANUP_DELAY = 5000
+ VIDEO_LIVE.CLEANUP_DELAY = getIntEnv('PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY') ?? 5000
VIDEO_LIVE.SEGMENT_TIME_SECONDS.DEFAULT_LATENCY = 2
VIDEO_LIVE.SEGMENT_TIME_SECONDS.SMALL_LATENCY = 1
VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION = 1
function generateContentHash () {
return randomBytes(20).toString('hex')
}
+
+function getIntEnv (path: string) {
+ if (process.env[path]) return parseInt(process.env[path])
+
+ return undefined
+}
import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
-import { cleanupNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
+import { cleanupUnsavedNormalLive, cleanupPermanentLive, cleanupTMPLiveFiles, LiveSegmentShaStore } from '@server/lib/live'
import {
generateHLSMasterPlaylistFilename,
generateHlsSha256SegmentsFilename,
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
-import { logger } from '../../../helpers/logger'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
+
+const lTags = loggerTagsFactory('live', 'job')
async function processVideoLiveEnding (job: Job) {
const payload = job.data as VideoLiveEndingPayload
- logger.info('Processing video live ending for %s.', payload.videoId, { payload })
+ logger.info('Processing video live ending for %s.', payload.videoId, { payload, ...lTags() })
function logError () {
- logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
+ logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId, lTags())
}
const liveVideo = await VideoModel.load(payload.videoId)
}) {
const { liveVideo, liveSession, publishedAt, replayDirectory } = options
- await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
-
const video = new VideoModel({
name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
isLive: false,
if (live.permanentLive) {
await cleanupPermanentLive(video, streamingPlaylist)
} else {
- await cleanupNormalLive(video, streamingPlaylist)
+ await cleanupUnsavedNormalLive(video, streamingPlaylist)
}
}
return 'concat-' + num[1] + '.ts'
}
-async function cleanupPermanentLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await cleanupTMPLiveFiles(hlsDirectory)
- if (streamingPlaylist) await streamingPlaylist.destroy()
+ await streamingPlaylist.destroy()
}
-async function cleanupNormalLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
+async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await remove(hlsDirectory)
- if (streamingPlaylist) await streamingPlaylist.destroy()
+ await streamingPlaylist.destroy()
}
async function cleanupTMPLiveFiles (hlsDirectory: string) {
export {
cleanupPermanentLive,
- cleanupNormalLive,
+ cleanupUnsavedNormalLive,
cleanupTMPLiveFiles,
buildConcatenatedName
}
import './live-constraints'
+import './live-fast-restream'
import './live-socket-messages'
import './live-permanent'
import './live-rtmps'
--- /dev/null
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import { wait } from '@shared/core-utils'
+import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
+import {
+ cleanupTests,
+ createSingleServer,
+ makeRawRequest,
+ PeerTubeServer,
+ setAccessTokensToServers,
+ setDefaultVideoChannel,
+ stopFfmpeg,
+ waitJobs
+} from '@shared/server-commands'
+
+const expect = chai.expect
+
+describe('Fast restream in live', function () {
+ let server: PeerTubeServer
+
+ async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
+ const attributes: LiveVideoCreate = {
+ channelId: server.store.channel.id,
+ privacy: VideoPrivacy.PUBLIC,
+ name: 'my super live',
+ saveReplay: options.replay,
+ permanentLive: options.permanent
+ }
+
+ const { uuid } = await server.live.create({ fields: attributes })
+ return uuid
+ }
+
+ async function fastRestreamWrapper ({ replay }: { replay: boolean }) {
+ const liveVideoUUID = await createLiveWrapper({ permanent: true, replay })
+ await waitJobs([ server ])
+
+ const rtmpOptions = {
+ videoId: liveVideoUUID,
+ copyCodecs: true,
+ fixtureName: 'video_short.mp4'
+ }
+
+ // Streaming session #1
+ let ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+ await server.live.waitUntilPublished({ videoId: liveVideoUUID })
+ await stopFfmpeg(ffmpegCommand)
+ await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+ // Streaming session #2
+ ffmpegCommand = await server.live.sendRTMPStreamInVideo(rtmpOptions)
+ await server.live.waitUntilSegmentGeneration({ videoUUID: liveVideoUUID, segment: 0, playlistNumber: 0, totalSessions: 2 })
+
+ return { ffmpegCommand, liveVideoUUID }
+ }
+
+ async function ensureLastLiveWorks (liveId: string) {
+ // Equivalent to PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY
+ for (let i = 0; i < 100; i++) {
+ const video = await server.videos.get({ id: liveId })
+ expect(video.streamingPlaylists).to.have.lengthOf(1)
+
+ await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
+ await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
+
+ await wait(100)
+ }
+ }
+
+ async function runTest (replay: boolean) {
+ const { ffmpegCommand, liveVideoUUID } = await fastRestreamWrapper({ replay })
+
+ await ensureLastLiveWorks(liveVideoUUID)
+
+ await stopFfmpeg(ffmpegCommand)
+ await server.live.waitUntilWaiting({ videoId: liveVideoUUID })
+
+ // Wait for replays
+ await waitJobs([ server ])
+
+ const { total, data: sessions } = await server.live.listSessions({ videoId: liveVideoUUID })
+
+ expect(total).to.equal(2)
+ expect(sessions).to.have.lengthOf(2)
+
+ for (const session of sessions) {
+ expect(session.error).to.be.null
+
+ if (replay) {
+ expect(session.replayVideo).to.exist
+
+ await server.videos.get({ id: session.replayVideo.uuid })
+ } else {
+ expect(session.replayVideo).to.not.exist
+ }
+ }
+ }
+
+ before(async function () {
+ this.timeout(120000)
+
+ const env = { 'PEERTUBE_TEST_CONSTANTS.VIDEO_LIVE.CLEANUP_DELAY': '10000' }
+ server = await createSingleServer(1, {}, { env })
+
+ // Get the access tokens
+ await setAccessTokensToServers([ server ])
+ await setDefaultVideoChannel([ server ])
+
+ await server.config.enableMinimumTranscoding(false, true)
+ await server.config.enableLive({ allowReplay: true, transcoding: true, resolutions: 'min' })
+ })
+
+ it('Should correctly fast reastream in a permanent live with and without save replay', async function () {
+ this.timeout(240000)
+
+ // A test can take a long time, so prefer to run them in parallel
+ await Promise.all([
+ runTest(true),
+ runTest(false)
+ ])
+ })
+
+ after(async function () {
+ await cleanupTests([ server ])
+ })
+})
createMultipleServers,
doubleFollow,
findExternalSavedVideo,
- makeRawRequest,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
-
- it('Should correctly save replays with multiple sessions', async function () {
- this.timeout(120000)
-
- liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
- await waitJobs(servers)
-
- // Streaming session #1
- ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
- await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
- await stopFfmpeg(ffmpegCommand)
- await servers[0].live.waitUntilWaiting({ videoId: liveVideoUUID })
-
- // Streaming session #2
- ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
- await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
-
- await wait(5000)
- const video = await servers[0].videos.get({ id: liveVideoUUID })
- expect(video.streamingPlaylists).to.have.lengthOf(1)
- await makeRawRequest(video.streamingPlaylists[0].playlistUrl)
-
- await stopFfmpeg(ffmpegCommand)
- await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
-
- // Wait for replays
- await waitJobs(servers)
-
- const { total, data: sessions } = await servers[0].live.listSessions({ videoId: liveVideoUUID })
-
- expect(total).to.equal(2)
- expect(sessions).to.have.lengthOf(2)
-
- for (const session of sessions) {
- expect(session.error).to.be.null
- expect(session.replayVideo).to.exist
-
- await servers[0].videos.get({ id: session.replayVideo.uuid })
- }
- })
})
after(async function () {
for (let i = 0; i < resolutions.length; i++) {
const segmentNum = 3
const segmentName = `${i}-00000${segmentNum}.ts`
- await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, resolution: i, segment: segmentNum })
+ await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
const subPlaylist = await servers[0].streamingPlaylists.get({
url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
commands[0].waitUntilPublished({ videoId: liveVideoReplayId })
])
- await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, resolution: 0, segment: 2 })
- await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, resolution: 0, segment: 2 })
- await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, resolution: 0, segment: 2 })
+ await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoId, playlistNumber: 0, segment: 2 })
+ await commands[0].waitUntilSegmentGeneration({ videoUUID: liveVideoReplayId, playlistNumber: 0, segment: 2 })
+ await commands[0].waitUntilSegmentGeneration({ videoUUID: permanentLiveVideoReplayId, playlistNumber: 0, segment: 2 })
{
const video = await servers[0].videos.get({ id: permanentLiveVideoReplayId })
enableLive (options: {
allowReplay?: boolean
transcoding?: boolean
+ resolutions?: 'min' | 'max' // Default max
} = {}) {
+ const { allowReplay, transcoding, resolutions = 'max' } = options
+
return this.updateExistingSubConfig({
newConfig: {
live: {
enabled: true,
- allowReplay: options.allowReplay ?? true,
+ allowReplay: allowReplay ?? true,
transcoding: {
- enabled: options.transcoding ?? true,
- resolutions: ConfigCommand.getCustomConfigResolutions(true)
+ enabled: transcoding ?? true,
+ resolutions: ConfigCommand.getCustomConfigResolutions(resolutions === 'max')
}
}
}
waitUntilSegmentGeneration (options: OverrideCommandOptions & {
videoUUID: string
- resolution: number
+ playlistNumber: number
segment: number
+ totalSessions?: number
}) {
- const { resolution, segment, videoUUID } = options
- const segmentName = `${resolution}-00000${segment}.ts`
+ const { playlistNumber, segment, videoUUID, totalSessions = 1 } = options
+ const segmentName = `${playlistNumber}-00000${segment}.ts`
- return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
+ return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
+ }
+
+ getSegment (options: OverrideCommandOptions & {
+ videoUUID: string
+ playlistNumber: number
+ segment: number
+ }) {
+ const { playlistNumber, segment, videoUUID } = options
+
+ const segmentName = `${playlistNumber}-00000${segment}.ts`
+ const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
+
+ return this.getRawRequest({
+ ...options,
+
+ url,
+ implicitToken: false,
+ defaultExpectedStatus: HttpStatusCode.OK_200
+ })
}
async waitUntilReplacedByReplay (options: OverrideCommandOptions & {