diff options
Diffstat (limited to 'packages/peertube-runner/server')
-rw-r--r-- | packages/peertube-runner/server/process/shared/process-live.ts | 66 |
1 files changed, 46 insertions, 20 deletions
diff --git a/packages/peertube-runner/server/process/shared/process-live.ts b/packages/peertube-runner/server/process/shared/process-live.ts index b17b51c7c..df1b677f0 100644 --- a/packages/peertube-runner/server/process/shared/process-live.ts +++ b/packages/peertube-runner/server/process/shared/process-live.ts | |||
@@ -21,6 +21,9 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
21 | private readonly outputPath: string | 21 | private readonly outputPath: string |
22 | private readonly fsWatchers: FSWatcher[] = [] | 22 | private readonly fsWatchers: FSWatcher[] = [] |
23 | 23 | ||
24 | // Playlist name -> chunks | ||
25 | private readonly pendingChunksPerPlaylist = new Map<string, string[]>() | ||
26 | |||
24 | private readonly playlistsCreated = new Set<string>() | 27 | private readonly playlistsCreated = new Set<string>() |
25 | private allPlaylistsCreated = false | 28 | private allPlaylistsCreated = false |
26 | 29 | ||
@@ -68,9 +71,19 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
68 | } | 71 | } |
69 | }) | 72 | }) |
70 | 73 | ||
71 | tsWatcher.on('add', p => { | 74 | tsWatcher.on('add', async p => { |
72 | this.sendAddedChunkUpdate(p) | 75 | try { |
73 | .catch(err => this.onUpdateError(err, rej)) | 76 | await this.sendPendingChunks() |
77 | } catch (err) { | ||
78 | this.onUpdateError(err, rej) | ||
79 | } | ||
80 | |||
81 | const playlistName = this.getPlaylistIdFromTS(p) | ||
82 | |||
83 | const pendingChunks = this.pendingChunksPerPlaylist.get(playlistName) || [] | ||
84 | pendingChunks.push(p) | ||
85 | |||
86 | this.pendingChunksPerPlaylist.set(playlistName, pendingChunks) | ||
74 | }) | 87 | }) |
75 | 88 | ||
76 | tsWatcher.on('unlink', p => { | 89 | tsWatcher.on('unlink', p => { |
@@ -230,31 +243,38 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
230 | return this.updateWithRetry(payload) | 243 | return this.updateWithRetry(payload) |
231 | } | 244 | } |
232 | 245 | ||
233 | private sendAddedChunkUpdate (addedChunk: string): Promise<any> { | 246 | private async sendPendingChunks (): Promise<any> { |
234 | if (this.ended) return Promise.resolve() | 247 | if (this.ended) return Promise.resolve() |
235 | 248 | ||
236 | logger.debug(`Sending added live chunk ${addedChunk} update`) | 249 | for (const playlist of this.pendingChunksPerPlaylist.keys()) { |
250 | for (const chunk of this.pendingChunksPerPlaylist.get(playlist)) { | ||
251 | logger.debug(`Sending added live chunk ${chunk} update`) | ||
237 | 252 | ||
238 | const videoChunkFilename = basename(addedChunk) | 253 | const videoChunkFilename = basename(chunk) |
239 | 254 | ||
240 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { | 255 | let payload: LiveRTMPHLSTranscodingUpdatePayload = { |
241 | type: 'add-chunk', | 256 | type: 'add-chunk', |
242 | videoChunkFilename, | 257 | videoChunkFilename, |
243 | videoChunkFile: addedChunk | 258 | videoChunkFile: chunk |
244 | } | 259 | } |
245 | 260 | ||
246 | if (this.allPlaylistsCreated) { | 261 | if (this.allPlaylistsCreated) { |
247 | const playlistName = this.getPlaylistName(videoChunkFilename) | 262 | const playlistName = this.getPlaylistName(videoChunkFilename) |
248 | 263 | ||
249 | payload = { | 264 | payload = { |
250 | ...payload, | 265 | ...payload, |
251 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), | 266 | masterPlaylistFile: join(this.outputPath, 'master.m3u8'), |
252 | resolutionPlaylistFilename: playlistName, | 267 | resolutionPlaylistFilename: playlistName, |
253 | resolutionPlaylistFile: join(this.outputPath, playlistName) | 268 | resolutionPlaylistFile: join(this.outputPath, playlistName) |
269 | } | ||
270 | } | ||
271 | |||
272 | this.updateWithRetry(payload) | ||
273 | .catch(err => logger.error({ err }, 'Cannot update with retry')) | ||
254 | } | 274 | } |
255 | } | ||
256 | 275 | ||
257 | return this.updateWithRetry(payload) | 276 | this.pendingChunksPerPlaylist.set(playlist, []) |
277 | } | ||
258 | } | 278 | } |
259 | 279 | ||
260 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { | 280 | private async updateWithRetry (payload: LiveRTMPHLSTranscodingUpdatePayload, currentTry = 1): Promise<any> { |
@@ -281,6 +301,12 @@ export class ProcessLiveRTMPHLSTranscoding { | |||
281 | return `${videoChunkFilename.split('-')[0]}.m3u8` | 301 | return `${videoChunkFilename.split('-')[0]}.m3u8` |
282 | } | 302 | } |
283 | 303 | ||
304 | private getPlaylistIdFromTS (segmentPath: string) { | ||
305 | const playlistIdMatcher = /^([\d+])-/ | ||
306 | |||
307 | return basename(segmentPath).match(playlistIdMatcher)[1] | ||
308 | } | ||
309 | |||
284 | // --------------------------------------------------------------------------- | 310 | // --------------------------------------------------------------------------- |
285 | 311 | ||
286 | private cleanup () { | 312 | private cleanup () { |