diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-10 11:16:05 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-05-10 11:16:05 +0200 |
commit | 34023e12534f22f28d0b5afc1db3fdf2fd1e7e60 (patch) | |
tree | 29b219412b79231ac7a62e482982bf1a9a7aada8 | |
parent | f9eee54f2a601b8db7ebb7c11d149b39170e6d68 (diff) | |
download | PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.gz PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.tar.zst PeerTube-34023e12534f22f28d0b5afc1db3fdf2fd1e7e60.zip |
Fix S3 live sync
Ensure TS chunks referenced in M3U8 playlist are already uploaded on S3
4 files changed, 72 insertions, 30 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index ef4ecb83e..57e28aabf 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -8,7 +8,12 @@ import { computeOutputFPS } from '@server/helpers/ffmpeg' | |||
8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' | 8 | import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger' |
9 | import { CONFIG } from '@server/initializers/config' | 9 | import { CONFIG } from '@server/initializers/config' |
10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' | 10 | import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants' |
11 | import { removeHLSFileObjectStorageByPath, storeHLSFileFromFilename, storeHLSFileFromPath } from '@server/lib/object-storage' | 11 | import { |
12 | removeHLSFileObjectStorageByPath, | ||
13 | storeHLSFileFromContent, | ||
14 | storeHLSFileFromFilename, | ||
15 | storeHLSFileFromPath | ||
16 | } from '@server/lib/object-storage' | ||
12 | import { VideoFileModel } from '@server/models/video/video-file' | 17 | import { VideoFileModel } from '@server/models/video/video-file' |
13 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 18 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
14 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' | 19 | import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models' |
@@ -74,6 +79,9 @@ class MuxingSession extends EventEmitter { | |||
74 | 79 | ||
75 | private readonly lTags: LoggerTagsFn | 80 | private readonly lTags: LoggerTagsFn |
76 | 81 | ||
82 | // Path -> Queue | ||
83 | private readonly objectStorageSendQueues = new Map<string, PQueue>() | ||
84 | |||
77 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} | 85 | private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {} |
78 | 86 | ||
79 | private streamingPlaylist: MStreamingPlaylistVideo | 87 | private streamingPlaylist: MStreamingPlaylistVideo |
@@ -151,7 +159,6 @@ class MuxingSession extends EventEmitter { | |||
151 | 159 | ||
152 | this.watchMasterFile() | 160 | this.watchMasterFile() |
153 | this.watchTSFiles() | 161 | this.watchTSFiles() |
154 | this.watchM3U8File() | ||
155 | } | 162 | } |
156 | 163 | ||
157 | abort () { | 164 | abort () { |
@@ -192,41 +199,15 @@ class MuxingSession extends EventEmitter { | |||
192 | }) | 199 | }) |
193 | } | 200 | } |
194 | 201 | ||
195 | private watchM3U8File () { | ||
196 | const sendQueues = new Map<string, PQueue>() | ||
197 | |||
198 | const onChange = async (m3u8Path: string) => { | ||
199 | if (m3u8Path.endsWith('.m3u8') !== true) return | ||
200 | if (this.streamingPlaylist.storage !== VideoStorage.OBJECT_STORAGE) return | ||
201 | |||
202 | logger.debug('Live change handler of M3U8 file %s.', m3u8Path, this.lTags()) | ||
203 | |||
204 | try { | ||
205 | if (!sendQueues.has(m3u8Path)) { | ||
206 | sendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
207 | } | ||
208 | |||
209 | const queue = sendQueues.get(m3u8Path) | ||
210 | await queue.add(() => storeHLSFileFromPath(this.streamingPlaylist, m3u8Path)) | ||
211 | } catch (err) { | ||
212 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
213 | } | ||
214 | } | ||
215 | |||
216 | this.filesWatcher.on('change', onChange) | ||
217 | } | ||
218 | |||
219 | private watchTSFiles () { | 202 | private watchTSFiles () { |
220 | const startStreamDateTime = new Date().getTime() | 203 | const startStreamDateTime = new Date().getTime() |
221 | 204 | ||
222 | const playlistIdMatcher = /^([\d+])-/ | ||
223 | |||
224 | const addHandler = async (segmentPath: string) => { | 205 | const addHandler = async (segmentPath: string) => { |
225 | if (segmentPath.endsWith('.ts') !== true) return | 206 | if (segmentPath.endsWith('.ts') !== true) return |
226 | 207 | ||
227 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) | 208 | logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags()) |
228 | 209 | ||
229 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 210 | const playlistId = this.getPlaylistIdFromTS(segmentPath) |
230 | 211 | ||
231 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] | 212 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] |
232 | this.processSegments(segmentsToProcess) | 213 | this.processSegments(segmentsToProcess) |
@@ -349,6 +330,8 @@ class MuxingSession extends EventEmitter { | |||
349 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { | 330 | if (this.streamingPlaylist.storage === VideoStorage.OBJECT_STORAGE) { |
350 | try { | 331 | try { |
351 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) | 332 | await storeHLSFileFromPath(this.streamingPlaylist, segmentPath) |
333 | |||
334 | await this.processM3U8ToObjectStorage(segmentPath) | ||
352 | } catch (err) { | 335 | } catch (err) { |
353 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) | 336 | logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() }) |
354 | } | 337 | } |
@@ -362,6 +345,29 @@ class MuxingSession extends EventEmitter { | |||
362 | } | 345 | } |
363 | } | 346 | } |
364 | 347 | ||
348 | private async processM3U8ToObjectStorage (segmentPath: string) { | ||
349 | const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath)) | ||
350 | |||
351 | logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags()) | ||
352 | |||
353 | const segmentName = basename(segmentPath) | ||
354 | |||
355 | const playlistContent = await readFile(m3u8Path, 'utf-8') | ||
356 | // Remove new chunk references, that will be processed later | ||
357 | const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n' | ||
358 | |||
359 | try { | ||
360 | if (!this.objectStorageSendQueues.has(m3u8Path)) { | ||
361 | this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 })) | ||
362 | } | ||
363 | |||
364 | const queue = this.objectStorageSendQueues.get(m3u8Path) | ||
365 | await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent)) | ||
366 | } catch (err) { | ||
367 | logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() }) | ||
368 | } | ||
369 | } | ||
370 | |||
365 | private onTranscodingError () { | 371 | private onTranscodingError () { |
366 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) | 372 | this.emit('transcoding-error', ({ videoUUID: this.videoUUID })) |
367 | } | 373 | } |
@@ -484,6 +490,16 @@ class MuxingSession extends EventEmitter { | |||
484 | ? new RemoteTranscodingWrapper(options) | 490 | ? new RemoteTranscodingWrapper(options) |
485 | : new FFmpegTranscodingWrapper(options) | 491 | : new FFmpegTranscodingWrapper(options) |
486 | } | 492 | } |
493 | |||
494 | private getPlaylistIdFromTS (segmentPath: string) { | ||
495 | const playlistIdMatcher = /^([\d+])-/ | ||
496 | |||
497 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
498 | } | ||
499 | |||
500 | private getPlaylistNameFromTS (segmentPath: string) { | ||
501 | return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8` | ||
502 | } | ||
487 | } | 503 | } |
488 | 504 | ||
489 | // --------------------------------------------------------------------------- | 505 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/object-storage/shared/object-storage-helpers.ts b/server/lib/object-storage/shared/object-storage-helpers.ts index be94b01a8..f517c5f69 100644 --- a/server/lib/object-storage/shared/object-storage-helpers.ts +++ b/server/lib/object-storage/shared/object-storage-helpers.ts | |||
@@ -59,6 +59,20 @@ async function storeObject (options: { | |||
59 | return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) | 59 | return uploadToStorage({ objectStorageKey, content: fileStream, bucketInfo, isPrivate }) |
60 | } | 60 | } |
61 | 61 | ||
62 | async function storeContent (options: { | ||
63 | content: string | ||
64 | inputPath: string | ||
65 | objectStorageKey: string | ||
66 | bucketInfo: BucketInfo | ||
67 | isPrivate: boolean | ||
68 | }): Promise<string> { | ||
69 | const { content, objectStorageKey, bucketInfo, inputPath, isPrivate } = options | ||
70 | |||
71 | logger.debug('Uploading %s content to %s%s in bucket %s', inputPath, bucketInfo.PREFIX, objectStorageKey, bucketInfo.BUCKET_NAME, lTags()) | ||
72 | |||
73 | return uploadToStorage({ objectStorageKey, content, bucketInfo, isPrivate }) | ||
74 | } | ||
75 | |||
62 | // --------------------------------------------------------------------------- | 76 | // --------------------------------------------------------------------------- |
63 | 77 | ||
64 | async function updateObjectACL (options: { | 78 | async function updateObjectACL (options: { |
@@ -206,6 +220,7 @@ export { | |||
206 | buildKey, | 220 | buildKey, |
207 | 221 | ||
208 | storeObject, | 222 | storeObject, |
223 | storeContent, | ||
209 | 224 | ||
210 | removeObject, | 225 | removeObject, |
211 | removeObjectByFullKey, | 226 | removeObjectByFullKey, |
@@ -223,7 +238,7 @@ export { | |||
223 | // --------------------------------------------------------------------------- | 238 | // --------------------------------------------------------------------------- |
224 | 239 | ||
225 | async function uploadToStorage (options: { | 240 | async function uploadToStorage (options: { |
226 | content: ReadStream | 241 | content: ReadStream | string |
227 | objectStorageKey: string | 242 | objectStorageKey: string |
228 | bucketInfo: BucketInfo | 243 | bucketInfo: BucketInfo |
229 | isPrivate: boolean | 244 | isPrivate: boolean |
diff --git a/server/lib/object-storage/videos.ts b/server/lib/object-storage/videos.ts index bfdef94fd..57d978e4c 100644 --- a/server/lib/object-storage/videos.ts +++ b/server/lib/object-storage/videos.ts | |||
@@ -42,6 +42,15 @@ function storeHLSFileFromPath (playlist: MStreamingPlaylistVideo, path: string) | |||
42 | }) | 42 | }) |
43 | } | 43 | } |
44 | 44 | ||
45 | function storeHLSFileFromContent (playlist: MStreamingPlaylistVideo, path: string, content: string) { | ||
46 | return storeObject({ | ||
47 | inputPath: path, | ||
48 | objectStorageKey: generateHLSObjectStorageKey(playlist, basename(path)), | ||
49 | bucketInfo: CONFIG.OBJECT_STORAGE.STREAMING_PLAYLISTS, | ||
50 | isPrivate: playlist.Video.hasPrivateStaticPath() | ||
51 | }) | ||
52 | } | ||
53 | |||
45 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
46 | 55 | ||
47 | function storeWebTorrentFile (video: MVideo, file: MVideoFile) { | 56 | function storeWebTorrentFile (video: MVideo, file: MVideoFile) { |
@@ -166,6 +175,7 @@ export { | |||
166 | storeWebTorrentFile, | 175 | storeWebTorrentFile, |
167 | storeHLSFileFromFilename, | 176 | storeHLSFileFromFilename, |
168 | storeHLSFileFromPath, | 177 | storeHLSFileFromPath, |
178 | storeHLSFileFromContent, | ||
169 | 179 | ||
170 | updateWebTorrentFileACL, | 180 | updateWebTorrentFileACL, |
171 | updateHLSFilesACL, | 181 | updateHLSFilesACL, |
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts index 48a70d891..c3b0a95cc 100644 --- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts | |||
@@ -80,6 +80,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO | |||
80 | const outputDirectory = privatePayload.outputDirectory | 80 | const outputDirectory = privatePayload.outputDirectory |
81 | const videoUUID = privatePayload.videoUUID | 81 | const videoUUID = privatePayload.videoUUID |
82 | 82 | ||
83 | // Always process the chunk first before moving m3u8 that references this chunk | ||
83 | if (updatePayload.type === 'add-chunk') { | 84 | if (updatePayload.type === 'add-chunk') { |
84 | await move( | 85 | await move( |
85 | updatePayload.videoChunkFile as string, | 86 | updatePayload.videoChunkFile as string, |