diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-10 11:16:05 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-05-10 11:16:05 +0200 |
commit | 34023e12534f22f28d0b5afc1db3fdf2fd1e7e60 (patch) | |
tree | 29b219412b79231ac7a62e482982bf1a9a7aada8 /server/lib/live | |
parent | f9eee54f2a601b8db7ebb7c11d149b39170e6d68 (diff) | |
download | PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.gz PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.zst PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.zip |
Fix S3 live sync
Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 74 |
1 files changed, 45 insertions, 29 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index ef4ecb83e..57e28aabf 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg' | |||
8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | 8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' |
9 | import { CONFIG } from '@server/initializers/config' | 9 | import { CONFIG } from '@server/initializers/config' |
10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' | 10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
11 | import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' | 11 | import { |
12 | removeHLSFileObjectStorageByPath, | ||
13 | storeHLSFileFromContent, | ||
14 | storeHLSFileFromFilename, | ||
15 | storeHLSFileFromPath | ||
16 | } from '@server/lib/object-storage' | ||
12 | import { VideoFileModel } from '@server/models/video/video-file' | 17 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 18 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
14 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | 19 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' |
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter { | |||
74 | 79 | ||
75 | private readonly lTags: LoggerTagsFn | 80 | private readonly lTags: LoggerTagsFn |
76 | 81 | ||
82 | // Path -> Queue | ||
83 | private readonly objectStorageSendQueues = new Map<string, PQueue>() | ||
84 | |||
77 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | 85 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} |
78 | 86 | ||
79 | private streamingPlaylist: MStreamingPlaylistVideo | 87 | private streamingPlaylist: MStreamingPlaylistVideo |
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter { | |||
151 | 159 | ||
152 | this.watchMasterFile() | 160 | this.watchMasterFile() |
153 | this.watchTSFiles() | 161 | this.watchTSFiles() |
154 | this.watchM3U8File() | ||
155 | } | 162 | } |
156 | 163 | ||
157 | abort () { | 164 | abort () { |
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter { | |||
192 | }) | 199 | }) |
193 | } | 200 | } |
194 | 201 | ||
195 | private watchM3U8File () { | ||
196 | const sendQueues = new Map<string, PQueue>() | ||
197 | |||
198 | const onChange = async (m3u8Path: string) => { | ||
199 | if (m3u8Path.endsWith('.m3u8') !== true) return | ||
200 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | ||
201 | |||
202 | logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) | ||
203 | |||
204 | try { | ||
205 | if (!sendQueues.has(m3u8Path)) { | ||
206 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
207 | } | ||
208 | |||
209 | const queue = sendQueues.get(m3u8Path) | ||
210 | await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) | ||
211 | } catch (err) { | ||
212 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
213 | } | ||
214 | } | ||
215 | |||
216 | this.filesWatcher.on('change', onChange) | ||
217 | } | ||
218 | |||
219 | private watchTSFiles () { | 202 | private watchTSFiles () { |
220 | const startStreamDateTime = new Date().getTime() | 203 | const startStreamDateTime = new Date().getTime() |
221 | 204 | ||
222 | const playlistIdMatcher = /^([\d+])-/ | ||
223 | |||
224 | const addHandler = async (segmentPath: string) => { | 205 | const addHandler = async (segmentPath: string) => { |
225 | if (segmentPath.endsWith('.ts') !== true) return | 206 | if (segmentPath.endsWith('.ts') !== true) return |
226 | 207 | ||
227 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | 208 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) |
228 | 209 | ||
229 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 210 | const playlistId = this.getPlaylistIdFromTS(segmentPath) |
230 | 211 | ||
231 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] | 212 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] |
232 | this.processSegments(segmentsToProcess) | 213 | this.processSegments(segmentsToProcess) |
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter { | |||
349 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | 330 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
350 | try { | 331 | try { |
351 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) | 332 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) |
333 | |||
334 | await this.processM3U8ToObjectStorage(segmentPath) | ||
352 | } catch (err) { | 335 | } catch (err) { |
353 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) | 336 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) |
354 | } | 337 | } |
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter { | |||
362 | } | 345 | } |
363 | } | 346 | } |
364 | 347 | ||
348 | private async processM3U8ToObjectStorage (segmentPath: string) { | ||
349 | const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath)) | ||
350 | |||
351 | logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags()) | ||
352 | |||
353 | const segmentName = basename(segmentPath) | ||
354 | |||
355 | const playlistContent = await readFile(m3u8Path, 'utf-8') | ||
356 | // Remove new chunk references, that will be processed later | ||
357 | const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n' | ||
358 | |||
359 | try { | ||
360 | if (!this.objectStorageSendQueues.has(m3u8Path)) { | ||
361 | this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
362 | } | ||
363 | |||
364 | const queue = this.objectStorageSendQueues.get(m3u8Path) | ||
365 | await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent)) | ||
366 | } catch (err) { | ||
367 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
368 | } | ||
369 | } | ||
370 | |||
365 | private onTranscodingError () { | 371 | private onTranscodingError () { |
366 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) | 372 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) |
367 | } | 373 | } |
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter { | |||
484 | ? new RemoteTranscodingWrapper(options) | 490 | ? new RemoteTranscodingWrapper(options) |
485 | : new FFmpegTranscodingWrapper(options) | 491 | : new FFmpegTranscodingWrapper(options) |
486 | } | 492 | } |
493 | |||
494 | private getPlaylistIdFromTS (segmentPath: string) { | ||
495 | const playlistIdMatcher = /^([\d+])-/ | ||
496 | |||
497 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
498 | } | ||
499 | |||
500 | private getPlaylistNameFromTS (segmentPath: string) { | ||
501 | return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8` | ||
502 | } | ||
487 | } | 503 | } |
488 | 504 | ||
489 | // --------------------------------------------------------------------------- | 505 | // --------------------------------------------------------------------------- |