import { Job } from 'bull'
import { pathExists, readdir, remove } from 'fs-extra'
import { join } from 'path'
-import { ffprobePromise, getAudioStream, getVideoStreamDuration, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
-import { VIDEO_LIVE } from '@server/initializers/constants'
-import { buildConcatenatedName, cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
-import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveDirectory } from '@server/lib/paths'
+import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
+import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
+import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
+import { cleanupLive, LiveSegmentShaStore } from '@server/lib/live'
+import {
+ generateHLSMasterPlaylistFilename,
+ generateHlsSha256SegmentsFilename,
+ getLiveDirectory,
+ getLiveReplayBaseDirectory
+} from '@server/lib/paths'
import { generateVideoMiniature } from '@server/lib/thumbnail'
import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
-import { VideoPathManager } from '@server/lib/video-path-manager'
import { moveToNextState } from '@server/lib/video-state'
import { VideoModel } from '@server/models/video/video'
import { VideoFileModel } from '@server/models/video/video-file'
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
-import { MStreamingPlaylist, MVideo, MVideoLive } from '@server/types/models'
+import { MVideo, MVideoLive, MVideoWithAllFiles } from '@server/types/models'
import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models'
import { logger } from '../../../helpers/logger'
+import { VideoBlacklistModel } from '@server/models/video/video-blacklist'
async function processVideoLiveEnding (job: Job) {
const payload = job.data as VideoLiveEndingPayload
+ logger.info('Processing video live ending for %s.', payload.videoId, { payload })
+
function logError () {
logger.warn('Video live %d does not exist anymore. Cannot process live ending.', payload.videoId)
}
return
}
- const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
- if (!streamingPlaylist) {
- logError()
- return
- }
-
LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
if (live.saveReplay !== true) {
- return cleanupLive(video, streamingPlaylist)
+ return cleanupLiveAndFederate(video)
}
- return saveLive(video, live, streamingPlaylist)
+ if (live.permanentLive) {
+ await saveReplayToExternalVideo(video, payload.publishedAt, payload.replayDirectory)
+
+ return cleanupLiveAndFederate(video)
+ }
+
+ return replaceLiveByReplay(video, live, payload.replayDirectory)
}
// ---------------------------------------------------------------------------
// ---------------------------------------------------------------------------
-async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MStreamingPlaylist) {
- const replayDirectory = VideoPathManager.Instance.getFSHLSOutputPath(video, VIDEO_LIVE.REPLAY_DIRECTORY)
+async function saveReplayToExternalVideo (liveVideo: MVideo, publishedAt: string, replayDirectory: string) {
+ await cleanupTMPLiveFiles(getLiveDirectory(liveVideo))
+
+ const video = new VideoModel({
+ name: `${liveVideo.name} - ${new Date(publishedAt).toLocaleString()}`,
+ isLive: false,
+ state: VideoState.TO_TRANSCODE,
+ duration: 0,
+
+ remote: liveVideo.remote,
+ category: liveVideo.category,
+ licence: liveVideo.licence,
+ language: liveVideo.language,
+ commentsEnabled: liveVideo.commentsEnabled,
+ downloadEnabled: liveVideo.downloadEnabled,
+ waitTranscoding: liveVideo.waitTranscoding,
+ nsfw: liveVideo.nsfw,
+ description: liveVideo.description,
+ support: liveVideo.support,
+ privacy: liveVideo.privacy,
+ channelId: liveVideo.channelId
+ }) as MVideoWithAllFiles
+
+ video.Thumbnails = []
+ video.VideoFiles = []
+ video.VideoStreamingPlaylists = []
+
+ video.url = getLocalVideoActivityPubUrl(video)
+
+ await video.save()
+
+ // If live is blacklisted, also blacklist the replay
+ const blacklist = await VideoBlacklistModel.loadByVideoId(liveVideo.id)
+ if (blacklist) {
+ await VideoBlacklistModel.create({
+ videoId: video.id,
+ unfederated: blacklist.unfederated,
+ reason: blacklist.reason,
+ type: blacklist.type
+ })
+ }
+
+ await assignReplaysToVideo(video, replayDirectory)
- const rootFiles = await readdir(getLiveDirectory(video))
+ await remove(replayDirectory)
+
+ for (const type of [ ThumbnailType.MINIATURE, ThumbnailType.PREVIEW ]) {
+ const image = await generateVideoMiniature({ video, videoFile: video.getMaxQualityFile(), type })
+ await video.addAndSaveThumbnail(image)
+ }
- const playlistFiles = rootFiles.filter(file => {
- return file.endsWith('.m3u8') && file !== streamingPlaylist.playlistFilename
- })
+ await moveToNextState({ video, isNewVideo: true })
+}
+async function replaceLiveByReplay (video: MVideo, live: MVideoLive, replayDirectory: string) {
await cleanupTMPLiveFiles(getLiveDirectory(video))
await live.destroy()
video.isLive = false
- // Reinit views
- video.views = 0
video.state = VideoState.TO_TRANSCODE
await video.save()
hlsPlaylist.segmentsSha256Filename = generateHlsSha256SegmentsFilename()
await hlsPlaylist.save()
+ await assignReplaysToVideo(videoWithFiles, replayDirectory)
+
+ await remove(getLiveReplayBaseDirectory(videoWithFiles))
+
+ // Regenerate the thumbnail & preview?
+ if (videoWithFiles.getMiniature().automaticallyGenerated === true) {
+ const miniature = await generateVideoMiniature({
+ video: videoWithFiles,
+ videoFile: videoWithFiles.getMaxQualityFile(),
+ type: ThumbnailType.MINIATURE
+ })
+ await video.addAndSaveThumbnail(miniature)
+ }
+
+ if (videoWithFiles.getPreview().automaticallyGenerated === true) {
+ const preview = await generateVideoMiniature({
+ video: videoWithFiles,
+ videoFile: videoWithFiles.getMaxQualityFile(),
+ type: ThumbnailType.PREVIEW
+ })
+ await video.addAndSaveThumbnail(preview)
+ }
+
+ await moveToNextState({ video: videoWithFiles, isNewVideo: false })
+}
+
+async function assignReplaysToVideo (video: MVideo, replayDirectory: string) {
let durationDone = false
- for (const playlistFile of playlistFiles) {
- const concatenatedTsFile = buildConcatenatedName(playlistFile)
+ const concatenatedTsFiles = await readdir(replayDirectory)
+
+ for (const concatenatedTsFile of concatenatedTsFiles) {
const concatenatedTsFilePath = join(replayDirectory, concatenatedTsFile)
const probe = await ffprobePromise(concatenatedTsFilePath)
const { resolution, isPortraitMode } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe)
const { resolutionPlaylistPath: outputPath } = await generateHlsPlaylistResolutionFromTS({
- video: videoWithFiles,
+ video,
concatenatedTsFilePath,
resolution,
isPortraitMode,
})
if (!durationDone) {
- videoWithFiles.duration = await getVideoStreamDuration(outputPath)
- await videoWithFiles.save()
+ video.duration = await getVideoStreamDuration(outputPath)
+ await video.save()
durationDone = true
}
}
- await remove(replayDirectory)
-
- // Regenerate the thumbnail & preview?
- if (videoWithFiles.getMiniature().automaticallyGenerated === true) {
- await generateVideoMiniature({
- video: videoWithFiles,
- videoFile: videoWithFiles.getMaxQualityFile(),
- type: ThumbnailType.MINIATURE
- })
- }
+ return video
+}
- if (videoWithFiles.getPreview().automaticallyGenerated === true) {
- await generateVideoMiniature({
- video: videoWithFiles,
- videoFile: videoWithFiles.getMaxQualityFile(),
- type: ThumbnailType.PREVIEW
- })
- }
+async function cleanupLiveAndFederate (video: MVideo) {
+ const streamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
+ await cleanupLive(video, streamingPlaylist)
- await moveToNextState({ video: videoWithFiles, isNewVideo: false })
+ const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
+ return federateVideoIfNeeded(fullVideo, false, undefined)
}
async function cleanupTMPLiveFiles (hlsDirectory: string) {
-import { readFile } from 'fs-extra'
+import { readdir, readFile } from 'fs-extra'
import { createServer, Server } from 'net'
+import { join } from 'path'
import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
import {
computeLowerResolutionsToTranscode,
import { VideoLiveModel } from '@server/models/video/video-live'
import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models'
+import { wait } from '@shared/core-utils'
import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
import { federateVideoIfNeeded } from '../activitypub/videos'
import { JobQueue } from '../job-queue'
-import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename } from '../paths'
+import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
import { PeerTubeSocket } from '../peertube-socket'
import { LiveQuotaStore } from './live-quota-store'
import { LiveSegmentShaStore } from './live-segment-sha-store'
muxingSession.destroy()
- return this.onAfterMuxingCleanup(videoId)
+ return this.onAfterMuxingCleanup({ videoId })
.catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
})
live.Video = video
- setTimeout(() => {
- federateVideoIfNeeded(video, false)
- .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags }))
+ await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
- PeerTubeSocket.Instance.sendVideoLiveNewState(video)
- }, getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
+ try {
+ await federateVideoIfNeeded(video, false)
+ } catch (err) {
+ logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
+ }
+
+ PeerTubeSocket.Instance.sendVideoLiveNewState(video)
} catch (err) {
logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
}
this.videoSessions.delete(videoId)
}
- private async onAfterMuxingCleanup (videoUUID: string, cleanupNow = false) {
+ private async onAfterMuxingCleanup (options: {
+ videoId: number | string
+ cleanupNow?: boolean // Default false
+ }) {
+ const { videoId, cleanupNow = false } = options
+
try {
- const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
+ const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
if (!fullVideo) return
const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
- if (!live.permanentLive) {
- JobQueue.Instance.createJob({
- type: 'video-live-ending',
- payload: {
- videoId: fullVideo.id
- }
- }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
-
- fullVideo.state = VideoState.LIVE_ENDED
- } else {
- fullVideo.state = VideoState.WAITING_FOR_LIVE
- }
+ JobQueue.Instance.createJob({
+ type: 'video-live-ending',
+ payload: {
+ videoId: fullVideo.id,
+ replayDirectory: live.saveReplay
+ ? await this.findReplayDirectory(fullVideo)
+ : undefined,
+ publishedAt: fullVideo.publishedAt.toISOString()
+ }
+ }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
+
+ fullVideo.state = live.permanentLive
+ ? VideoState.WAITING_FOR_LIVE
+ : VideoState.LIVE_ENDED
await fullVideo.save()
await federateVideoIfNeeded(fullVideo, false)
} catch (err) {
- logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
+ logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') })
}
}
const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
for (const uuid of videoUUIDs) {
- await this.onAfterMuxingCleanup(uuid, true)
+ await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true })
}
}
+ private async findReplayDirectory (video: MVideo) {
+ const directory = getLiveReplayBaseDirectory(video)
+ const files = await readdir(directory)
+
+ if (files.length === 0) return undefined
+
+ return join(directory, files.sort().reverse()[0])
+ }
+
private buildAllResolutionsToTranscode (originResolution: number) {
const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
? computeLowerResolutionsToTranscode(originResolution, 'live')
return 'concat-' + num[1] + '.ts'
}
-async function cleanupLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
+async function cleanupLive (video: MVideo, streamingPlaylist?: MStreamingPlaylist) {
const hlsDirectory = getLiveDirectory(video)
await remove(hlsDirectory)
- await streamingPlaylist.destroy()
+ if (streamingPlaylist) await streamingPlaylist.destroy()
}
export {
import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
import { VideoFileModel } from '@server/models/video/video-file'
import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
-import { getLiveDirectory } from '../../paths'
+import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
import { isAbleToUploadVideo } from '../../user'
import { LiveQuotaStore } from '../live-quota-store'
private readonly videoUUID: string
private readonly saveReplay: boolean
+ private readonly outDirectory: string
+ private readonly replayDirectory: string
+
private readonly lTags: LoggerTagsFn
private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
this.saveReplay = this.videoLive.saveReplay
+ this.outDirectory = getLiveDirectory(this.videoLive.Video)
+ this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
+
this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
}
async runMuxing () {
this.createFiles()
- const outPath = await this.prepareDirectories()
+ await this.prepareDirectories()
this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
? await getLiveTranscodingCommand({
inputUrl: this.inputUrl,
- outPath,
+ outPath: this.outDirectory,
masterPlaylistName: this.streamingPlaylist.playlistFilename,
latencyMode: this.videoLive.latencyMode,
})
: getLiveMuxingCommand({
inputUrl: this.inputUrl,
- outPath,
+ outPath: this.outDirectory,
masterPlaylistName: this.streamingPlaylist.playlistFilename,
latencyMode: this.videoLive.latencyMode
})
logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
- this.watchTSFiles(outPath)
- this.watchMasterFile(outPath)
+ this.watchTSFiles(this.outDirectory)
+ this.watchMasterFile(this.outDirectory)
let ffmpegShellCommand: string
this.ffmpegCommand.on('start', cmdline => {
})
this.ffmpegCommand.on('error', (err, stdout, stderr) => {
- this.onFFmpegError({ err, stdout, stderr, outPath, ffmpegShellCommand })
+ this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand })
})
- this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath))
+ this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory))
this.ffmpegCommand.run()
}
}
private async prepareDirectories () {
- const outPath = getLiveDirectory(this.videoLive.Video)
- await ensureDir(outPath)
-
- const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)
+ await ensureDir(this.outDirectory)
if (this.videoLive.saveReplay === true) {
- await ensureDir(replayDirectory)
+ await ensureDir(this.replayDirectory)
}
-
- return outPath
}
private isDurationConstraintValid (streamingStartTime: number) {
private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
const segmentName = basename(segmentPath)
- const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName))
+ const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
try {
const data = await readFile(segmentPath)
import { join } from 'path'
import { CONFIG } from '@server/initializers/config'
-import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY } from '@server/initializers/constants'
+import { HLS_REDUNDANCY_DIRECTORY, HLS_STREAMING_PLAYLIST_DIRECTORY, VIDEO_LIVE } from '@server/initializers/constants'
import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoUUID } from '@server/types/models'
import { removeFragmentedMP4Ext } from '@shared/core-utils'
import { buildUUID } from '@shared/extra-utils'
return getHLSDirectory(video)
}
+function getLiveReplayBaseDirectory (video: MVideoUUID) {
+ return join(getLiveDirectory(video), VIDEO_LIVE.REPLAY_DIRECTORY)
+}
+
function getHLSDirectory (video: MVideoUUID) {
return join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid)
}
getHLSDirectory,
getLiveDirectory,
+ getLiveReplayBaseDirectory,
getHLSRedundancyDirectory,
generateHLSMasterPlaylistFilename,
import { basename, extname as extnameUtil, join } from 'path'
import { toEven } from '@server/helpers/core-utils'
import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
-import { MStreamingPlaylistFilesVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
+import { MStreamingPlaylistFilesVideo, MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
import { VideoResolution, VideoStorage } from '../../../shared/models/videos'
import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
import {
+ buildFileMetadata,
canDoQuickTranscode,
getVideoStreamDuration,
- buildFileMetadata,
getVideoStreamFPS,
transcodeVOD,
TranscodeVODOptions,
// Concat TS segments from a live video to a fragmented mp4 HLS playlist
async function generateHlsPlaylistResolutionFromTS (options: {
- video: MVideoFullLight
+ video: MVideo
concatenatedTsFilePath: string
resolution: VideoResolution
isPortraitMode: boolean
// Generate an HLS playlist from an input file, and update the master playlist
function generateHlsPlaylistResolution (options: {
- video: MVideoFullLight
+ video: MVideo
videoInputPath: string
resolution: VideoResolution
copyCodecs: boolean
async function generateHlsPlaylistCommon (options: {
type: 'hls' | 'hls-from-ts'
- video: MVideoFullLight
+ video: MVideo
inputPath: string
resolution: VideoResolution
copyCodecs?: boolean
unfederated: options.unfederate === true,
reason: options.reason,
type: VideoBlacklistType.MANUAL
- }
- )
+ })
blacklist.Video = videoInstance
if (options.unfederate === true) {
})
}
- if (body.permanentLive && body.saveReplay) {
- cleanUpReqFiles(req)
-
- return res.fail({ message: 'Cannot set this live as permanent while saving its replay' })
- }
-
const user = res.locals.oauth.token.User
if (!await doesVideoChannelOfAccountExist(body.channelId, user, res)) return cleanUpReqFiles(req)
import { CONFIG } from '@server/initializers/config'
import { WEBSERVER } from '@server/initializers/constants'
import { MVideoLive, MVideoLiveVideo } from '@server/types/models'
-import { LiveVideo, LiveVideoLatencyMode, VideoState } from '@shared/models'
+import { LiveVideo, LiveVideoLatencyMode, VideoPrivacy, VideoState } from '@shared/models'
import { AttributesOnly } from '@shared/typescript-utils'
import { VideoModel } from './video'
import { VideoBlacklistModel } from './video-blacklist'
await makeUploadRequest({ url: server.url, path, token: server.accessToken, fields, attaches })
})
- it('Should fail with save replay and permanent live set to true', async function () {
- const fields = { ...baseCorrectParams, saveReplay: true, permanentLive: true }
-
- await makePostBodyRequest({ url: server.url, path, token: server.accessToken, fields })
- })
-
it('Should fail with bad latency setting', async function () {
const fields = { ...baseCorrectParams, latencyMode: 42 }
setDefaultVideoChannel,
waitJobs
} from '@shared/server-commands'
-import { checkLiveCleanupAfterSave } from '../../shared'
+import { checkLiveCleanup } from '../../shared'
const expect = chai.expect
expect(video.duration).to.be.greaterThan(0)
}
- await checkLiveCleanupAfterSave(servers[0], videoId, resolutions)
+ await checkLiveCleanup(servers[0], videoId, resolutions)
}
async function waitUntilLivePublishedOnAllServers (videoId: string) {
await waitJobs(servers)
})
- it('Should not have cleaned up this live', async function () {
+ it('Should have cleaned up this live', async function () {
this.timeout(40000)
await wait(5000)
for (const server of servers) {
const videoDetails = await server.videos.get({ id: videoUUID })
- expect(videoDetails.streamingPlaylists).to.have.lengthOf(1)
+
+ expect(videoDetails.streamingPlaylists).to.have.lengthOf(0)
}
})
import 'mocha'
import * as chai from 'chai'
import { FfmpegCommand } from 'fluent-ffmpeg'
-import { checkLiveCleanupAfterSave } from '@server/tests/shared'
+import { checkLiveCleanup } from '@server/tests/shared'
import { wait } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoPrivacy, VideoState } from '@shared/models'
import {
ConfigCommand,
createMultipleServers,
doubleFollow,
+ findExternalSavedVideo,
PeerTubeServer,
setAccessTokensToServers,
setDefaultVideoChannel,
testFfmpegStreamError,
waitJobs,
waitUntilLivePublishedOnAllServers,
- waitUntilLiveSavedOnAllServers
+ waitUntilLiveReplacedByReplayOnAllServers,
+ waitUntilLiveWaitingOnAllServers
} from '@shared/server-commands'
const expect = chai.expect
let liveVideoUUID: string
let ffmpegCommand: FfmpegCommand
- async function createLiveWrapper (saveReplay: boolean) {
+ async function createLiveWrapper (options: { permanent: boolean, replay: boolean }) {
if (liveVideoUUID) {
try {
await servers[0].videos.remove({ id: liveVideoUUID })
channelId: servers[0].store.channel.id,
privacy: VideoPrivacy.PUBLIC,
name: 'my super live',
- saveReplay
+ saveReplay: options.replay,
+ permanentLive: options.permanent
}
const { uuid } = await servers[0].live.create({ fields: attributes })
it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000)
- liveVideoUUID = await createLiveWrapper(false)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
await waitJobs(servers)
await checkVideoState(liveVideoUUID, VideoState.LIVE_ENDED)
// No resolutions saved since we did not save replay
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on blacklist and delete the live', async function () {
this.timeout(40000)
- liveVideoUUID = await createLiveWrapper(false)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await wait(5000)
await waitJobs(servers)
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
it('Should correctly terminate the stream on delete and delete the video', async function () {
this.timeout(40000)
- liveVideoUUID = await createLiveWrapper(false)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: false })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
})
- describe('With save replay enabled', function () {
+ describe('With save replay enabled on non permanent live', function () {
it('Should correctly create and federate the "waiting for stream" live', async function () {
this.timeout(20000)
- liveVideoUUID = await createLiveWrapper(true)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
await waitJobs(servers)
await stopFfmpeg(ffmpegCommand)
- await waitUntilLiveSavedOnAllServers(servers, liveVideoUUID)
+ await waitUntilLiveReplacedByReplayOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
// Live has been transcoded
})
it('Should have cleaned up the live files', async function () {
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ])
})
it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () {
this.timeout(40000)
- liveVideoUUID = await createLiveWrapper(true)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await wait(5000)
await waitJobs(servers)
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [ 720 ])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [ 720 ])
})
it('Should correctly terminate the stream on delete and delete the video', async function () {
this.timeout(40000)
- liveVideoUUID = await createLiveWrapper(true)
+ liveVideoUUID = await createLiveWrapper({ permanent: false, replay: true })
ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
await waitJobs(servers)
await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
- await checkLiveCleanupAfterSave(servers[0], liveVideoUUID, [])
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
+ })
+ })
+
+ describe('With save replay enabled on permanent live', function () {
+ let lastReplayUUID: string
+
+ it('Should correctly create and federate the "waiting for stream" live', async function () {
+ this.timeout(20000)
+
+ liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
+
+ await waitJobs(servers)
+
+ await checkVideosExist(liveVideoUUID, false, HttpStatusCode.OK_200)
+ await checkVideoState(liveVideoUUID, VideoState.WAITING_FOR_LIVE)
+ })
+
+ it('Should correctly have updated the live and federated it when streaming in the live', async function () {
+ this.timeout(20000)
+
+ ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
+ await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
+
+ await waitJobs(servers)
+
+ await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
+ await checkVideoState(liveVideoUUID, VideoState.PUBLISHED)
+ })
+
+ it('Should correctly have saved the live and federated it after the streaming', async function () {
+ this.timeout(30000)
+
+ const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
+
+ await stopFfmpeg(ffmpegCommand)
+
+ await waitUntilLiveWaitingOnAllServers(servers, liveVideoUUID)
+ await waitJobs(servers)
+
+ const video = await findExternalSavedVideo(servers[0], liveDetails)
+ expect(video).to.exist
+
+ for (const server of servers) {
+ await server.videos.get({ id: video.uuid })
+ }
+
+ lastReplayUUID = video.uuid
+ })
+
+ it('Should have cleaned up the live files', async function () {
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
+ })
+
+ it('Should correctly terminate the stream on blacklist and blacklist the saved replay video', async function () {
+ this.timeout(60000)
+
+ await servers[0].videos.remove({ id: lastReplayUUID })
+
+ liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
+
+ ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
+ await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
+
+ const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
+
+ await waitJobs(servers)
+ await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
+
+ await Promise.all([
+ servers[0].blacklist.add({ videoId: liveVideoUUID, reason: 'bad live', unfederate: true }),
+ testFfmpegStreamError(ffmpegCommand, true)
+ ])
+
+ await waitJobs(servers)
+ await wait(5000)
+ await waitJobs(servers)
+
+ const replay = await findExternalSavedVideo(servers[0], liveDetails)
+ expect(replay).to.exist
+
+ for (const videoId of [ liveVideoUUID, replay.uuid ]) {
+ await checkVideosExist(videoId, false)
+
+ await servers[0].videos.get({ id: videoId, expectedStatus: HttpStatusCode.UNAUTHORIZED_401 })
+ await servers[1].videos.get({ id: videoId, expectedStatus: HttpStatusCode.NOT_FOUND_404 })
+ }
+
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
+ })
+
+ it('Should correctly terminate the stream on delete and not save the video', async function () {
+ this.timeout(40000)
+
+ liveVideoUUID = await createLiveWrapper({ permanent: true, replay: true })
+
+ ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })
+ await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
+
+ const liveDetails = await servers[0].videos.get({ id: liveVideoUUID })
+
+ await waitJobs(servers)
+ await checkVideosExist(liveVideoUUID, true, HttpStatusCode.OK_200)
+
+ await Promise.all([
+ servers[0].videos.remove({ id: liveVideoUUID }),
+ testFfmpegStreamError(ffmpegCommand, true)
+ ])
+
+ await wait(5000)
+ await waitJobs(servers)
+
+ const replay = await findExternalSavedVideo(servers[0], liveDetails)
+ expect(replay).to.not.exist
+
+ await checkVideosExist(liveVideoUUID, false, HttpStatusCode.NOT_FOUND_404)
+ await checkLiveCleanup(servers[0], liveVideoUUID, [])
})
})
import * as chai from 'chai'
import { basename, join } from 'path'
import { ffprobePromise, getVideoStream } from '@server/helpers/ffmpeg'
-import { checkLiveCleanupAfterSave, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
+import { checkLiveCleanup, checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
import { wait } from '@shared/core-utils'
import {
HttpStatusCode,
it('Should correctly have cleaned up the live files', async function () {
this.timeout(30000)
- await checkLiveCleanupAfterSave(servers[0], liveVideoId, [ 240, 360, 720 ])
+ await checkLiveCleanup(servers[0], liveVideoId, [ 240, 360, 720 ])
})
})
import 'mocha'
import * as chai from 'chai'
-import { FfmpegCommand } from 'fluent-ffmpeg'
import { expectStartWith } from '@server/tests/shared'
import { areObjectStorageTestsDisabled } from '@shared/core-utils'
import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models'
import {
createMultipleServers,
doubleFollow,
+ findExternalSavedVideo,
killallServers,
makeRawRequest,
ObjectStorageCommand,
stopFfmpeg,
waitJobs,
waitUntilLivePublishedOnAllServers,
- waitUntilLiveSavedOnAllServers
+ waitUntilLiveReplacedByReplayOnAllServers,
+ waitUntilLiveWaitingOnAllServers
} from '@shared/server-commands'
const expect = chai.expect
-async function createLive (server: PeerTubeServer) {
+async function createLive (server: PeerTubeServer, permanent: boolean) {
const attributes: LiveVideoCreate = {
channelId: server.store.channel.id,
privacy: VideoPrivacy.PUBLIC,
name: 'my super live',
- saveReplay: true
+ saveReplay: true,
+ permanentLive: permanent
}
const { uuid } = await server.live.create({ fields: attributes })
}
}
+async function getFiles (server: PeerTubeServer, videoUUID: string) {
+ const video = await server.videos.get({ id: videoUUID })
+
+ expect(video.files).to.have.lengthOf(0)
+ expect(video.streamingPlaylists).to.have.lengthOf(1)
+
+ return video.streamingPlaylists[0].files
+}
+
+async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) {
+ const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID })
+ await waitUntilLivePublishedOnAllServers(servers, liveUUID)
+
+ const videoLiveDetails = await servers[0].videos.get({ id: liveUUID })
+ const liveDetails = await servers[0].live.get({ videoId: liveUUID })
+
+ await stopFfmpeg(ffmpegCommand)
+
+ if (liveDetails.permanentLive) {
+ await waitUntilLiveWaitingOnAllServers(servers, liveUUID)
+ } else {
+ await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID)
+ }
+
+ await waitJobs(servers)
+
+ return { videoLiveDetails, liveDetails }
+}
+
describe('Object storage for lives', function () {
if (areObjectStorageTestsDisabled()) return
- let ffmpegCommand: FfmpegCommand
let servers: PeerTubeServer[]
- let videoUUID: string
before(async function () {
this.timeout(120000)
})
describe('Without live transcoding', async function () {
+ let videoUUID: string
before(async function () {
await servers[0].config.enableLive({ transcoding: false })
- videoUUID = await createLive(servers[0])
+ videoUUID = await createLive(servers[0], false)
})
it('Should create a live and save the replay on object storage', async function () {
this.timeout(220000)
- ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
- await waitUntilLivePublishedOnAllServers(servers, videoUUID)
-
- await stopFfmpeg(ffmpegCommand)
-
- await waitUntilLiveSavedOnAllServers(servers, videoUUID)
- await waitJobs(servers)
+ await streamAndEnd(servers, videoUUID)
for (const server of servers) {
- const video = await server.videos.get({ id: videoUUID })
-
- expect(video.files).to.have.lengthOf(0)
- expect(video.streamingPlaylists).to.have.lengthOf(1)
-
- const files = video.streamingPlaylists[0].files
+ const files = await getFiles(server, videoUUID)
+ expect(files).to.have.lengthOf(1)
await checkFiles(files)
}
})
describe('With live transcoding', async function () {
+ let videoUUIDPermanent: string
+ let videoUUIDNonPermanent: string
before(async function () {
await servers[0].config.enableLive({ transcoding: true })
- videoUUID = await createLive(servers[0])
+ videoUUIDPermanent = await createLive(servers[0], true)
+ videoUUIDNonPermanent = await createLive(servers[0], false)
})
- it('Should import a video and have sent it to object storage', async function () {
+ it('Should create a live and save the replay on object storage', async function () {
this.timeout(240000)
- ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
- await waitUntilLivePublishedOnAllServers(servers, videoUUID)
+ await streamAndEnd(servers, videoUUIDNonPermanent)
- await stopFfmpeg(ffmpegCommand)
+ for (const server of servers) {
+ const files = await getFiles(server, videoUUIDNonPermanent)
+ expect(files).to.have.lengthOf(5)
- await waitUntilLiveSavedOnAllServers(servers, videoUUID)
- await waitJobs(servers)
+ await checkFiles(files)
+ }
+ })
- for (const server of servers) {
- const video = await server.videos.get({ id: videoUUID })
+ it('Should create a live and save the replay of permanent live on object storage', async function () {
+ this.timeout(240000)
+
+ const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent)
- expect(video.files).to.have.lengthOf(0)
- expect(video.streamingPlaylists).to.have.lengthOf(1)
+ const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
- const files = video.streamingPlaylists[0].files
+ for (const server of servers) {
+ const files = await getFiles(server, replay.uuid)
expect(files).to.have.lengthOf(5)
await checkFiles(files)
import { join } from 'path'
import { PeerTubeServer } from '@shared/server-commands'
-async function checkLiveCleanupAfterSave (server: PeerTubeServer, videoUUID: string, resolutions: number[] = []) {
+async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) {
const basePath = server.servers.buildDirectory('streaming-playlists')
const hlsPath = join(basePath, 'hls', videoUUID)
- if (resolutions.length === 0) {
+ if (savedResolutions.length === 0) {
const result = await pathExists(hlsPath)
expect(result).to.be.false
const files = await readdir(hlsPath)
// fragmented file and playlist per resolution + master playlist + segments sha256 json file
- expect(files).to.have.lengthOf(resolutions.length * 2 + 2)
+ expect(files).to.have.lengthOf(savedResolutions.length * 2 + 2)
- for (const resolution of resolutions) {
+ for (const resolution of savedResolutions) {
const fragmentedFile = files.find(f => f.endsWith(`-${resolution}-fragmented.mp4`))
expect(fragmentedFile).to.exist
}
export {
- checkLiveCleanupAfterSave
+ checkLiveCleanup
}
export interface VideoLiveEndingPayload {
videoId: number
+ publishedAt: string
+
+ replayDirectory?: string
}
export interface ActorKeysPayload {
-import { LiveVideoLatencyMode } from '.'
import { VideoCreate } from '../video-create.model'
+import { LiveVideoLatencyMode } from './live-video-latency-mode.enum'
export interface LiveVideoCreate extends VideoCreate {
- saveReplay?: boolean
permanentLive?: boolean
latencyMode?: LiveVideoLatencyMode
+
+ saveReplay?: boolean
}
return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, 2, false)
}
- async waitUntilSaved (options: OverrideCommandOptions & {
+ async waitUntilReplacedByReplay (options: OverrideCommandOptions & {
videoId: number | string
}) {
let video: VideoDetails
import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
import { buildAbsoluteFixturePath, wait } from '@shared/core-utils'
import { PeerTubeServer } from '../server/server'
+import { VideoDetails, VideoInclude } from '@shared/models'
function sendRTMPStream (options: {
rtmpBaseUrl: string
}
}
-async function waitUntilLiveSavedOnAllServers (servers: PeerTubeServer[], videoId: string) {
+async function waitUntilLiveWaitingOnAllServers (servers: PeerTubeServer[], videoId: string) {
for (const server of servers) {
- await server.live.waitUntilSaved({ videoId })
+ await server.live.waitUntilWaiting({ videoId })
}
}
+async function waitUntilLiveReplacedByReplayOnAllServers (servers: PeerTubeServer[], videoId: string) {
+ for (const server of servers) {
+ await server.live.waitUntilReplacedByReplay({ videoId })
+ }
+}
+
+async function findExternalSavedVideo (server: PeerTubeServer, liveDetails: VideoDetails) {
+ const { data } = await server.videos.list({ token: server.accessToken, sort: '-publishedAt', include: VideoInclude.BLACKLISTED })
+
+ return data.find(v => v.name === liveDetails.name + ' - ' + new Date(liveDetails.publishedAt).toLocaleString())
+}
+
export {
sendRTMPStream,
waitFfmpegUntilError,
testFfmpegStreamError,
stopFfmpeg,
+
waitUntilLivePublishedOnAllServers,
- waitUntilLiveSavedOnAllServers
+ waitUntilLiveReplacedByReplayOnAllServers,
+ waitUntilLiveWaitingOnAllServers,
+
+ findExternalSavedVideo
}