]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Live supports object storage
authorChocobozzz <me@florianbigard.com>
Tue, 4 Oct 2022 08:03:17 +0000 (10:03 +0200)
committerChocobozzz <me@florianbigard.com>
Tue, 4 Oct 2022 08:03:17 +0000 (10:03 +0200)
 * Sync live files (segments, master playlist, resolution playlist,
   segment sha file) into object storage
 * Automatically delete them when the live ends
 * Segment sha file is now a file on disk, and not stored in memory
   anymore

21 files changed:
server.ts
server/controllers/index.ts
server/controllers/live.ts [deleted file]
server/lib/hls.ts
server/lib/job-queue/handlers/move-to-object-storage.ts
server/lib/job-queue/handlers/video-live-ending.ts
server/lib/live/live-manager.ts
server/lib/live/live-segment-sha-store.ts
server/lib/live/live-utils.ts
server/lib/live/shared/muxing-session.ts
server/lib/object-storage/shared/object-storage-helpers.ts
server/lib/object-storage/videos.ts
server/models/video/video-streaming-playlist.ts
server/tests/api/live/live-fast-restream.ts
server/tests/api/live/live.ts
server/tests/api/object-storage/live.ts
server/tests/shared/live.ts
server/tests/shared/streaming-playlists.ts
shared/server-commands/videos/live-command.ts
shared/server-commands/videos/streaming-playlists-command.ts
support/doc/api/openapi.yaml

index 887814d4e004c7f563f4499beab0eee1e91f818e..2085c67d91ebd6b5ab985c3e0cf70477eb307032 100644 (file)
--- a/server.ts
+++ b/server.ts
@@ -102,7 +102,6 @@ import {
   wellKnownRouter,
   lazyStaticRouter,
   servicesRouter,
-  liveRouter,
   pluginsRouter,
   webfingerRouter,
   trackerRouter,
@@ -221,9 +220,6 @@ app.use(apiRoute, apiRouter)
 // Services (oembed...)
 app.use('/services', servicesRouter)
 
-// Live streaming
-app.use('/live', liveRouter)
-
 // Plugins & themes
 app.use('/', pluginsRouter)
 
index e8833d58cad6556d8fd99fc48a0a63caed224289..8574a9e7bb81b7aa8ed7f28df73c6b451bb228c4 100644 (file)
@@ -6,7 +6,6 @@ export * from './feeds'
 export * from './services'
 export * from './static'
 export * from './lazy-static'
-export * from './live'
 export * from './misc'
 export * from './webfinger'
 export * from './tracker'
diff --git a/server/controllers/live.ts b/server/controllers/live.ts
deleted file mode 100644 (file)
index 81008f1..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-import cors from 'cors'
-import express from 'express'
-import { mapToJSON } from '@server/helpers/core-utils'
-import { LiveSegmentShaStore } from '@server/lib/live'
-import { HttpStatusCode } from '@shared/models'
-
-const liveRouter = express.Router()
-
-liveRouter.use('/segments-sha256/:videoUUID',
-  cors(),
-  getSegmentsSha256
-)
-
-// ---------------------------------------------------------------------------
-
-export {
-  liveRouter
-}
-
-// ---------------------------------------------------------------------------
-
-function getSegmentsSha256 (req: express.Request, res: express.Response) {
-  const videoUUID = req.params.videoUUID
-
-  const result = LiveSegmentShaStore.Instance.getSegmentsSha256(videoUUID)
-
-  if (!result) {
-    return res.status(HttpStatusCode.NOT_FOUND_404).end()
-  }
-
-  return res.json(mapToJSON(result))
-}
index a0a5afc0f89b5a44b475e43dd9eb314efd5fc7cb..a41f1ae4858461e17ed22047b3e44d11e0bd8a56 100644 (file)
@@ -15,7 +15,7 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, REQUEST_TIMEOUTS } from '../initializers
 import { sequelizeTypescript } from '../initializers/database'
 import { VideoFileModel } from '../models/video/video-file'
 import { VideoStreamingPlaylistModel } from '../models/video/video-streaming-playlist'
-import { storeHLSFile } from './object-storage'
+import { storeHLSFileFromFilename } from './object-storage'
 import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getHlsResolutionPlaylistFilename } from './paths'
 import { VideoPathManager } from './video-path-manager'
 
@@ -95,7 +95,7 @@ function updateMasterHLSPlaylist (video: MVideo, playlistArg: MStreamingPlaylist
     await writeFile(masterPlaylistPath, masterPlaylists.join('\n') + '\n')
 
     if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
-      playlist.playlistUrl = await storeHLSFile(playlist, playlist.playlistFilename)
+      playlist.playlistUrl = await storeHLSFileFromFilename(playlist, playlist.playlistFilename)
       await remove(masterPlaylistPath)
     }
 
@@ -146,7 +146,7 @@ function updateSha256VODSegments (video: MVideo, playlistArg: MStreamingPlaylist
     await outputJSON(outputPath, json)
 
     if (playlist.storage === VideoStorage.OBJECT_STORAGE) {
-      playlist.segmentsSha256Url = await storeHLSFile(playlist, playlist.segmentsSha256Filename)
+      playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlist, playlist.segmentsSha256Filename)
       await remove(outputPath)
     }
 
index 25bdebeea98b6a5a2acac48b390ac70fad063804..28c3d325d6a0d1340907177c590a19667c49e4e0 100644 (file)
@@ -5,7 +5,7 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { updateTorrentMetadata } from '@server/helpers/webtorrent'
 import { CONFIG } from '@server/initializers/config'
 import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
-import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
+import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
 import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
 import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
 import { VideoModel } from '@server/models/video/video'
@@ -88,10 +88,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
 
       // Resolution playlist
       const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
-      await storeHLSFile(playlistWithVideo, playlistFilename)
+      await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
 
       // Resolution fragmented file
-      const fileUrl = await storeHLSFile(playlistWithVideo, file.filename)
+      const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
 
       const oldPath = join(getHLSDirectory(video), file.filename)
 
@@ -113,9 +113,9 @@ async function doAfterLastJob (options: {
     const playlistWithVideo = playlist.withVideo(video)
 
     // Master playlist
-    playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename)
+    playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
     // Sha256 segments file
-    playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename)
+    playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
 
     playlist.storage = VideoStorage.OBJECT_STORAGE
 
index 8a3ee09a2290210354bedfb8b2895fdadd3ab020..abfaf1cd7f2d0b85a75adfc5e4ab24a6724158d5 100644 (file)
@@ -4,7 +4,7 @@ import { join } from 'path'
 import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg'
 import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
 import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
-import { cleanupPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
+import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live'
 import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths'
 import { generateVideoMiniature } from '@server/lib/thumbnail'
 import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding'
@@ -141,23 +141,22 @@ async function replaceLiveByReplay (options: {
 }) {
   const { video, liveSession, live, permanentLive, replayDirectory } = options
 
-  await cleanupTMPLiveFiles(video)
+  const videoWithFiles = await VideoModel.loadFull(video.id)
+  const hlsPlaylist = videoWithFiles.getHLSPlaylist()
+
+  await cleanupTMPLiveFiles(videoWithFiles, hlsPlaylist)
 
   await live.destroy()
 
-  video.isLive = false
-  video.waitTranscoding = true
-  video.state = VideoState.TO_TRANSCODE
+  videoWithFiles.isLive = false
+  videoWithFiles.waitTranscoding = true
+  videoWithFiles.state = VideoState.TO_TRANSCODE
 
-  await video.save()
+  await videoWithFiles.save()
 
-  liveSession.replayVideoId = video.id
+  liveSession.replayVideoId = videoWithFiles.id
   await liveSession.save()
 
-  // Remove old HLS playlist video files
-  const videoWithFiles = await VideoModel.loadFull(video.id)
-
-  const hlsPlaylist = videoWithFiles.getHLSPlaylist()
   await VideoFileModel.removeHLSFilesOfVideoId(hlsPlaylist.id)
 
   // Reset playlist
@@ -234,7 +233,7 @@ async function cleanupLiveAndFederate (options: {
 
   if (streamingPlaylist) {
     if (permanentLive) {
-      await cleanupPermanentLive(video, streamingPlaylist)
+      await cleanupAndDestroyPermanentLive(video, streamingPlaylist)
     } else {
       await cleanupUnsavedNormalLive(video, streamingPlaylist)
     }
index 16715862bc78188c80575a82c21c22148fd4c1d2..9470b530b7955e725d3ecde3350188075b74b741 100644 (file)
@@ -21,14 +21,14 @@ import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
 import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
 import { pick, wait } from '@shared/core-utils'
-import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
+import { LiveVideoError, VideoState, VideoStorage, VideoStreamingPlaylistType } from '@shared/models'
 import { federateVideoIfNeeded } from '../activitypub/videos'
 import { JobQueue } from '../job-queue'
 import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
 import { PeerTubeSocket } from '../peertube-socket'
 import { Hooks } from '../plugins/hooks'
 import { LiveQuotaStore } from './live-quota-store'
-import { cleanupPermanentLive } from './live-utils'
+import { cleanupAndDestroyPermanentLive } from './live-utils'
 import { MuxingSession } from './shared'
 
 const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
@@ -224,7 +224,7 @@ class LiveManager {
     if (oldStreamingPlaylist) {
       if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)
 
-      await cleanupPermanentLive(video, oldStreamingPlaylist)
+      await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
     }
 
     this.videoSessions.set(video.id, sessionId)
@@ -301,7 +301,7 @@ class LiveManager {
       ...pick(options, [ 'streamingPlaylist', 'inputUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
     })
 
-    muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags))
+    muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
 
     muxingSession.on('bad-socket-health', ({ videoId }) => {
       logger.error(
@@ -485,6 +485,10 @@ class LiveManager {
 
     playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)
 
+    playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
+      ? VideoStorage.OBJECT_STORAGE
+      : VideoStorage.FILE_SYSTEM
+
     return playlist.save()
   }
 
index 4af6f3ebfdfaba48cc530e91ba007852690b762f..faf03dccfebcd23f4c17b90274040198854c7495 100644 (file)
@@ -1,62 +1,73 @@
+import { writeJson } from 'fs-extra'
 import { basename } from 'path'
+import { mapToJSON } from '@server/helpers/core-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { MStreamingPlaylistVideo } from '@server/types/models'
 import { buildSha256Segment } from '../hls'
+import { storeHLSFileFromPath } from '../object-storage'
 
 const lTags = loggerTagsFactory('live')
 
 class LiveSegmentShaStore {
 
-  private static instance: LiveSegmentShaStore
-
-  private readonly segmentsSha256 = new Map<string, Map<string, string>>()
-
-  private constructor () {
+  private readonly segmentsSha256 = new Map<string, string>()
+
+  private readonly videoUUID: string
+  private readonly sha256Path: string
+  private readonly streamingPlaylist: MStreamingPlaylistVideo
+  private readonly sendToObjectStorage: boolean
+
+  constructor (options: {
+    videoUUID: string
+    sha256Path: string
+    streamingPlaylist: MStreamingPlaylistVideo
+    sendToObjectStorage: boolean
+  }) {
+    this.videoUUID = options.videoUUID
+    this.sha256Path = options.sha256Path
+    this.streamingPlaylist = options.streamingPlaylist
+    this.sendToObjectStorage = options.sendToObjectStorage
   }
 
-  getSegmentsSha256 (videoUUID: string) {
-    return this.segmentsSha256.get(videoUUID)
-  }
-
-  async addSegmentSha (videoUUID: string, segmentPath: string) {
-    const segmentName = basename(segmentPath)
-    logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
+  async addSegmentSha (segmentPath: string) {
+    logger.debug('Adding live sha segment %s.', segmentPath, lTags(this.videoUUID))
 
     const shaResult = await buildSha256Segment(segmentPath)
 
-    if (!this.segmentsSha256.has(videoUUID)) {
-      this.segmentsSha256.set(videoUUID, new Map())
-    }
+    const segmentName = basename(segmentPath)
+    this.segmentsSha256.set(segmentName, shaResult)
 
-    const filesMap = this.segmentsSha256.get(videoUUID)
-    filesMap.set(segmentName, shaResult)
+    await this.writeToDisk()
   }
 
-  removeSegmentSha (videoUUID: string, segmentPath: string) {
+  async removeSegmentSha (segmentPath: string) {
     const segmentName = basename(segmentPath)
 
-    logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
+    logger.debug('Removing live sha segment %s.', segmentPath, lTags(this.videoUUID))
 
-    const filesMap = this.segmentsSha256.get(videoUUID)
-    if (!filesMap) {
-      logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
+    if (!this.segmentsSha256.has(segmentName)) {
+      logger.warn('Unknown segment in files map for video %s and segment %s.', this.videoUUID, segmentPath, lTags(this.videoUUID))
       return
     }
 
-    if (!filesMap.has(segmentName)) {
-      logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
-      return
-    }
+    this.segmentsSha256.delete(segmentName)
 
-    filesMap.delete(segmentName)
+    await this.writeToDisk()
   }
 
-  cleanupShaSegments (videoUUID: string) {
-    this.segmentsSha256.delete(videoUUID)
-  }
+  private async writeToDisk () {
+    await writeJson(this.sha256Path, mapToJSON(this.segmentsSha256))
 
-  static get Instance () {
-    return this.instance || (this.instance = new this())
+    if (this.sendToObjectStorage) {
+      const url = await storeHLSFileFromPath(this.streamingPlaylist, this.sha256Path)
+
+      if (this.streamingPlaylist.segmentsSha256Url !== url) {
+        this.streamingPlaylist.segmentsSha256Url = url
+        await this.streamingPlaylist.save()
+      }
+    }
   }
+
 }
 
 export {
index bba8766428aa409a9daaff816ac1136d060cd0eb..d2b8e3a55b8c1469f2e84b1c0f137e92f6a18da7 100644 (file)
@@ -1,9 +1,10 @@
 import { pathExists, readdir, remove } from 'fs-extra'
 import { basename, join } from 'path'
 import { logger } from '@server/helpers/logger'
-import { MStreamingPlaylist, MVideo } from '@server/types/models'
+import { MStreamingPlaylist, MStreamingPlaylistVideo, MVideo } from '@server/types/models'
+import { VideoStorage } from '@shared/models'
+import { listHLSFileKeysOf, removeHLSFileObjectStorage, removeHLSObjectStorage } from '../object-storage'
 import { getLiveDirectory } from '../paths'
-import { LiveSegmentShaStore } from './live-segment-sha-store'
 
 function buildConcatenatedName (segmentOrPlaylistPath: string) {
   const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
@@ -11,8 +12,8 @@ function buildConcatenatedName (segmentOrPlaylistPath: string) {
   return 'concat-' + num[1] + '.ts'
 }
 
-async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
-  await cleanupTMPLiveFiles(video)
+async function cleanupAndDestroyPermanentLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
+  await cleanupTMPLiveFiles(video, streamingPlaylist)
 
   await streamingPlaylist.destroy()
 }
@@ -20,32 +21,51 @@ async function cleanupPermanentLive (video: MVideo, streamingPlaylist: MStreamin
 async function cleanupUnsavedNormalLive (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
   const hlsDirectory = getLiveDirectory(video)
 
+  // We uploaded files to object storage too, remove them
+  if (streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+    await removeHLSObjectStorage(streamingPlaylist.withVideo(video))
+  }
+
   await remove(hlsDirectory)
 
   await streamingPlaylist.destroy()
+}
 
-  LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
+async function cleanupTMPLiveFiles (video: MVideo, streamingPlaylist: MStreamingPlaylist) {
+  await cleanupTMPLiveFilesFromObjectStorage(streamingPlaylist.withVideo(video))
+
+  await cleanupTMPLiveFilesFromFilesystem(video)
 }
 
-async function cleanupTMPLiveFiles (video: MVideo) {
-  const hlsDirectory = getLiveDirectory(video)
+export {
+  cleanupAndDestroyPermanentLive,
+  cleanupUnsavedNormalLive,
+  cleanupTMPLiveFiles,
+  buildConcatenatedName
+}
+
+// ---------------------------------------------------------------------------
 
-  LiveSegmentShaStore.Instance.cleanupShaSegments(video.uuid)
+function isTMPLiveFile (name: string) {
+  return name.endsWith('.ts') ||
+    name.endsWith('.m3u8') ||
+    name.endsWith('.json') ||
+    name.endsWith('.mpd') ||
+    name.endsWith('.m4s') ||
+    name.endsWith('.tmp')
+}
+
+async function cleanupTMPLiveFilesFromFilesystem (video: MVideo) {
+  const hlsDirectory = getLiveDirectory(video)
 
   if (!await pathExists(hlsDirectory)) return
 
-  logger.info('Cleanup TMP live files of %s.', hlsDirectory)
+  logger.info('Cleanup TMP live files from filesystem of %s.', hlsDirectory)
 
   const files = await readdir(hlsDirectory)
 
   for (const filename of files) {
-    if (
-      filename.endsWith('.ts') ||
-      filename.endsWith('.m3u8') ||
-      filename.endsWith('.mpd') ||
-      filename.endsWith('.m4s') ||
-      filename.endsWith('.tmp')
-    ) {
+    if (isTMPLiveFile(filename)) {
       const p = join(hlsDirectory, filename)
 
       remove(p)
@@ -54,9 +74,14 @@ async function cleanupTMPLiveFiles (video: MVideo) {
   }
 }
 
-export {
-  cleanupPermanentLive,
-  cleanupUnsavedNormalLive,
-  cleanupTMPLiveFiles,
-  buildConcatenatedName
+async function cleanupTMPLiveFilesFromObjectStorage (streamingPlaylist: MStreamingPlaylistVideo) {
+  if (streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
+
+  const keys = await listHLSFileKeysOf(streamingPlaylist)
+
+  for (const key of keys) {
+    if (isTMPLiveFile(key)) {
+      await removeHLSFileObjectStorage(streamingPlaylist, key)
+    }
+  }
 }
index 505717dce0bf03fb9664e277d0932156005d178b..4c27d5dd885418299675e3645038a6783f2a9162 100644 (file)
@@ -9,8 +9,10 @@ import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers
 import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
+import { removeHLSFileObjectStorage, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
+import { VideoStorage } from '@shared/models'
 import { getLiveDirectory, getLiveReplayBaseDirectory } from '../../paths'
 import { VideoTranscodingProfilesManager } from '../../transcoding/default-transcoding-profiles'
 import { isAbleToUploadVideo } from '../../user'
@@ -21,7 +23,7 @@ import { buildConcatenatedName } from '../live-utils'
 import memoizee = require('memoizee')
 
 interface MuxingSessionEvents {
-  'master-playlist-created': (options: { videoId: number }) => void
+  'live-ready': (options: { videoId: number }) => void
 
   'bad-socket-health': (options: { videoId: number }) => void
   'duration-exceeded': (options: { videoId: number }) => void
@@ -68,12 +70,18 @@ class MuxingSession extends EventEmitter {
   private readonly outDirectory: string
   private readonly replayDirectory: string
 
+  private readonly liveSegmentShaStore: LiveSegmentShaStore
+
   private readonly lTags: LoggerTagsFn
 
   private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
 
   private tsWatcher: FSWatcher
   private masterWatcher: FSWatcher
+  private m3u8Watcher: FSWatcher
+
+  private masterPlaylistCreated = false
+  private liveReady = false
 
   private aborted = false
 
@@ -123,6 +131,13 @@ class MuxingSession extends EventEmitter {
     this.outDirectory = getLiveDirectory(this.videoLive.Video)
     this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
 
+    this.liveSegmentShaStore = new LiveSegmentShaStore({
+      videoUUID: this.videoLive.Video.uuid,
+      sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
+      streamingPlaylist: this.streamingPlaylist,
+      sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
+    })
+
     this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
   }
 
@@ -159,8 +174,9 @@ class MuxingSession extends EventEmitter {
 
     logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags())
 
-    this.watchTSFiles()
     this.watchMasterFile()
+    this.watchTSFiles()
+    this.watchM3U8File()
 
     let ffmpegShellCommand: string
     this.ffmpegCommand.on('start', cmdline => {
@@ -219,7 +235,7 @@ class MuxingSession extends EventEmitter {
     setTimeout(() => {
       // Wait latest segments generation, and close watchers
 
-      Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
+      Promise.all([ this.tsWatcher.close(), this.masterWatcher.close(), this.m3u8Watcher.close() ])
         .then(() => {
           // Process remaining segments hash
           for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
@@ -240,14 +256,41 @@ class MuxingSession extends EventEmitter {
   private watchMasterFile () {
     this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename)
 
-    this.masterWatcher.on('add', () => {
-      this.emit('master-playlist-created', { videoId: this.videoId })
+    this.masterWatcher.on('add', async () => {
+      if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+        try {
+          const url = await storeHLSFileFromFilename(this.streamingPlaylist, this.streamingPlaylist.playlistFilename)
+
+          this.streamingPlaylist.playlistUrl = url
+          await this.streamingPlaylist.save()
+        } catch (err) {
+          logger.error('Cannot upload live master file to object storage.', { err, ...this.lTags() })
+        }
+      }
+
+      this.masterPlaylistCreated = true
 
       this.masterWatcher.close()
         .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() }))
     })
   }
 
+  private watchM3U8File () {
+    this.m3u8Watcher = watch(this.outDirectory + '/*.m3u8')
+
+    const onChangeOrAdd = async (m3u8Path: string) => {
+      if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
+
+      try {
+        await storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)
+      } catch (err) {
+        logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+      }
+    }
+
+    this.m3u8Watcher.on('change', onChangeOrAdd)
+  }
+
   private watchTSFiles () {
     const startStreamDateTime = new Date().getTime()
 
@@ -282,7 +325,21 @@ class MuxingSession extends EventEmitter {
       }
     }
 
-    const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
+    const deleteHandler = async (segmentPath: string) => {
+      try {
+        await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
+      } catch (err) {
+        logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
+      }
+
+      if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+        try {
+          await removeHLSFileObjectStorage(this.streamingPlaylist, segmentPath)
+        } catch (err) {
+          logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
+        }
+      }
+    }
 
     this.tsWatcher.on('add', p => addHandler(p))
     this.tsWatcher.on('unlink', p => deleteHandler(p))
@@ -315,6 +372,7 @@ class MuxingSession extends EventEmitter {
         extname: '.ts',
         infoHash: null,
         fps: this.fps,
+        storage: this.streamingPlaylist.storage,
         videoStreamingPlaylistId: this.streamingPlaylist.id
       })
 
@@ -343,18 +401,36 @@ class MuxingSession extends EventEmitter {
   }
 
   private processSegments (segmentPaths: string[]) {
-    mapSeries(segmentPaths, async previousSegment => {
-      // Add sha hash of previous segments, because ffmpeg should have finished generating them
-      await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)
+    mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
+      .catch(err => {
+        if (this.aborted) return
+
+        logger.error('Cannot process segments', { err, ...this.lTags() })
+      })
+  }
 
-      if (this.saveReplay) {
-        await this.addSegmentToReplay(previousSegment)
+  private async processSegment (segmentPath: string) {
+    // Add sha hash of previous segments, because ffmpeg should have finished generating them
+    await this.liveSegmentShaStore.addSegmentSha(segmentPath)
+
+    if (this.saveReplay) {
+      await this.addSegmentToReplay(segmentPath)
+    }
+
+    if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
+      try {
+        await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+      } catch (err) {
+        logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
       }
-    }).catch(err => {
-      if (this.aborted) return
+    }
 
-      logger.error('Cannot process segments', { err, ...this.lTags() })
-    })
+    // Master playlist and segment JSON file are created, live is ready
+    if (this.masterPlaylistCreated && !this.liveReady) {
+      this.liveReady = true
+
+      this.emit('live-ready', { videoId: this.videoId })
+    }
   }
 
   private hasClientSocketInBadHealth (sessionId: string) {
index 16161362cbfffd1e9784b30016baf777eda4bff2..c131977e8c622684018d00e77bc492c492571e57 100644 (file)
@@ -22,6 +22,24 @@ type BucketInfo = {
   PREFIX?: string
 }
 
+async function listKeysOfPrefix (prefix: string, bucketInfo: BucketInfo) {
+  const s3Client = getClient()
+
+  const commandPrefix = bucketInfo.PREFIX + prefix
+  const listCommand = new ListObjectsV2Command({
+    Bucket: bucketInfo.BUCKET_NAME,
+    Prefix: commandPrefix
+  })
+
+  const listedObjects = await s3Client.send(listCommand)
+
+  if (isArray(listedObjects.Contents) !== true) return []
+
+  return listedObjects.Contents.map(c => c.Key)
+}
+
+// ---------------------------------------------------------------------------
+
 async function storeObject (options: {
   inputPath: string
   objectStorageKey: string
@@ -36,6 +54,8 @@ async function storeObject (options: {
   return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo })
 }
 
+// ---------------------------------------------------------------------------
+
 async function removeObject (filename: string, bucketInfo: BucketInfo) {
   const command = new DeleteObjectCommand({
     Bucket: bucketInfo.BUCKET_NAME,
@@ -89,6 +109,8 @@ async function removePrefix (prefix: string, bucketInfo: BucketInfo) {
   if (listedObjects.IsTruncated) await removePrefix(prefix, bucketInfo)
 }
 
+// ---------------------------------------------------------------------------
+
 async function makeAvailable (options: {
   key: string
   destination: string
@@ -122,7 +144,8 @@ export {
   storeObject,
   removeObject,
   removePrefix,
-  makeAvailable
+  makeAvailable,
+  listKeysOfPrefix
 }
 
 // ---------------------------------------------------------------------------
index 66e738200ed329e2c7918b656c9b99bf78922374..62aae248b9a9949ed45a6b75e9b01cec0743418e 100644 (file)
@@ -1,19 +1,35 @@
-import { join } from 'path'
+import { basename, join } from 'path'
 import { logger } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { MStreamingPlaylistVideo, MVideoFile } from '@server/types/models'
 import { getHLSDirectory } from '../paths'
 import { generateHLSObjectBaseStorageKey, generateHLSObjectStorageKey, generateWebTorrentObjectStorageKey } from './keys'
-import { lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
+import { listKeysOfPrefix, lTags, makeAvailable, removeObject, removePrefix, storeObject } from './shared'
 
-function storeHLSFile (playlist: MStreamingPlaylistVideo, filename: string, path?: string) {
+function listHLSFileKeysOf (playlist: MStreamingPlaylistVideo) {
+  return listKeysOfPrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
+}
+
+// ---------------------------------------------------------------------------
+
+function storeHLSFileFromFilename (playlist: MStreamingPlaylistVideo, filename: string) {
   return storeObject({
-    inputPath: path ?? join(getHLSDirectory(playlist.Video), filename),
+    inputPath: join(getHLSDirectory(playlist.Video), filename),
     objectStorageKey: generateHLSObjectStorageKey(playlist, filename),
     bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
   })
 }
 
+function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) {
+  return storeObject({
+    inputPath: path,
+    objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
+    bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS
+  })
+}
+
+// ---------------------------------------------------------------------------
+
 function storeWebTorrentFile (filename: string) {
   return storeObject({
     inputPath: join(CONFIG.STORAGE.VIDEOS_DIR, filename),
@@ -22,6 +38,8 @@ function storeWebTorrentFile (filename: string) {
   })
 }
 
+// ---------------------------------------------------------------------------
+
 function removeHLSObjectStorage (playlist: MStreamingPlaylistVideo) {
   return removePrefix(generateHLSObjectBaseStorageKey(playlist), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
 }
@@ -30,10 +48,14 @@ function removeHLSFileObjectStorage (playlist: MStreamingPlaylistVideo, filename
   return removeObject(generateHLSObjectStorageKey(playlist, filename), CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS)
 }
 
+// ---------------------------------------------------------------------------
+
 function removeWebTorrentObjectStorage (videoFile: MVideoFile) {
   return removeObject(generateWebTorrentObjectStorageKey(videoFile.filename), CONFIG.OBJECT_STORAGE.VIDEOS)
 }
 
+// ---------------------------------------------------------------------------
+
 async function makeHLSFileAvailable (playlist: MStreamingPlaylistVideo, filename: string, destination: string) {
   const key = generateHLSObjectStorageKey(playlist, filename)
 
@@ -62,9 +84,14 @@ async function makeWebTorrentFileAvailable (filename: string, destination: strin
   return destination
 }
 
+// ---------------------------------------------------------------------------
+
 export {
+  listHLSFileKeysOf,
+
   storeWebTorrentFile,
-  storeHLSFile,
+  storeHLSFileFromFilename,
+  storeHLSFileFromPath,
 
   removeHLSObjectStorage,
   removeHLSFileObjectStorage,
index f587989dcddf8678895a2d3caa609968c261a95f..2b6771f27be89a5451bbac0b962a8c13926e71e7 100644 (file)
@@ -245,21 +245,25 @@ export class VideoStreamingPlaylistModel extends Model<Partial<AttributesOnly<Vi
   }
 
   getMasterPlaylistUrl (video: MVideo) {
-    if (this.storage === VideoStorage.OBJECT_STORAGE) {
-      return getHLSPublicFileUrl(this.playlistUrl)
-    }
+    if (video.isOwned()) {
+      if (this.storage === VideoStorage.OBJECT_STORAGE) {
+        return getHLSPublicFileUrl(this.playlistUrl)
+      }
 
-    if (video.isOwned()) return WEBSERVER.URL + this.getMasterPlaylistStaticPath(video.uuid)
+      return WEBSERVER.URL + this.getMasterPlaylistStaticPath(video.uuid)
+    }
 
     return this.playlistUrl
   }
 
   getSha256SegmentsUrl (video: MVideo) {
-    if (this.storage === VideoStorage.OBJECT_STORAGE) {
-      return getHLSPublicFileUrl(this.segmentsSha256Url)
-    }
+    if (video.isOwned()) {
+      if (this.storage === VideoStorage.OBJECT_STORAGE) {
+        return getHLSPublicFileUrl(this.segmentsSha256Url)
+      }
 
-    if (video.isOwned()) return WEBSERVER.URL + this.getSha256SegmentsStaticPath(video.uuid, video.isLive)
+      return WEBSERVER.URL + this.getSha256SegmentsStaticPath(video.uuid)
+    }
 
     return this.segmentsSha256Url
   }
@@ -287,9 +291,7 @@ export class VideoStreamingPlaylistModel extends Model<Partial<AttributesOnly<Vi
     return join(STATIC_PATHS.STREAMING_PLAYLISTS.HLS, videoUUID, this.playlistFilename)
   }
 
-  private getSha256SegmentsStaticPath (videoUUID: string, isLive: boolean) {
-    if (isLive) return join('/live', 'segments-sha256', videoUUID)
-
+  private getSha256SegmentsStaticPath (videoUUID: string) {
     return join(STATIC_PATHS.STREAMING_PLAYLISTS.HLS, videoUUID, this.segmentsSha256Filename)
   }
 }
index 502959258fca6df51d95b0718fcf674284aeeea7..3ea6be9ff8a3275faac9e84c6c9e61b332663625 100644 (file)
@@ -59,7 +59,7 @@ describe('Fast restream in live', function () {
       const video = await server.videos.get({ id: liveId })
       expect(video.streamingPlaylists).to.have.lengthOf(1)
 
-      await server.live.getSegment({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
+      await server.live.getSegmentFile({ videoUUID: liveId, segment: 0, playlistNumber: 0 })
       await makeRawRequest(video.streamingPlaylists[0].playlistUrl, HttpStatusCode.OK_200)
       await makeRawRequest(video.streamingPlaylists[0].segmentsSha256Url, HttpStatusCode.OK_200)
 
index 4e070832dbc09d0914979d4348b729ad3d66e437..5dd2bd9aba0878b3e03bd82cd6e9d4125d65f0e2 100644 (file)
@@ -3,7 +3,7 @@
 import { expect } from 'chai'
 import { basename, join } from 'path'
 import { ffprobePromise, getVideoStream } from '@server/helpers/ffmpeg'
-import { checkLiveSegmentHash, checkResolutionsInMasterPlaylist, testImage } from '@server/tests/shared'
+import { testImage, testVideoResolutions } from '@server/tests/shared'
 import { getAllFiles, wait } from '@shared/core-utils'
 import {
   HttpStatusCode,
@@ -372,46 +372,6 @@ describe('Test live', function () {
       return uuid
     }
 
-    async function testVideoResolutions (liveVideoId: string, resolutions: number[]) {
-      for (const server of servers) {
-        const { data } = await server.videos.list()
-        expect(data.find(v => v.uuid === liveVideoId)).to.exist
-
-        const video = await server.videos.get({ id: liveVideoId })
-
-        expect(video.streamingPlaylists).to.have.lengthOf(1)
-
-        const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS)
-        expect(hlsPlaylist).to.exist
-
-        // Only finite files are displayed
-        expect(hlsPlaylist.files).to.have.lengthOf(0)
-
-        await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions })
-
-        for (let i = 0; i < resolutions.length; i++) {
-          const segmentNum = 3
-          const segmentName = `${i}-00000${segmentNum}.ts`
-          await commands[0].waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
-
-          const subPlaylist = await servers[0].streamingPlaylists.get({
-            url: `${servers[0].url}/static/streaming-playlists/hls/${video.uuid}/${i}.m3u8`
-          })
-
-          expect(subPlaylist).to.contain(segmentName)
-
-          const baseUrlAndPath = servers[0].url + '/static/streaming-playlists/hls'
-          await checkLiveSegmentHash({
-            server,
-            baseUrlSegment: baseUrlAndPath,
-            videoUUID: video.uuid,
-            segmentName,
-            hlsPlaylist
-          })
-        }
-      }
-    }
-
     function updateConf (resolutions: number[]) {
       return servers[0].config.updateCustomSubConfig({
         newConfig: {
@@ -449,7 +409,14 @@ describe('Test live', function () {
       await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
       await waitJobs(servers)
 
-      await testVideoResolutions(liveVideoId, [ 720 ])
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId,
+        resolutions: [ 720 ],
+        objectStorage: false,
+        transcoded: true
+      })
 
       await stopFfmpeg(ffmpegCommand)
     })
@@ -477,7 +444,14 @@ describe('Test live', function () {
       await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
       await waitJobs(servers)
 
-      await testVideoResolutions(liveVideoId, resolutions.concat([ 720 ]))
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId,
+        resolutions: resolutions.concat([ 720 ]),
+        objectStorage: false,
+        transcoded: true
+      })
 
       await stopFfmpeg(ffmpegCommand)
     })
@@ -522,7 +496,14 @@ describe('Test live', function () {
       await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
       await waitJobs(servers)
 
-      await testVideoResolutions(liveVideoId, resolutions)
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId,
+        resolutions,
+        objectStorage: false,
+        transcoded: true
+      })
 
       await stopFfmpeg(ffmpegCommand)
       await commands[0].waitUntilEnded({ videoId: liveVideoId })
@@ -611,7 +592,14 @@ describe('Test live', function () {
       await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
       await waitJobs(servers)
 
-      await testVideoResolutions(liveVideoId, resolutions)
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId,
+        resolutions,
+        objectStorage: false,
+        transcoded: true
+      })
 
       await stopFfmpeg(ffmpegCommand)
       await commands[0].waitUntilEnded({ videoId: liveVideoId })
@@ -640,7 +628,14 @@ describe('Test live', function () {
       await waitUntilLivePublishedOnAllServers(servers, liveVideoId)
       await waitJobs(servers)
 
-      await testVideoResolutions(liveVideoId, [ 720 ])
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId,
+        resolutions: [ 720 ],
+        objectStorage: false,
+        transcoded: true
+      })
 
       await stopFfmpeg(ffmpegCommand)
       await commands[0].waitUntilEnded({ videoId: liveVideoId })
index 0958ffe0ffbd2478613d388ce64e642a8c55df63..7e16b4c890483bde7560bf37f35d39b5a45a358e 100644 (file)
@@ -1,9 +1,9 @@
 /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
 
 import { expect } from 'chai'
-import { expectStartWith } from '@server/tests/shared'
+import { expectStartWith, testVideoResolutions } from '@server/tests/shared'
 import { areObjectStorageTestsDisabled } from '@shared/core-utils'
-import { HttpStatusCode, LiveVideoCreate, VideoFile, VideoPrivacy } from '@shared/models'
+import { HttpStatusCode, LiveVideoCreate, VideoPrivacy } from '@shared/models'
 import {
   createMultipleServers,
   doubleFollow,
@@ -35,41 +35,43 @@ async function createLive (server: PeerTubeServer, permanent: boolean) {
   return uuid
 }
 
-async function checkFiles (files: VideoFile[]) {
-  for (const file of files) {
-    expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
+async function checkFilesExist (servers: PeerTubeServer[], videoUUID: string, numberOfFiles: number) {
+  for (const server of servers) {
+    const video = await server.videos.get({ id: videoUUID })
 
-    await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
-  }
-}
+    expect(video.files).to.have.lengthOf(0)
+    expect(video.streamingPlaylists).to.have.lengthOf(1)
 
-async function getFiles (server: PeerTubeServer, videoUUID: string) {
-  const video = await server.videos.get({ id: videoUUID })
+    const files = video.streamingPlaylists[0].files
+    expect(files).to.have.lengthOf(numberOfFiles)
 
-  expect(video.files).to.have.lengthOf(0)
-  expect(video.streamingPlaylists).to.have.lengthOf(1)
+    for (const file of files) {
+      expectStartWith(file.fileUrl, ObjectStorageCommand.getPlaylistBaseUrl())
 
-  return video.streamingPlaylists[0].files
+      await makeRawRequest(file.fileUrl, HttpStatusCode.OK_200)
+    }
+  }
 }
 
-async function streamAndEnd (servers: PeerTubeServer[], liveUUID: string) {
-  const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveUUID })
-  await waitUntilLivePublishedOnAllServers(servers, liveUUID)
-
-  const videoLiveDetails = await servers[0].videos.get({ id: liveUUID })
-  const liveDetails = await servers[0].live.get({ videoId: liveUUID })
+async function checkFilesCleanup (server: PeerTubeServer, videoUUID: string, resolutions: number[]) {
+  const resolutionFiles = resolutions.map((_value, i) => `${i}.m3u8`)
 
-  await stopFfmpeg(ffmpegCommand)
-
-  if (liveDetails.permanentLive) {
-    await waitUntilLiveWaitingOnAllServers(servers, liveUUID)
-  } else {
-    await waitUntilLiveReplacedByReplayOnAllServers(servers, liveUUID)
+  for (const playlistName of [ 'master.m3u8' ].concat(resolutionFiles)) {
+    await server.live.getPlaylistFile({
+      videoUUID,
+      playlistName,
+      expectedStatus: HttpStatusCode.NOT_FOUND_404,
+      objectStorage: true
+    })
   }
 
-  await waitJobs(servers)
-
-  return { videoLiveDetails, liveDetails }
+  await server.live.getSegmentFile({
+    videoUUID,
+    playlistNumber: 0,
+    segment: 0,
+    objectStorage: true,
+    expectedStatus: HttpStatusCode.NOT_FOUND_404
+  })
 }
 
 describe('Object storage for lives', function () {
@@ -100,57 +102,124 @@ describe('Object storage for lives', function () {
       videoUUID = await createLive(servers[0], false)
     })
 
-    it('Should create a live and save the replay on object storage', async function () {
+    it('Should create a live and publish it on object storage', async function () {
+      this.timeout(220000)
+
+      const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUID })
+      await waitUntilLivePublishedOnAllServers(servers, videoUUID)
+
+      await testVideoResolutions({
+        originServer: servers[0],
+        servers,
+        liveVideoId: videoUUID,
+        resolutions: [ 720 ],
+        transcoded: false,
+        objectStorage: true
+      })
+
+      await stopFfmpeg(ffmpegCommand)
+    })
+
+    it('Should have saved the replay on object storage', async function () {
       this.timeout(220000)
 
-      await streamAndEnd(servers, videoUUID)
+      await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUID)
+      await waitJobs(servers)
 
-      for (const server of servers) {
-        const files = await getFiles(server, videoUUID)
-        expect(files).to.have.lengthOf(1)
+      await checkFilesExist(servers, videoUUID, 1)
+    })
 
-        await checkFiles(files)
-      }
+    it('Should have cleaned up live files from object storage', async function () {
+      await checkFilesCleanup(servers[0], videoUUID, [ 720 ])
     })
   })
 
   describe('With live transcoding', async function () {
-    let videoUUIDPermanent: string
-    let videoUUIDNonPermanent: string
+    const resolutions = [ 720, 480, 360, 240, 144 ]
 
     before(async function () {
       await servers[0].config.enableLive({ transcoding: true })
-
-      videoUUIDPermanent = await createLive(servers[0], true)
-      videoUUIDNonPermanent = await createLive(servers[0], false)
     })
 
-    it('Should create a live and save the replay on object storage', async function () {
-      this.timeout(240000)
+    describe('Normal replay', function () {
+      let videoUUIDNonPermanent: string
+
+      before(async function () {
+        videoUUIDNonPermanent = await createLive(servers[0], false)
+      })
+
+      it('Should create a live and publish it on object storage', async function () {
+        this.timeout(240000)
+
+        const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDNonPermanent })
+        await waitUntilLivePublishedOnAllServers(servers, videoUUIDNonPermanent)
+
+        await testVideoResolutions({
+          originServer: servers[0],
+          servers,
+          liveVideoId: videoUUIDNonPermanent,
+          resolutions,
+          transcoded: true,
+          objectStorage: true
+        })
+
+        await stopFfmpeg(ffmpegCommand)
+      })
 
-      await streamAndEnd(servers, videoUUIDNonPermanent)
+      it('Should have saved the replay on object storage', async function () {
+        this.timeout(220000)
 
-      for (const server of servers) {
-        const files = await getFiles(server, videoUUIDNonPermanent)
-        expect(files).to.have.lengthOf(5)
+        await waitUntilLiveReplacedByReplayOnAllServers(servers, videoUUIDNonPermanent)
+        await waitJobs(servers)
 
-        await checkFiles(files)
-      }
+        await checkFilesExist(servers, videoUUIDNonPermanent, 5)
+      })
+
+      it('Should have cleaned up live files from object storage', async function () {
+        await checkFilesCleanup(servers[0], videoUUIDNonPermanent, resolutions)
+      })
     })
 
-    it('Should create a live and save the replay of permanent live on object storage', async function () {
-      this.timeout(240000)
+    describe('Permanent replay', function () {
+      let videoUUIDPermanent: string
+
+      before(async function () {
+        videoUUIDPermanent = await createLive(servers[0], true)
+      })
+
+      it('Should create a live and publish it on object storage', async function () {
+        this.timeout(240000)
+
+        const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: videoUUIDPermanent })
+        await waitUntilLivePublishedOnAllServers(servers, videoUUIDPermanent)
+
+        await testVideoResolutions({
+          originServer: servers[0],
+          servers,
+          liveVideoId: videoUUIDPermanent,
+          resolutions,
+          transcoded: true,
+          objectStorage: true
+        })
+
+        await stopFfmpeg(ffmpegCommand)
+      })
+
+      it('Should have saved the replay on object storage', async function () {
+        this.timeout(220000)
 
-      const { videoLiveDetails } = await streamAndEnd(servers, videoUUIDPermanent)
+        await waitUntilLiveWaitingOnAllServers(servers, videoUUIDPermanent)
+        await waitJobs(servers)
 
-      const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
+        const videoLiveDetails = await servers[0].videos.get({ id: videoUUIDPermanent })
+        const replay = await findExternalSavedVideo(servers[0], videoLiveDetails)
 
-      for (const server of servers) {
-        const files = await getFiles(server, replay.uuid)
-        expect(files).to.have.lengthOf(5)
+        await checkFilesExist(servers, replay.uuid, 5)
+      })
 
-        await checkFiles(files)
-      }
+      it('Should have cleaned up live files from object storage', async function () {
+        await checkFilesCleanup(servers[0], videoUUIDPermanent, resolutions)
+      })
     })
   })
 
index 4bd4786fc18564d8951a3e0037f64b75e1e8e861..aa79622cbb57cce134f987620220974662ccb892 100644 (file)
@@ -3,39 +3,92 @@
 import { expect } from 'chai'
 import { pathExists, readdir } from 'fs-extra'
 import { join } from 'path'
-import { LiveVideo } from '@shared/models'
-import { PeerTubeServer } from '@shared/server-commands'
+import { wait } from '@shared/core-utils'
+import { LiveVideo, VideoStreamingPlaylistType } from '@shared/models'
+import { ObjectStorageCommand, PeerTubeServer } from '@shared/server-commands'
+import { checkLiveSegmentHash, checkResolutionsInMasterPlaylist } from './streaming-playlists'
 
 async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, savedResolutions: number[] = []) {
-  let live: LiveVideo
-
-  try {
-    live = await server.live.get({ videoId: videoUUID })
-  } catch {}
-
   const basePath = server.servers.buildDirectory('streaming-playlists')
   const hlsPath = join(basePath, 'hls', videoUUID)
 
   if (savedResolutions.length === 0) {
+    return checkUnsavedLiveCleanup(server, videoUUID, hlsPath)
+  }
+
+  return checkSavedLiveCleanup(hlsPath, savedResolutions)
+}
+
+// ---------------------------------------------------------------------------
 
-    if (live?.permanentLive) {
-      expect(await pathExists(hlsPath)).to.be.true
+async function testVideoResolutions (options: {
+  originServer: PeerTubeServer
+  servers: PeerTubeServer[]
+  liveVideoId: string
+  resolutions: number[]
+  transcoded: boolean
+  objectStorage: boolean
+}) {
+  const { originServer, servers, liveVideoId, resolutions, transcoded, objectStorage } = options
 
-      const hlsFiles = await readdir(hlsPath)
-      expect(hlsFiles).to.have.lengthOf(1) // Only replays directory
+  for (const server of servers) {
+    const { data } = await server.videos.list()
+    expect(data.find(v => v.uuid === liveVideoId)).to.exist
 
-      const replayDir = join(hlsPath, 'replay')
-      expect(await pathExists(replayDir)).to.be.true
+    const video = await server.videos.get({ id: liveVideoId })
+    expect(video.streamingPlaylists).to.have.lengthOf(1)
 
-      const replayFiles = await readdir(join(hlsPath, 'replay'))
-      expect(replayFiles).to.have.lengthOf(0)
-    } else {
-      expect(await pathExists(hlsPath)).to.be.false
+    const hlsPlaylist = video.streamingPlaylists.find(s => s.type === VideoStreamingPlaylistType.HLS)
+    expect(hlsPlaylist).to.exist
+    expect(hlsPlaylist.files).to.have.lengthOf(0) // Only fragmented mp4 files are displayed
+
+    await checkResolutionsInMasterPlaylist({ server, playlistUrl: hlsPlaylist.playlistUrl, resolutions, transcoded })
+
+    if (objectStorage) {
+      expect(hlsPlaylist.playlistUrl).to.contain(ObjectStorageCommand.getPlaylistBaseUrl())
     }
 
-    return
+    for (let i = 0; i < resolutions.length; i++) {
+      const segmentNum = 3
+      const segmentName = `${i}-00000${segmentNum}.ts`
+      await originServer.live.waitUntilSegmentGeneration({ videoUUID: video.uuid, playlistNumber: i, segment: segmentNum })
+
+      const baseUrl = objectStorage
+        ? ObjectStorageCommand.getPlaylistBaseUrl() + 'hls'
+        : originServer.url + '/static/streaming-playlists/hls'
+
+      if (objectStorage) {
+        // Playlist file upload
+        await wait(500)
+
+        expect(hlsPlaylist.segmentsSha256Url).to.contain(ObjectStorageCommand.getPlaylistBaseUrl())
+      }
+
+      const subPlaylist = await originServer.streamingPlaylists.get({ url: `${baseUrl}/${video.uuid}/${i}.m3u8` })
+
+      expect(subPlaylist).to.contain(segmentName)
+
+      await checkLiveSegmentHash({
+        server,
+        baseUrlSegment: baseUrl,
+        videoUUID: video.uuid,
+        segmentName,
+        hlsPlaylist
+      })
+    }
   }
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  checkLiveCleanup,
+  testVideoResolutions
+}
 
+// ---------------------------------------------------------------------------
+
+async function checkSavedLiveCleanup (hlsPath: string, savedResolutions: number[] = []) {
   const files = await readdir(hlsPath)
 
   // fragmented file and playlist per resolution + master playlist + segments sha256 json file
@@ -56,6 +109,27 @@ async function checkLiveCleanup (server: PeerTubeServer, videoUUID: string, save
   expect(shaFile).to.exist
 }
 
-export {
-  checkLiveCleanup
+async function checkUnsavedLiveCleanup (server: PeerTubeServer, videoUUID: string, hlsPath: string) {
+  let live: LiveVideo
+
+  try {
+    live = await server.live.get({ videoId: videoUUID })
+  } catch {}
+
+  if (live?.permanentLive) {
+    expect(await pathExists(hlsPath)).to.be.true
+
+    const hlsFiles = await readdir(hlsPath)
+    expect(hlsFiles).to.have.lengthOf(1) // Only replays directory
+
+    const replayDir = join(hlsPath, 'replay')
+    expect(await pathExists(replayDir)).to.be.true
+
+    const replayFiles = await readdir(join(hlsPath, 'replay'))
+    expect(replayFiles).to.have.lengthOf(0)
+
+    return
+  }
+
+  expect(await pathExists(hlsPath)).to.be.false
 }
index 4d82b36543d1ff12ba73fe41615191b4cef8f20c..eff34944b5f9ba9736703abab0f1e5cc2966a4f2 100644 (file)
@@ -26,7 +26,7 @@ async function checkSegmentHash (options: {
   const offset = parseInt(matches[2], 10)
   const range = `${offset}-${offset + length - 1}`
 
-  const segmentBody = await command.getSegment({
+  const segmentBody = await command.getFragmentedSegment({
     url: `${baseUrlSegment}/${videoName}`,
     expectedStatus: HttpStatusCode.PARTIAL_CONTENT_206,
     range: `bytes=${range}`
@@ -46,7 +46,7 @@ async function checkLiveSegmentHash (options: {
   const { server, baseUrlSegment, videoUUID, segmentName, hlsPlaylist } = options
   const command = server.streamingPlaylists
 
-  const segmentBody = await command.getSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` })
+  const segmentBody = await command.getFragmentedSegment({ url: `${baseUrlSegment}/${videoUUID}/${segmentName}` })
   const shaBody = await command.getSegmentSha256({ url: hlsPlaylist.segmentsSha256Url })
 
   expect(sha256(segmentBody)).to.equal(shaBody[segmentName])
@@ -56,15 +56,16 @@ async function checkResolutionsInMasterPlaylist (options: {
   server: PeerTubeServer
   playlistUrl: string
   resolutions: number[]
+  transcoded?: boolean // default true
 }) {
-  const { server, playlistUrl, resolutions } = options
+  const { server, playlistUrl, resolutions, transcoded = true } = options
 
   const masterPlaylist = await server.streamingPlaylists.get({ url: playlistUrl })
 
   for (const resolution of resolutions) {
-    const reg = new RegExp(
-      '#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"'
-    )
+    const reg = transcoded
+      ? new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + ',(FRAME-RATE=\\d+,)?CODECS="avc1.64001f,mp4a.40.2"')
+      : new RegExp('#EXT-X-STREAM-INF:BANDWIDTH=\\d+,RESOLUTION=\\d+x' + resolution + '')
 
     expect(masterPlaylist).to.match(reg)
   }
index d804fd883b53287b2656b1fe5e369525e4e60215..defae95fb92a1b50844f257017c26046f3aed31d 100644 (file)
@@ -15,6 +15,7 @@ import {
   VideoState
 } from '@shared/models'
 import { unwrapBody } from '../requests'
+import { ObjectStorageCommand } from '../server'
 import { AbstractCommand, OverrideCommandOptions } from '../shared'
 import { sendRTMPStream, testFfmpegStreamError } from './live'
 
@@ -34,6 +35,8 @@ export class LiveCommand extends AbstractCommand {
     })
   }
 
+  // ---------------------------------------------------------------------------
+
   listSessions (options: OverrideCommandOptions & {
     videoId: number | string
   }) {
@@ -70,6 +73,8 @@ export class LiveCommand extends AbstractCommand {
     })
   }
 
+  // ---------------------------------------------------------------------------
+
   update (options: OverrideCommandOptions & {
     videoId: number | string
     fields: LiveVideoUpdate
@@ -110,6 +115,8 @@ export class LiveCommand extends AbstractCommand {
     return body.video
   }
 
+  // ---------------------------------------------------------------------------
+
   async sendRTMPStreamInVideo (options: OverrideCommandOptions & {
     videoId: number | string
     fixtureName?: string
@@ -130,6 +137,8 @@ export class LiveCommand extends AbstractCommand {
     return testFfmpegStreamError(command, options.shouldHaveError)
   }
 
+  // ---------------------------------------------------------------------------
+
   waitUntilPublished (options: OverrideCommandOptions & {
     videoId: number | string
   }) {
@@ -163,15 +172,34 @@ export class LiveCommand extends AbstractCommand {
     return this.server.servers.waitUntilLog(`${videoUUID}/${segmentName}`, totalSessions * 2, false)
   }
 
-  getSegment (options: OverrideCommandOptions & {
+  async waitUntilReplacedByReplay (options: OverrideCommandOptions & {
+    videoId: number | string
+  }) {
+    let video: VideoDetails
+
+    do {
+      video = await this.server.videos.getWithToken({ token: options.token, id: options.videoId })
+
+      await wait(500)
+    } while (video.isLive === true || video.state.id !== VideoState.PUBLISHED)
+  }
+
+  // ---------------------------------------------------------------------------
+
+  getSegmentFile (options: OverrideCommandOptions & {
     videoUUID: string
     playlistNumber: number
     segment: number
+    objectStorage?: boolean // default false
   }) {
-    const { playlistNumber, segment, videoUUID } = options
+    const { playlistNumber, segment, videoUUID, objectStorage = false } = options
 
     const segmentName = `${playlistNumber}-00000${segment}.ts`
-    const url = `${this.server.url}/static/streaming-playlists/hls/${videoUUID}/${segmentName}`
+    const baseUrl = objectStorage
+      ? ObjectStorageCommand.getPlaylistBaseUrl()
+      : `${this.server.url}/static/streaming-playlists/hls`
+
+    const url = `${baseUrl}/${videoUUID}/${segmentName}`
 
     return this.getRawRequest({
       ...options,
@@ -182,18 +210,30 @@ export class LiveCommand extends AbstractCommand {
     })
   }
 
-  async waitUntilReplacedByReplay (options: OverrideCommandOptions & {
-    videoId: number | string
+  getPlaylistFile (options: OverrideCommandOptions & {
+    videoUUID: string
+    playlistName: string
+    objectStorage?: boolean // default false
   }) {
-    let video: VideoDetails
+    const { playlistName, videoUUID, objectStorage = false } = options
 
-    do {
-      video = await this.server.videos.getWithToken({ token: options.token, id: options.videoId })
+    const baseUrl = objectStorage
+      ? ObjectStorageCommand.getPlaylistBaseUrl()
+      : `${this.server.url}/static/streaming-playlists/hls`
 
-      await wait(500)
-    } while (video.isLive === true || video.state.id !== VideoState.PUBLISHED)
+    const url = `${baseUrl}/${videoUUID}/${playlistName}`
+
+    return this.getRawRequest({
+      ...options,
+
+      url,
+      implicitToken: false,
+      defaultExpectedStatus: HttpStatusCode.OK_200
+    })
   }
 
+  // ---------------------------------------------------------------------------
+
   async countPlaylists (options: OverrideCommandOptions & {
     videoUUID: string
   }) {
index 5d40d35cb9e06b1d0d90cb0ba0c14a39edaf177d..7f923d001a39b0f5ddefc61f397d598b8594cf40 100644 (file)
@@ -16,7 +16,7 @@ export class StreamingPlaylistsCommand extends AbstractCommand {
     }))
   }
 
-  getSegment (options: OverrideCommandOptions & {
+  getFragmentedSegment (options: OverrideCommandOptions & {
     url: string
     range?: string
   }) {
index 5077f8d90d0bd83796e0a2d8d7486a2e19fe6363..c62310b761d80324b9c2dbd974b97c1055bc2399 100644 (file)
@@ -145,7 +145,6 @@ info:
     | `/api/*`                    |
     | `/download/*`               |
     | `/lazy-static/*`            |
-    | `/live/segments-sha256/*`   |
     | `/.well-known/webfinger`    |
 
     In addition, all routes serving ActivityPub are CORS-enabled for all origins.