aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-10 11:16:05 +0200
committerChocobozzz <me@florianbigard.com>2023-05-10 11:16:05 +0200
commit34023e12534f22f28d0b5afc1db3fdf2fd1e7e60 (patch)
tree29b219412b79231ac7a62e482982bf1a9a7aada8
parentf9eee54f2a601b8db7ebb7c11d149b39170e6d68 (diff)
downloadPeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.gz
PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.zst
PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.zip
Fix S3 live sync
Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3
-rw-r--r--server/lib/live/shared/muxing-session.ts74
-rw-r--r--server/lib/object-storage/shared/object-storage-helpers.ts17
-rw-r--r--server/lib/object-storage/videos.ts10
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts1
4 files changed, 72 insertions, 30 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index ef4ecb83e..57e28aabf 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg'
8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' 8import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
9import { CONFIG } from '@server/initializers/config' 9import { CONFIG } from '@server/initializers/config'
10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' 10import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
11import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' 11import {
12 removeHLSFileObjectStorageByPath,
13 storeHLSFileFromContent,
14 storeHLSFileFromFilename,
15 storeHLSFileFromPath
16} from '@server/lib/object-storage'
12import { VideoFileModel } from '@server/models/video/video-file' 17import { VideoFileModel } from '@server/models/video/video-file'
13import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 18import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
14import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' 19import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter {
74 79
75 private readonly lTags: LoggerTagsFn 80 private readonly lTags: LoggerTagsFn
76 81
82 // Path -> Queue
83 private readonly objectStorageSendQueues = new Map<string, PQueue>()
84
77 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} 85 private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
78 86
79 private streamingPlaylist: MStreamingPlaylistVideo 87 private streamingPlaylist: MStreamingPlaylistVideo
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter {
151 159
152 this.watchMasterFile() 160 this.watchMasterFile()
153 this.watchTSFiles() 161 this.watchTSFiles()
154 this.watchM3U8File()
155 } 162 }
156 163
157 abort () { 164 abort () {
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter {
192 }) 199 })
193 } 200 }
194 201
195 private watchM3U8File () {
196 const sendQueues = new Map<string, PQueue>()
197
198 const onChange = async (m3u8Path: string) => {
199 if (m3u8Path.endsWith('.m3u8') !== true) return
200 if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return
201
202 logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags())
203
204 try {
205 if (!sendQueues.has(m3u8Path)) {
206 sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
207 }
208
209 const queue = sendQueues.get(m3u8Path)
210 await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path))
211 } catch (err) {
212 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
213 }
214 }
215
216 this.filesWatcher.on('change', onChange)
217 }
218
219 private watchTSFiles () { 202 private watchTSFiles () {
220 const startStreamDateTime = new Date().getTime() 203 const startStreamDateTime = new Date().getTime()
221 204
222 const playlistIdMatcher = /^([\d+])-/
223
224 const addHandler = async (segmentPath: string) => { 205 const addHandler = async (segmentPath: string) => {
225 if (segmentPath.endsWith('.ts') !== true) return 206 if (segmentPath.endsWith('.ts') !== true) return
226 207
227 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) 208 logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
228 209
229 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] 210 const playlistId = this.getPlaylistIdFromTS(segmentPath)
230 211
231 const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] 212 const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
232 this.processSegments(segmentsToProcess) 213 this.processSegments(segmentsToProcess)
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter {
349 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { 330 if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) {
350 try { 331 try {
351 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) 332 await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
333
334 await this.processM3U8ToObjectStorage(segmentPath)
352 } catch (err) { 335 } catch (err) {
353 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) 336 logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
354 } 337 }
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter {
362 } 345 }
363 } 346 }
364 347
348 private async processM3U8ToObjectStorage (segmentPath: string) {
349 const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
350
351 logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
352
353 const segmentName = basename(segmentPath)
354
355 const playlistContent = await readFile(m3u8Path, 'utf-8')
356 // Remove new chunk references, that will be processed later
357 const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
358
359 try {
360 if (!this.objectStorageSendQueues.has(m3u8Path)) {
361 this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
362 }
363
364 const queue = this.objectStorageSendQueues.get(m3u8Path)
365 await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
366 } catch (err) {
367 logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
368 }
369 }
370
365 private onTranscodingError () { 371 private onTranscodingError () {
366 this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) 372 this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
367 } 373 }
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter {
484 ? new RemoteTranscodingWrapper(options) 490 ? new RemoteTranscodingWrapper(options)
485 : new FFmpegTranscodingWrapper(options) 491 : new FFmpegTranscodingWrapper(options)
486 } 492 }
493
494 private getPlaylistIdFromTS (segmentPath: string) {
495 const playlistIdMatcher = /^([\d+])-/
496
497 return basename(segmentPath).match(playlistIdMatcher)[1]
498 }
499
500 private getPlaylistNameFromTS (segmentPath: string) {
501 return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
502 }
487} 503}
488 504
489// --------------------------------------------------------------------------- 505// ---------------------------------------------------------------------------
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts
index be94b01a8..f517c5f69 100644
--- a/server/lib/object-storage/shared/object-storage-helpers.ts
+++ b/server/lib/object-storage/shared/object-storage-helpers.ts
@@ -59,6 +59,20 @@ async function storeObject (options: {
59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) 59 return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate })
60} 60}
61 61
62async function storeContent (options: {
63 content: string
64 inputPath: string
65 objectStorageKey: string
66 bucketInfo: BucketInfo
67 isPrivate: boolean
68}): Promise<string> {
69 const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options
70
71 logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags())
72
73 return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate })
74}
75
62// --------------------------------------------------------------------------- 76// ---------------------------------------------------------------------------
63 77
64async function updateObjectACL (options: { 78async function updateObjectACL (options: {
@@ -206,6 +220,7 @@ export {
206 buildKey, 220 buildKey,
207 221
208 storeObject, 222 storeObject,
223 storeContent,
209 224
210 removeObject, 225 removeObject,
211 removeObjectByFullKey, 226 removeObjectByFullKey,
@@ -223,7 +238,7 @@ export {
223// --------------------------------------------------------------------------- 238// ---------------------------------------------------------------------------
224 239
225async function uploadToStorage (options: { 240async function uploadToStorage (options: {
226 content: ReadStream 241 content: ReadStream | string
227 objectStorageKey: string 242 objectStorageKey: string
228 bucketInfo: BucketInfo 243 bucketInfo: BucketInfo
229 isPrivate: boolean 244 isPrivate: boolean
diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts
index bfdef94fd..57d978e4c 100644
--- a/server/lib/object-storage/videos.ts
+++ b/server/lib/object-storage/videos.ts
@@ -42,6 +42,15 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string)
42 }) 42 })
43} 43}
44 44
45function storeHLSFileFromContent (playlist: MStreamingPlaylistVideo, path: string, content: string) {
46 return storeObject({
47 inputPath: path,
48 objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)),
49 bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS,
50 isPrivate: playlist.Video.hasPrivateStaticPath()
51 })
52}
53
45// --------------------------------------------------------------------------- 54// ---------------------------------------------------------------------------
46 55
47function storeWebTorrentFile (video: MVideo, file: MVideoFile) { 56function storeWebTorrentFile (video: MVideo, file: MVideoFile) {
@@ -166,6 +175,7 @@ export {
166 storeWebTorrentFile, 175 storeWebTorrentFile,
167 storeHLSFileFromFilename, 176 storeHLSFileFromFilename,
168 storeHLSFileFromPath, 177 storeHLSFileFromPath,
178 storeHLSFileFromContent,
169 179
170 updateWebTorrentFileACL, 180 updateWebTorrentFileACL,
171 updateHLSFilesACL, 181 updateHLSFilesACL,
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
index 48a70d891..c3b0a95cc 100644
--- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
@@ -80,6 +80,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
80 const outputDirectory = privatePayload.outputDirectory 80 const outputDirectory = privatePayload.outputDirectory
81 const videoUUID = privatePayload.videoUUID 81 const videoUUID = privatePayload.videoUUID
82 82
83 // Always process the chunk first before moving m3u8 that references this chunk
83 if (updatePayload.type === 'add-chunk') { 84 if (updatePayload.type === 'add-chunk') {
84 await move( 85 await move(
85 updatePayload.videoChunkFile as string, 86 updatePayload.videoChunkFile as string,