]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Fix S3 live sync
authorChocobozzz <me@florianbigard.com>
Wed, 10 May 2023 09:16:05 +0000 (11:16 +0200)
committerChocobozzz <me@florianbigard.com>
Wed, 10 May 2023 09:16:05 +0000 (11:16 +0200)
Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3

server/lib/live/shared/muxing-session.ts
server/lib/object-storage/shared/object-storage-helpers.ts
server/lib/object-storage/videos.ts
server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts

index ef4ecb83e9affea3e63e566bc1aaf085a483b6f2..57e28aabf46a3b02e4ab281e6ef43d933e994855 100644 (file)
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg'
 import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import {
+  removeHLSFileObjectStorageByPath,
+  storeHLSFileFromContent,
+  storeHLSFileFromFilename,
+  storeHLSFileFromPath
+} from '@server/lib/object-storage'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
 import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter {
 
   private readonly lTags: LoggerTagsFn
 
+  // Path -> Queue
+  private readonly objectStorageSendQueues = new Map<string, PQueue>()
+
   private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
 
   private streamingPlaylist: MStreamingPlaylistVideo
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter {
 
     this.watchMasterFile()
     this.watchTSFiles()
-    this.watchM3U8File()
   }
 
   abort () {
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter {
     })
   }
 
-  private watchM3U8File () {
-    const sendQueues = new Map<string, PQueue>()
-
-    const onChange = async (m3u8Path: string) => {
-      if (m3u8Path.endsWith('.m3u8') !== true) return
-      if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
-
-      logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
-
-      try {
-        if (!sendQueues.has(m3u8Path)) {
-          sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
-        }
-
-        const queue = sendQueues.get(m3u8Path)
-        await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
-      } catch (err) {
-        logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
-      }
-    }
-
-    this.filesWatcher.on('change', onChange)
-  }
-
   private watchTSFiles () {
     const startStreamDateTime = new Date().getTime()
 
-    const playlistIdMatcher = /^([\d+])-/
-
     const addHandler = async (segmentPath: string) => {
       if (segmentPath.endsWith('.ts') !== true) return
 
       logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
 
-      const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
+      const playlistId = this.getPlaylistIdFromTS(segmentPath)
 
       const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
       this.processSegments(segmentsToProcess)
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter {
     if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
       try {
         await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+
+        await this.processM3U8ToObjectStorage(segmentPath)
       } catch (err) {
         logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
       }
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter {
     }
   }
 
+  private async processM3U8ToObjectStorage (segmentPath: string) {
+    const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
+
+    logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
+
+    const segmentName = basename(segmentPath)
+
+    const playlistContent = await readFile(m3u8Path, 'utf-8')
+    // Remove new chunk references, that will be processed later
+    const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
+
+    try {
+      if (!this.objectStorageSendQueues.has(m3u8Path)) {
+        this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+      }
+
+      const queue = this.objectStorageSendQueues.get(m3u8Path)
+      await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
+    } catch (err) {
+      logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+    }
+  }
+
   private onTranscodingError () {
     this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
   }
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter {
       ? new RemoteTranscodingWrapper(options)
       : new FFmpegTranscodingWrapper(options)
   }
+
+  private getPlaylistIdFromTS (segmentPath: string) {
+    const playlistIdMatcher = /^([\d+])-/
+
+    return basename(segmentPath).match(playlistIdMatcher)[1]
+  }
+
+  private getPlaylistNameFromTS (segmentPath: string) {
+    return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
+  }
 }
 
 // ---------------------------------------------------------------------------
index be94b01a8e2851d618bb6e281f24ce4f7d16d116..f517c5f6940963968aba928dd52fa7cca838f5dc 100644 (file)
@@ -59,6 +59,20 @@ async function storeObject (options: {
   return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
 }
 
+async function storeContent (options: {
+  content: string
+  inputPath: string
+  objectStorageKey: string
+  bucketInfo: BucketInfo
+  isPrivate: boolean
+}): Promise<string> {
+  const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
+
+  logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
+
+  return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
+}
+
 // ---------------------------------------------------------------------------
 
 async function updateObjectACL (options: {
@@ -206,6 +220,7 @@ export {
   buildKey,
 
   storeObject,
+  storeContent,
 
   removeObject,
   removeObjectByFullKey,
@@ -223,7 +238,7 @@ export {
 // ---------------------------------------------------------------------------
 
 async function uploadToStorage (options: {
-  content: ReadStream
+  content: ReadStream | string
   objectStorageKey: string
   bucketInfo: BucketInfo
   isPrivate: boolean
index bfdef94fd9dd71670200f607e899c84df6e8fbc2..57d978e4cc3bbc85eafc35068d5b7ca51a5cd312 100644 (file)
@@ -42,6 +42,15 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string)
   })
 }
 
+function storeHLSFileFromContent (playlist: MStreamingPlaylistVideo, path: string, content: string) {
+  return storeObject({
+    inputPath: path,
+    objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
+    bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
+    isPrivate: playlist.Video.hasPrivateStaticPath()
+  })
+}
+
 // ---------------------------------------------------------------------------
 
 function storeWebTorrentFile (video: MVideo, file: MVideoFile) {
@@ -166,6 +175,7 @@ export {
   storeWebTorrentFile,
   storeHLSFileFromFilename,
   storeHLSFileFromPath,
+  storeHLSFileFromContent,
 
   updateWebTorrentFileACL,
   updateHLSFilesACL,
index 48a70d891cf6d54aa7abcae312dd1a6c899a07e0..c3b0a95cc0082632d9b268ac6bd145e292585793 100644 (file)
@@ -80,6 +80,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
     const outputDirectory = privatePayload.outputDirectory
     const videoUUID = privatePayload.videoUUID
 
+    // Always process the chunk first before moving m3u8 that references this chunk
     if (updatePayload.type === 'add-chunk') {
       await move(
         updatePayload.videoChunkFile as string,