aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/server/process
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-19 13:46:40 +0200
committerChocobozzz <me@florianbigard.com>2023-05-19 13:52:38 +0200
commitdef4ea4f3824689abd4ea66f5ab6e47d553b7ddd (patch)
tree2f614aac8a0221cc4a8762adcb1e696a70db6d4e /packages/peertube-runner/server/process
parent6403a6bd019a30848f0e9b5f6e7b9734229ab0f1 (diff)
downloadPeerTube-def4ea4f3824689abd4ea66f5ab6e47d553b7ddd.tar.gz
PeerTube-def4ea4f3824689abd4ea66f5ab6e47d553b7ddd.tar.zst
PeerTube-def4ea4f3824689abd4ea66f5ab6e47d553b7ddd.zip
More robust chunk handler
Diffstat (limited to 'packages/peertube-runner/server/process')
-rw-r--r--packages/peertube-runner/server/process/shared/process-live.ts66
1 files changed, 46 insertions, 20 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts
index b17b51c7c..df1b677f0 100644
--- a/packages/peertube-runner/server/process/shared/process-live.ts
+++ b/packages/peertube-runner/server/process/shared/process-live.ts
@@ -21,6 +21,9 @@ export class ProcessLiveRTMPHLSTranscoding {
21 private readonly outputPath: string 21 private readonly outputPath: string
22 private readonly fsWatchers: FSWatcher[] = [] 22 private readonly fsWatchers: FSWatcher[] = []
23 23
24 // Playlist name -> chunks
25 private readonly pendingChunksPerPlaylist = new Map<string, string[]>()
26
24 private readonly playlistsCreated = new Set<string>() 27 private readonly playlistsCreated = new Set<string>()
25 private allPlaylistsCreated = false 28 private allPlaylistsCreated = false
26 29
@@ -68,9 +71,19 @@ export class ProcessLiveRTMPHLSTranscoding {
68 } 71 }
69 }) 72 })
70 73
71 tsWatcher.on('add', p => { 74 tsWatcher.on('add', async p => {
72 this.sendAddedChunkUpdate(p) 75 try {
73 .catch(err => this.onUpdateError(err, rej)) 76 await this.sendPendingChunks()
77 } catch (err) {
78 this.onUpdateError(err, rej)
79 }
80
81 const playlistName = this.getPlaylistIdFromTS(p)
82
83 const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || []
84 pendingChunks.push(p)
85
86 this.pendingChunksPerPlaylist.set(playlistName, pendingChunks)
74 }) 87 })
75 88
76 tsWatcher.on('unlink', p => { 89 tsWatcher.on('unlink', p => {
@@ -230,31 +243,38 @@ export class ProcessLiveRTMPHLSTranscoding {
230 return this.updateWithRetry(payload) 243 return this.updateWithRetry(payload)
231 } 244 }
232 245
233 private sendAddedChunkUpdate (addedChunk: string): Promise<any> { 246 private async sendPendingChunks (): Promise<any> {
234 if (this.ended) return Promise.resolve() 247 if (this.ended) return Promise.resolve()
235 248
236 logger.debug(`Sending added live chunk ${addedChunk} update`) 249 for (const playlist of this.pendingChunksPerPlaylist.keys()) {
250 for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) {
251 logger.debug(`Sending added live chunk ${chunk} update`)
237 252
238 const videoChunkFilename = basename(addedChunk) 253 const videoChunkFilename = basename(chunk)
239 254
240 let payload: LiveRTMPHLSTranscodingUpdatePayload = { 255 let payload: LiveRTMPHLSTranscodingUpdatePayload = {
241 type: 'add-chunk', 256 type: 'add-chunk',
242 videoChunkFilename, 257 videoChunkFilename,
243 videoChunkFile: addedChunk 258 videoChunkFile: chunk
244 } 259 }
245 260
246 if (this.allPlaylistsCreated) { 261 if (this.allPlaylistsCreated) {
247 const playlistName = this.getPlaylistName(videoChunkFilename) 262 const playlistName = this.getPlaylistName(videoChunkFilename)
248 263
249 payload = { 264 payload = {
250 ...payload, 265 ...payload,
251 masterPlaylistFile: join(this.outputPath, 'master.m3u8'), 266 masterPlaylistFile: join(this.outputPath, 'master.m3u8'),
252 resolutionPlaylistFilename: playlistName, 267 resolutionPlaylistFilename: playlistName,
253 resolutionPlaylistFile: join(this.outputPath, playlistName) 268 resolutionPlaylistFile: join(this.outputPath, playlistName)
269 }
270 }
271
272 this.updateWithRetry(payload)
273 .catch(err => logger.error({ err }, 'Cannot update with retry'))
254 } 274 }
255 }
256 275
257 return this.updateWithRetry(payload) 276 this.pendingChunksPerPlaylist.set(playlist, [])
277 }
258 } 278 }
259 279
260 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { 280 private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> {
@@ -281,6 +301,12 @@ export class ProcessLiveRTMPHLSTranscoding {
281 return `${videoChunkFilename.split('-')[0]}.m3u8` 301 return `${videoChunkFilename.split('-')[0]}.m3u8`
282 } 302 }
283 303
304 private getPlaylistIdFromTS (segmentPath: string) {
305 const playlistIdMatcher = /^([\d+])-/
306
307 return basename(segmentPath).match(playlistIdMatcher)[1]
308 }
309
284 // --------------------------------------------------------------------------- 310 // ---------------------------------------------------------------------------
285 311
286 private cleanup () { 312 private cleanup () {