aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/shared/muxing-session.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-10 11:16:05 +0200
committerChocobozzz <me@florianbigard.com>2023-05-10 11:16:05 +0200
commit34023e12534f22f28d0b5afc1db3fdf2fd1e7e60 (patch)
tree29b219412b79231ac7a62e482982bf1a9a7aada8 /server/lib/live/shared/muxing-session.ts
parentf9eee54f2a601b8db7ebb7c11d149b39170e6d68 (diff)
downloadPeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.gz
PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.zst
PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.zip
Fix S3 live sync
Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3
Diffstat (limited to 'server/lib/live/shared/muxing-session.ts')
-rw-r--r--server/lib/live/shared/muxing-session.ts74
1 files changed, 45 insertions, 29 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index ef4ecb83e..57e28aabf 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
9import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
11import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' 11import {
12 removeHLSFileObjectStorageByPath,
13 storeHLSFileFromContent,
14 storeHLSFileFromFilename,
15 storeHLSFileFromPath
16} from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 17import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 18import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 19import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter {
74 79
75 private readonly lTags: LoggerTagsFn 80 private readonly lTags: LoggerTagsFn
76 81
82 // Path -> Queue
83 private readonly objectStorageSendQueues = new Map<string, PQueue>()
84
77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 85 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
78 86
79 private streamingPlaylist: MStreamingPlaylistVideo 87 private streamingPlaylist: MStreamingPlaylistVideo
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter {
151 159
152 this.watchMasterFile() 160 this.watchMasterFile()
153 this.watchTSFiles() 161 this.watchTSFiles()
154 this.watchM3U8File()
155 } 162 }
156 163
157 abort () { 164 abort () {
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter {
192 }) 199 })
193 } 200 }
194 201
195 private watchM3U8File () {
196 const sendQueues = new Map<string, PQueue>()
197
198 const onChange = async (m3u8Path: string) => {
199 if (m3u8Path.endsWith('.m3u8') !== true) return
200 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
201
202 logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
203
204 try {
205 if (!sendQueues.has(m3u8Path)) {
206 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
207 }
208
209 const queue = sendQueues.get(m3u8Path)
210 await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
211 } catch (err) {
212 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
213 }
214 }
215
216 this.filesWatcher.on('change', onChange)
217 }
218
219 private watchTSFiles () { 202 private watchTSFiles () {
220 const startStreamDateTime = new Date().getTime() 203 const startStreamDateTime = new Date().getTime()
221 204
222 const playlistIdMatcher = /^([\d+])-/
223
224 const addHandler = async (segmentPath: string) => { 205 const addHandler = async (segmentPath: string) => {
225 if (segmentPath.endsWith('.ts') !== true) return 206 if (segmentPath.endsWith('.ts') !== true) return
226 207
227 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) 208 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
228 209
229 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] 210 const playlistId = this.getPlaylistIdFromTS(segmentPath)
230 211
231 const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] 212 const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
232 this.processSegments(segmentsToProcess) 213 this.processSegments(segmentsToProcess)
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter {
349 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { 330 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
350 try { 331 try {
351 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) 332 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
333
334 await this.processM3U8ToObjectStorage(segmentPath)
352 } catch (err) { 335 } catch (err) {
353 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) 336 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
354 } 337 }
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter {
362 } 345 }
363 } 346 }
364 347
348 private async processM3U8ToObjectStorage (segmentPath: string) {
349 const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
350
351 logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
352
353 const segmentName = basename(segmentPath)
354
355 const playlistContent = await readFile(m3u8Path, 'utf-8')
356 // Remove new chunk references, that will be processed later
357 const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
358
359 try {
360 if (!this.objectStorageSendQueues.has(m3u8Path)) {
361 this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
362 }
363
364 const queue = this.objectStorageSendQueues.get(m3u8Path)
365 await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
366 } catch (err) {
367 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
368 }
369 }
370
365 private onTranscodingError () { 371 private onTranscodingError () {
366 this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) 372 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
367 } 373 }
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter {
484 ? new RemoteTranscodingWrapper(options) 490 ? new RemoteTranscodingWrapper(options)
485 : new FFmpegTranscodingWrapper(options) 491 : new FFmpegTranscodingWrapper(options)
486 } 492 }
493
494 private getPlaylistIdFromTS (segmentPath: string) {
495 const playlistIdMatcher = /^([\d+])-/
496
497 return basename(segmentPath).match(playlistIdMatcher)[1]
498 }
499
500 private getPlaylistNameFromTS (segmentPath: string) {
501 return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
502 }
487} 503}
488 504
489// --------------------------------------------------------------------------- 505// ---------------------------------------------------------------------------