aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live-manager.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r--server/lib/live-manager.ts60
1 files changed, 23 insertions, 37 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index e85998686..31753619c 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -1,5 +1,4 @@
1 1
2import { AsyncQueue, queue } from 'async'
3import * as chokidar from 'chokidar' 2import * as chokidar from 'chokidar'
4import { FfmpegCommand } from 'fluent-ffmpeg' 3import { FfmpegCommand } from 'fluent-ffmpeg'
5import { ensureDir, stat } from 'fs-extra' 4import { ensureDir, stat } from 'fs-extra'
@@ -50,12 +49,6 @@ const config = {
50 } 49 }
51} 50}
52 51
53type SegmentSha256QueueParam = {
54 operation: 'update' | 'delete'
55 videoUUID: string
56 segmentPath: string
57}
58
59class LiveManager { 52class LiveManager {
60 53
61 private static instance: LiveManager 54 private static instance: LiveManager
@@ -71,7 +64,6 @@ class LiveManager {
71 return isAbleToUploadVideo(userId, 1000) 64 return isAbleToUploadVideo(userId, 1000)
72 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD }) 65 }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
73 66
74 private segmentsSha256Queue: AsyncQueue<SegmentSha256QueueParam>
75 private rtmpServer: any 67 private rtmpServer: any
76 68
77 private constructor () { 69 private constructor () {
@@ -96,18 +88,6 @@ class LiveManager {
96 logger.info('Live session ended.', { sessionId }) 88 logger.info('Live session ended.', { sessionId })
97 }) 89 })
98 90
99 this.segmentsSha256Queue = queue<SegmentSha256QueueParam, Error>((options, cb) => {
100 const promise = options.operation === 'update'
101 ? this.addSegmentSha(options)
102 : Promise.resolve(this.removeSegmentSha(options))
103
104 promise.then(() => cb())
105 .catch(err => {
106 logger.error('Cannot update/remove sha segment %s.', options.segmentPath, { err })
107 cb()
108 })
109 })
110
111 registerConfigChangedHandler(() => { 91 registerConfigChangedHandler(() => {
112 if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) { 92 if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
113 this.run() 93 this.run()
@@ -294,11 +274,18 @@ class LiveManager {
294 274
295 const tsWatcher = chokidar.watch(outPath + '/*.ts') 275 const tsWatcher = chokidar.watch(outPath + '/*.ts')
296 276
297 const updateSegment = segmentPath => this.segmentsSha256Queue.push({ operation: 'update', segmentPath, videoUUID }) 277 let segmentsToProcess: string[] = []
298 278
299 const addHandler = segmentPath => { 279 const addHandler = segmentPath => {
300 updateSegment(segmentPath) 280 // Add sha hash of previous segments, because ffmpeg should have finished generating them
281 for (const previousSegment of segmentsToProcess) {
282 this.addSegmentSha(videoUUID, previousSegment)
283 .catch(err => logger.error('Cannot add sha segment of video %s -> %s.', videoUUID, previousSegment, { err }))
284 }
285
286 segmentsToProcess = [ segmentPath ]
301 287
288 // Duration constraint check
302 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 289 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
303 logger.info('Stopping session of %s: max duration exceeded.', videoUUID) 290 logger.info('Stopping session of %s: max duration exceeded.', videoUUID)
304 291
@@ -323,10 +310,9 @@ class LiveManager {
323 } 310 }
324 } 311 }
325 312
326 const deleteHandler = segmentPath => this.segmentsSha256Queue.push({ operation: 'delete', segmentPath, videoUUID }) 313 const deleteHandler = segmentPath => this.removeSegmentSha(videoUUID, segmentPath)
327 314
328 tsWatcher.on('add', p => addHandler(p)) 315 tsWatcher.on('add', p => addHandler(p))
329 tsWatcher.on('change', p => updateSegment(p))
330 tsWatcher.on('unlink', p => deleteHandler(p)) 316 tsWatcher.on('unlink', p => deleteHandler(p))
331 317
332 const masterWatcher = chokidar.watch(outPath + '/master.m3u8') 318 const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
@@ -399,33 +385,33 @@ class LiveManager {
399 } 385 }
400 } 386 }
401 387
402 private async addSegmentSha (options: SegmentSha256QueueParam) { 388 private async addSegmentSha (videoUUID: string, segmentPath: string) {
403 const segmentName = basename(options.segmentPath) 389 const segmentName = basename(segmentPath)
404 logger.debug('Updating live sha segment %s.', options.segmentPath) 390 logger.debug('Adding live sha segment %s.', segmentPath)
405 391
406 const shaResult = await buildSha256Segment(options.segmentPath) 392 const shaResult = await buildSha256Segment(segmentPath)
407 393
408 if (!this.segmentsSha256.has(options.videoUUID)) { 394 if (!this.segmentsSha256.has(videoUUID)) {
409 this.segmentsSha256.set(options.videoUUID, new Map()) 395 this.segmentsSha256.set(videoUUID, new Map())
410 } 396 }
411 397
412 const filesMap = this.segmentsSha256.get(options.videoUUID) 398 const filesMap = this.segmentsSha256.get(videoUUID)
413 filesMap.set(segmentName, shaResult) 399 filesMap.set(segmentName, shaResult)
414 } 400 }
415 401
416 private removeSegmentSha (options: SegmentSha256QueueParam) { 402 private removeSegmentSha (videoUUID: string, segmentPath: string) {
417 const segmentName = basename(options.segmentPath) 403 const segmentName = basename(segmentPath)
418 404
419 logger.debug('Removing live sha segment %s.', options.segmentPath) 405 logger.debug('Removing live sha segment %s.', segmentPath)
420 406
421 const filesMap = this.segmentsSha256.get(options.videoUUID) 407 const filesMap = this.segmentsSha256.get(videoUUID)
422 if (!filesMap) { 408 if (!filesMap) {
423 logger.warn('Unknown files map to remove sha for %s.', options.videoUUID) 409 logger.warn('Unknown files map to remove sha for %s.', videoUUID)
424 return 410 return
425 } 411 }
426 412
427 if (!filesMap.has(segmentName)) { 413 if (!filesMap.has(segmentName)) {
428 logger.warn('Unknown segment in files map for video %s and segment %s.', options.videoUUID, options.segmentPath) 414 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
429 return 415 return
430 } 416 }
431 417