diff options
author | Chocobozzz <me@florianbigard.com> | 2022-05-25 11:08:12 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-05-25 11:08:12 +0200 |
commit | b34ee7fa5f6558bd6fb870756ace1cd12e40e94c (patch) | |
tree | 635c8f4c54355bd29c3e49c95170a2efb13519af | |
parent | 994b474331262ca17e5057aaea456251f45c0ed2 (diff) | |
download | PeerTube-b34ee7fa5f6558bd6fb870756ace1cd12e40e94c.tar.gz PeerTube-b34ee7fa5f6558bd6fb870756ace1cd12e40e94c.tar.zst PeerTube-b34ee7fa5f6558bd6fb870756ace1cd12e40e94c.zip |
Cleanup muxing session method options
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 41 |
1 files changed, 20 insertions, 21 deletions
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 1ee9b430f..98a7b2613 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -150,8 +150,8 @@ class MuxingSession extends EventEmitter { | |||
150 | 150 | ||
151 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | 151 | logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags()) |
152 | 152 | ||
153 | this.watchTSFiles(this.outDirectory) | 153 | this.watchTSFiles() |
154 | this.watchMasterFile(this.outDirectory) | 154 | this.watchMasterFile() |
155 | 155 | ||
156 | let ffmpegShellCommand: string | 156 | let ffmpegShellCommand: string |
157 | this.ffmpegCommand.on('start', cmdline => { | 157 | this.ffmpegCommand.on('start', cmdline => { |
@@ -161,13 +161,13 @@ class MuxingSession extends EventEmitter { | |||
161 | }) | 161 | }) |
162 | 162 | ||
163 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | 163 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { |
164 | this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) | 164 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) |
165 | }) | 165 | }) |
166 | 166 | ||
167 | this.ffmpegCommand.on('end', () => { | 167 | this.ffmpegCommand.on('end', () => { |
168 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) | 168 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) |
169 | 169 | ||
170 | this.onFFmpegEnded(this.outDirectory) | 170 | this.onFFmpegEnded() |
171 | }) | 171 | }) |
172 | 172 | ||
173 | this.ffmpegCommand.run() | 173 | this.ffmpegCommand.run() |
@@ -189,12 +189,11 @@ class MuxingSession extends EventEmitter { | |||
189 | err: any | 189 | err: any |
190 | stdout: string | 190 | stdout: string |
191 | stderr: string | 191 | stderr: string |
192 | outPath: string | ||
193 | ffmpegShellCommand: string | 192 | ffmpegShellCommand: string |
194 | }) { | 193 | }) { |
195 | const { err, stdout, stderr, outPath, ffmpegShellCommand } = options | 194 | const { err, stdout, stderr, ffmpegShellCommand } = options |
196 | 195 | ||
197 | this.onFFmpegEnded(outPath) | 196 | this.onFFmpegEnded() |
198 | 197 | ||
199 | // Don't care that we killed the ffmpeg process | 198 | // Don't care that we killed the ffmpeg process |
200 | if (err?.message?.includes('Exiting normally')) return | 199 | if (err?.message?.includes('Exiting normally')) return |
@@ -204,7 +203,7 @@ class MuxingSession extends EventEmitter { | |||
204 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) | 203 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) |
205 | } | 204 | } |
206 | 205 | ||
207 | private onFFmpegEnded (outPath: string) { | 206 | private onFFmpegEnded () { |
208 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) | 207 | logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputUrl, this.lTags()) |
209 | 208 | ||
210 | setTimeout(() => { | 209 | setTimeout(() => { |
@@ -214,12 +213,12 @@ class MuxingSession extends EventEmitter { | |||
214 | .then(() => { | 213 | .then(() => { |
215 | // Process remaining segments hash | 214 | // Process remaining segments hash |
216 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { | 215 | for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) { |
217 | this.processSegments(outPath, this.segmentsToProcessPerPlaylist[key]) | 216 | this.processSegments(this.segmentsToProcessPerPlaylist[key]) |
218 | } | 217 | } |
219 | }) | 218 | }) |
220 | .catch(err => { | 219 | .catch(err => { |
221 | logger.error( | 220 | logger.error( |
222 | 'Cannot close watchers of %s or process remaining hash segments.', outPath, | 221 | 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory, |
223 | { err, ...this.lTags() } | 222 | { err, ...this.lTags() } |
224 | ) | 223 | ) |
225 | }) | 224 | }) |
@@ -228,21 +227,21 @@ class MuxingSession extends EventEmitter { | |||
228 | }, 1000) | 227 | }, 1000) |
229 | } | 228 | } |
230 | 229 | ||
231 | private watchMasterFile (outPath: string) { | 230 | private watchMasterFile () { |
232 | this.masterWatcher = watch(outPath + '/' + this.streamingPlaylist.playlistFilename) | 231 | this.masterWatcher = watch(this.outDirectory + '/' + this.streamingPlaylist.playlistFilename) |
233 | 232 | ||
234 | this.masterWatcher.on('add', () => { | 233 | this.masterWatcher.on('add', () => { |
235 | this.emit('master-playlist-created', { videoId: this.videoId }) | 234 | this.emit('master-playlist-created', { videoId: this.videoId }) |
236 | 235 | ||
237 | this.masterWatcher.close() | 236 | this.masterWatcher.close() |
238 | .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags() })) | 237 | .catch(err => logger.error('Cannot close master watcher of %s.', this.outDirectory, { err, ...this.lTags() })) |
239 | }) | 238 | }) |
240 | } | 239 | } |
241 | 240 | ||
242 | private watchTSFiles (outPath: string) { | 241 | private watchTSFiles () { |
243 | const startStreamDateTime = new Date().getTime() | 242 | const startStreamDateTime = new Date().getTime() |
244 | 243 | ||
245 | this.tsWatcher = watch(outPath + '/*.ts') | 244 | this.tsWatcher = watch(this.outDirectory + '/*.ts') |
246 | 245 | ||
247 | const playlistIdMatcher = /^([\d+])-/ | 246 | const playlistIdMatcher = /^([\d+])-/ |
248 | 247 | ||
@@ -252,7 +251,7 @@ class MuxingSession extends EventEmitter { | |||
252 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] | 251 | const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] |
253 | 252 | ||
254 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] | 253 | const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || [] |
255 | this.processSegments(outPath, segmentsToProcess) | 254 | this.processSegments(segmentsToProcess) |
256 | 255 | ||
257 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] | 256 | this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ] |
258 | 257 | ||
@@ -273,7 +272,7 @@ class MuxingSession extends EventEmitter { | |||
273 | } | 272 | } |
274 | } | 273 | } |
275 | 274 | ||
276 | const deleteHandler = segmentPath => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) | 275 | const deleteHandler = (segmentPath: string) => LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath) |
277 | 276 | ||
278 | this.tsWatcher.on('add', p => addHandler(p)) | 277 | this.tsWatcher.on('add', p => addHandler(p)) |
279 | this.tsWatcher.on('unlink', p => deleteHandler(p)) | 278 | this.tsWatcher.on('unlink', p => deleteHandler(p)) |
@@ -332,15 +331,15 @@ class MuxingSession extends EventEmitter { | |||
332 | return now <= max | 331 | return now <= max |
333 | } | 332 | } |
334 | 333 | ||
335 | private processSegments (hlsVideoPath: string, segmentPaths: string[]) { | 334 | private processSegments (segmentPaths: string[]) { |
336 | mapSeries(segmentPaths, async previousSegment => { | 335 | mapSeries(segmentPaths, async previousSegment => { |
337 | // Add sha hash of previous segments, because ffmpeg should have finished generating them | 336 | // Add sha hash of previous segments, because ffmpeg should have finished generating them |
338 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) | 337 | await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment) |
339 | 338 | ||
340 | if (this.saveReplay) { | 339 | if (this.saveReplay) { |
341 | await this.addSegmentToReplay(hlsVideoPath, previousSegment) | 340 | await this.addSegmentToReplay(previousSegment) |
342 | } | 341 | } |
343 | }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags() })) | 342 | }).catch(err => logger.error('Cannot process segments', { err, ...this.lTags() })) |
344 | } | 343 | } |
345 | 344 | ||
346 | private hasClientSocketInBadHealth (sessionId: string) { | 345 | private hasClientSocketInBadHealth (sessionId: string) { |
@@ -367,7 +366,7 @@ class MuxingSession extends EventEmitter { | |||
367 | return false | 366 | return false |
368 | } | 367 | } |
369 | 368 | ||
370 | private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) { | 369 | private async addSegmentToReplay (segmentPath: string) { |
371 | const segmentName = basename(segmentPath) | 370 | const segmentName = basename(segmentPath) |
372 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) | 371 | const dest = join(this.replayDirectory, buildConcatenatedName(segmentName)) |
373 | 372 | ||