]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/live/shared/muxing-session.ts
Force stop remote live transcoding
[github/Chocobozzz/PeerTube.git] / server / lib / live / shared / muxing-session.ts
index ef4ecb83e9affea3e63e566bc1aaf085a483b6f2..c672ec4d6c28db87bfe39f3cbd2a15422f1abcc4 100644 (file)
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg'
 import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
-import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage'
+import {
+  removeHLSFileObjectStorageByPath,
+  storeHLSFileFromContent,
+  storeHLSFileFromFilename,
+  storeHLSFileFromPath
+} from '@server/lib/object-storage'
 import { VideoFileModel } from '@server/models/video/video-file'
 import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
 import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
@@ -57,7 +62,10 @@ class MuxingSession extends EventEmitter {
   private readonly user: MUserId
   private readonly sessionId: string
   private readonly videoLive: MVideoLiveVideo
-  private readonly inputUrl: string
+
+  private readonly inputLocalUrl: string
+  private readonly inputPublicUrl: string
+
   private readonly fps: number
   private readonly allResolutions: number[]
 
@@ -74,6 +82,9 @@ class MuxingSession extends EventEmitter {
 
   private readonly lTags: LoggerTagsFn
 
+  // Path -> Queue
+  private readonly objectStorageSendQueues = new Map<string, PQueue>()
+
   private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
 
   private streamingPlaylist: MStreamingPlaylistVideo
@@ -99,7 +110,10 @@ class MuxingSession extends EventEmitter {
     user: MUserId
     sessionId: string
     videoLive: MVideoLiveVideo
-    inputUrl: string
+
+    inputLocalUrl: string
+    inputPublicUrl: string
+
     fps: number
     bitrate: number
     ratio: number
@@ -112,7 +126,10 @@ class MuxingSession extends EventEmitter {
     this.user = options.user
     this.sessionId = options.sessionId
     this.videoLive = options.videoLive
-    this.inputUrl = options.inputUrl
+
+    this.inputLocalUrl = options.inputLocalUrl
+    this.inputPublicUrl = options.inputPublicUrl
+
     this.fps = options.fps
 
     this.bitrate = options.bitrate
@@ -151,7 +168,6 @@ class MuxingSession extends EventEmitter {
 
     this.watchMasterFile()
     this.watchTSFiles()
-    this.watchM3U8File()
   }
 
   abort () {
@@ -192,41 +208,15 @@ class MuxingSession extends EventEmitter {
     })
   }
 
-  private watchM3U8File () {
-    const sendQueues = new Map<string, PQueue>()
-
-    const onChange = async (m3u8Path: string) => {
-      if (m3u8Path.endsWith('.m3u8') !== true) return
-      if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
-
-      logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
-
-      try {
-        if (!sendQueues.has(m3u8Path)) {
-          sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
-        }
-
-        const queue = sendQueues.get(m3u8Path)
-        await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
-      } catch (err) {
-        logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
-      }
-    }
-
-    this.filesWatcher.on('change', onChange)
-  }
-
   private watchTSFiles () {
     const startStreamDateTime = new Date().getTime()
 
-    const playlistIdMatcher = /^([\d+])-/
-
     const addHandler = async (segmentPath: string) => {
       if (segmentPath.endsWith('.ts') !== true) return
 
       logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
 
-      const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
+      const playlistId = this.getPlaylistIdFromTS(segmentPath)
 
       const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
       this.processSegments(segmentsToProcess)
@@ -349,6 +339,8 @@ class MuxingSession extends EventEmitter {
     if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
       try {
         await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
+
+        await this.processM3U8ToObjectStorage(segmentPath)
       } catch (err) {
         logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
       }
@@ -362,6 +354,29 @@ class MuxingSession extends EventEmitter {
     }
   }
 
+  private async processM3U8ToObjectStorage (segmentPath: string) {
+    const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
+
+    logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
+
+    const segmentName = basename(segmentPath)
+
+    const playlistContent = await readFile(m3u8Path, 'utf-8')
+    // Remove new chunk references, that will be processed later
+    const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
+
+    try {
+      if (!this.objectStorageSendQueues.has(m3u8Path)) {
+        this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
+      }
+
+      const queue = this.objectStorageSendQueues.get(m3u8Path)
+      await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
+    } catch (err) {
+      logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
+    }
+  }
+
   private onTranscodingError () {
     this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
   }
@@ -369,7 +384,7 @@ class MuxingSession extends EventEmitter {
   private onTranscodedEnded () {
     this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
 
-    logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags())
+    logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags())
 
     setTimeout(() => {
       // Wait latest segments generation, and close watchers
@@ -462,7 +477,9 @@ class MuxingSession extends EventEmitter {
 
       lTags: this.lTags,
 
-      inputUrl: this.inputUrl,
+      sessionId: this.sessionId,
+      inputLocalUrl: this.inputLocalUrl,
+      inputPublicUrl: this.inputPublicUrl,
 
       toTranscode: this.allResolutions.map(resolution => ({
         resolution,
@@ -484,6 +501,16 @@ class MuxingSession extends EventEmitter {
       ? new RemoteTranscodingWrapper(options)
       : new FFmpegTranscodingWrapper(options)
   }
+
+  private getPlaylistIdFromTS (segmentPath: string) {
+    const playlistIdMatcher = /^([\d+])-/
+
+    return basename(segmentPath).match(playlistIdMatcher)[1]
+  }
+
+  private getPlaylistNameFromTS (segmentPath: string) {
+    return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
+  }
 }
 
 // ---------------------------------------------------------------------------