aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live-manager.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/live-manager.ts')
-rw-r--r--server/lib/live-manager.ts102
1 files changed, 57 insertions, 45 deletions
diff --git a/server/lib/live-manager.ts b/server/lib/live-manager.ts
index 66b5d119b..563ba2578 100644
--- a/server/lib/live-manager.ts
+++ b/server/lib/live-manager.ts
@@ -8,10 +8,10 @@ import { basename, join } from 'path'
8import { isTestInstance } from '@server/helpers/core-utils' 8import { isTestInstance } from '@server/helpers/core-utils'
9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils' 9import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
10import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils' 10import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
11import { logger } from '@server/helpers/logger' 11import { logger, loggerTagsFactory } from '@server/helpers/logger'
12import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 12import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
13import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants' 13import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
14import { UserModel } from '@server/models/account/user' 14import { UserModel } from '@server/models/user/user'
15import { VideoModel } from '@server/models/video/video' 15import { VideoModel } from '@server/models/video/video'
16import { VideoFileModel } from '@server/models/video/video-file' 16import { VideoFileModel } from '@server/models/video/video-file'
17import { VideoLiveModel } from '@server/models/video/video-live' 17import { VideoLiveModel } from '@server/models/video/video-live'
@@ -23,9 +23,9 @@ import { buildSha256Segment } from './hls'
23import { JobQueue } from './job-queue' 23import { JobQueue } from './job-queue'
24import { cleanupLive } from './job-queue/handlers/video-live-ending' 24import { cleanupLive } from './job-queue/handlers/video-live-ending'
25import { PeerTubeSocket } from './peertube-socket' 25import { PeerTubeSocket } from './peertube-socket'
26import { VideoTranscodingProfilesManager } from './transcoding/video-transcoding-profiles'
26import { isAbleToUploadVideo } from './user' 27import { isAbleToUploadVideo } from './user'
27import { getHLSDirectory } from './video-paths' 28import { getHLSDirectory } from './video-paths'
28import { VideoTranscodingProfilesManager } from './video-transcoding-profiles'
29 29
30import memoizee = require('memoizee') 30import memoizee = require('memoizee')
31const NodeRtmpSession = require('node-media-server/node_rtmp_session') 31const NodeRtmpSession = require('node-media-server/node_rtmp_session')
@@ -48,6 +48,8 @@ const config = {
48 } 48 }
49} 49}
50 50
51const lTags = loggerTagsFactory('live')
52
51class LiveManager { 53class LiveManager {
52 54
53 private static instance: LiveManager 55 private static instance: LiveManager
@@ -75,20 +77,20 @@ class LiveManager {
75 init () { 77 init () {
76 const events = this.getContext().nodeEvent 78 const events = this.getContext().nodeEvent
77 events.on('postPublish', (sessionId: string, streamPath: string) => { 79 events.on('postPublish', (sessionId: string, streamPath: string) => {
78 logger.debug('RTMP received stream', { id: sessionId, streamPath }) 80 logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })
79 81
80 const splittedPath = streamPath.split('/') 82 const splittedPath = streamPath.split('/')
81 if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) { 83 if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
82 logger.warn('Live path is incorrect.', { streamPath }) 84 logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
83 return this.abortSession(sessionId) 85 return this.abortSession(sessionId)
84 } 86 }
85 87
86 this.handleSession(sessionId, streamPath, splittedPath[2]) 88 this.handleSession(sessionId, streamPath, splittedPath[2])
87 .catch(err => logger.error('Cannot handle sessions.', { err })) 89 .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
88 }) 90 })
89 91
90 events.on('donePublish', sessionId => { 92 events.on('donePublish', sessionId => {
91 logger.info('Live session ended.', { sessionId }) 93 logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
92 }) 94 })
93 95
94 registerConfigChangedHandler(() => { 96 registerConfigChangedHandler(() => {
@@ -104,13 +106,13 @@ class LiveManager {
104 106
105 // Cleanup broken lives, that were terminated by a server restart for example 107 // Cleanup broken lives, that were terminated by a server restart for example
106 this.handleBrokenLives() 108 this.handleBrokenLives()
107 .catch(err => logger.error('Cannot handle broken lives.', { err })) 109 .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
108 110
109 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE) 111 setInterval(() => this.updateLiveViews(), VIEW_LIFETIME.LIVE)
110 } 112 }
111 113
112 run () { 114 run () {
113 logger.info('Running RTMP server on port %d', config.rtmp.port) 115 logger.info('Running RTMP server on port %d', config.rtmp.port, lTags())
114 116
115 this.rtmpServer = createServer(socket => { 117 this.rtmpServer = createServer(socket => {
116 const session = new NodeRtmpSession(config, socket) 118 const session = new NodeRtmpSession(config, socket)
@@ -119,14 +121,14 @@ class LiveManager {
119 }) 121 })
120 122
121 this.rtmpServer.on('error', err => { 123 this.rtmpServer.on('error', err => {
122 logger.error('Cannot run RTMP server.', { err }) 124 logger.error('Cannot run RTMP server.', { err, ...lTags() })
123 }) 125 })
124 126
125 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT) 127 this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT)
126 } 128 }
127 129
128 stop () { 130 stop () {
129 logger.info('Stopping RTMP server.') 131 logger.info('Stopping RTMP server.', lTags())
130 132
131 this.rtmpServer.close() 133 this.rtmpServer.close()
132 this.rtmpServer = undefined 134 this.rtmpServer = undefined
@@ -185,7 +187,7 @@ class LiveManager {
185 187
186 return readFile(segmentPath) 188 return readFile(segmentPath)
187 .then(data => appendFile(dest, data)) 189 .then(data => appendFile(dest, data))
188 .catch(err => logger.error('Cannot copy segment %s to repay directory.', segmentPath, { err })) 190 .catch(err => logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...lTags() }))
189 } 191 }
190 192
191 buildConcatenatedName (segmentOrPlaylistPath: string) { 193 buildConcatenatedName (segmentOrPlaylistPath: string) {
@@ -202,7 +204,7 @@ class LiveManager {
202 if (videoLive.saveReplay) { 204 if (videoLive.saveReplay) {
203 await this.addSegmentToReplay(hlsVideoPath, previousSegment) 205 await this.addSegmentToReplay(hlsVideoPath, previousSegment)
204 } 206 }
205 }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err })) 207 }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...lTags(videoUUID) }))
206 } 208 }
207 209
208 private getContext () { 210 private getContext () {
@@ -226,13 +228,13 @@ class LiveManager {
226 private async handleSession (sessionId: string, streamPath: string, streamKey: string) { 228 private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
227 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey) 229 const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
228 if (!videoLive) { 230 if (!videoLive) {
229 logger.warn('Unknown live video with stream key %s.', streamKey) 231 logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
230 return this.abortSession(sessionId) 232 return this.abortSession(sessionId)
231 } 233 }
232 234
233 const video = videoLive.Video 235 const video = videoLive.Video
234 if (video.isBlacklisted()) { 236 if (video.isBlacklisted()) {
235 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey) 237 logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
236 return this.abortSession(sessionId) 238 return this.abortSession(sessionId)
237 } 239 }
238 240
@@ -262,7 +264,10 @@ class LiveManager {
262 264
263 const allResolutions = resolutionsEnabled.concat([ session.videoHeight ]) 265 const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])
264 266
265 logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions }) 267 logger.info(
268 'Will mux/transcode live video of original resolution %d.', session.videoHeight,
269 { allResolutions, ...lTags(sessionId, video.uuid) }
270 )
266 271
267 const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({ 272 const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
268 videoId: video.id, 273 videoId: video.id,
@@ -317,7 +322,7 @@ class LiveManager {
317 }) 322 })
318 323
319 VideoFileModel.customUpsert(file, 'streaming-playlist', null) 324 VideoFileModel.customUpsert(file, 'streaming-playlist', null)
320 .catch(err => logger.error('Cannot create file for live streaming.', { err })) 325 .catch(err => logger.error('Cannot create file for live streaming.', { err, ...lTags(sessionId, videoLive.Video.uuid) }))
321 } 326 }
322 327
323 const outPath = getHLSDirectory(videoLive.Video) 328 const outPath = getHLSDirectory(videoLive.Video)
@@ -342,7 +347,7 @@ class LiveManager {
342 }) 347 })
343 : getLiveMuxingCommand(rtmpUrl, outPath) 348 : getLiveMuxingCommand(rtmpUrl, outPath)
344 349
345 logger.info('Running live muxing/transcoding for %s.', videoUUID) 350 logger.info('Running live muxing/transcoding for %s.', videoUUID, lTags(sessionId, videoUUID))
346 this.transSessions.set(sessionId, ffmpegExec) 351 this.transSessions.set(sessionId, ffmpegExec)
347 352
348 const tsWatcher = chokidar.watch(outPath + '/*.ts') 353 const tsWatcher = chokidar.watch(outPath + '/*.ts')
@@ -351,7 +356,7 @@ class LiveManager {
351 const playlistIdMatcher = /^([\d+])-/ 356 const playlistIdMatcher = /^([\d+])-/
352 357
353 const addHandler = segmentPath => { 358 const addHandler = segmentPath => {
354 logger.debug('Live add handler of %s.', segmentPath) 359 logger.debug('Live add handler of %s.', segmentPath, lTags(sessionId, videoUUID))
355 360
356 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0] 361 const playlistId = basename(segmentPath).match(playlistIdMatcher)[0]
357 362
@@ -363,7 +368,9 @@ class LiveManager {
363 if (this.hasClientSocketsInBadHealthWithCache(sessionId)) { 368 if (this.hasClientSocketsInBadHealthWithCache(sessionId)) {
364 logger.error( 369 logger.error(
365 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + 370 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
366 ' Stopping session of video %s.', videoUUID) 371 ' Stopping session of video %s.', videoUUID,
372 lTags(sessionId, videoUUID)
373 )
367 374
368 this.stopSessionOf(videoLive.videoId) 375 this.stopSessionOf(videoLive.videoId)
369 return 376 return
@@ -371,7 +378,7 @@ class LiveManager {
371 378
372 // Duration constraint check 379 // Duration constraint check
373 if (this.isDurationConstraintValid(startStreamDateTime) !== true) { 380 if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
374 logger.info('Stopping session of %s: max duration exceeded.', videoUUID) 381 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, lTags(sessionId, videoUUID))
375 382
376 this.stopSessionOf(videoLive.videoId) 383 this.stopSessionOf(videoLive.videoId)
377 return 384 return
@@ -386,12 +393,12 @@ class LiveManager {
386 .then(() => this.isQuotaConstraintValid(user, videoLive)) 393 .then(() => this.isQuotaConstraintValid(user, videoLive))
387 .then(quotaValid => { 394 .then(quotaValid => {
388 if (quotaValid !== true) { 395 if (quotaValid !== true) {
389 logger.info('Stopping session of %s: user quota exceeded.', videoUUID) 396 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, lTags(sessionId, videoUUID))
390 397
391 this.stopSessionOf(videoLive.videoId) 398 this.stopSessionOf(videoLive.videoId)
392 } 399 }
393 }) 400 })
394 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err })) 401 .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err, ...lTags(sessionId, videoUUID) }))
395 } 402 }
396 } 403 }
397 404
@@ -411,21 +418,21 @@ class LiveManager {
411 418
412 setTimeout(() => { 419 setTimeout(() => {
413 federateVideoIfNeeded(video, false) 420 federateVideoIfNeeded(video, false)
414 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err })) 421 .catch(err => logger.error('Cannot federate live video %s.', video.url, { err, ...lTags(sessionId, videoUUID) }))
415 422
416 PeerTubeSocket.Instance.sendVideoLiveNewState(video) 423 PeerTubeSocket.Instance.sendVideoLiveNewState(video)
417 }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION) 424 }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)
418 425
419 } catch (err) { 426 } catch (err) {
420 logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err }) 427 logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err, ...lTags(sessionId, videoUUID) })
421 } finally { 428 } finally {
422 masterWatcher.close() 429 masterWatcher.close()
423 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err })) 430 .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...lTags(sessionId, videoUUID) }))
424 } 431 }
425 }) 432 })
426 433
427 const onFFmpegEnded = () => { 434 const onFFmpegEnded = () => {
428 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl) 435 logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl, lTags(sessionId, videoUUID))
429 436
430 this.transSessions.delete(sessionId) 437 this.transSessions.delete(sessionId)
431 438
@@ -446,10 +453,15 @@ class LiveManager {
446 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key]) 453 this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
447 } 454 }
448 }) 455 })
449 .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err })) 456 .catch(err => {
457 logger.error(
458 'Cannot close watchers of %s or process remaining hash segments.', outPath,
459 { err, ...lTags(sessionId, videoUUID) }
460 )
461 })
450 462
451 this.onEndTransmuxing(videoLive.Video.id) 463 this.onEndTransmuxing(videoLive.Video.id)
452 .catch(err => logger.error('Error in closed transmuxing.', { err })) 464 .catch(err => logger.error('Error in closed transmuxing.', { err, ...lTags(sessionId, videoUUID) }))
453 }, 1000) 465 }, 1000)
454 } 466 }
455 467
@@ -459,7 +471,7 @@ class LiveManager {
459 // Don't care that we killed the ffmpeg process 471 // Don't care that we killed the ffmpeg process
460 if (err?.message?.includes('Exiting normally')) return 472 if (err?.message?.includes('Exiting normally')) return
461 473
462 logger.error('Live transcoding error.', { err, stdout, stderr }) 474 logger.error('Live transcoding error.', { err, stdout, stderr, ...lTags(sessionId, videoUUID) })
463 475
464 this.abortSession(sessionId) 476 this.abortSession(sessionId)
465 }) 477 })
@@ -469,12 +481,12 @@ class LiveManager {
469 ffmpegExec.run() 481 ffmpegExec.run()
470 } 482 }
471 483
472 private async onEndTransmuxing (videoId: number, cleanupNow = false) { 484 private async onEndTransmuxing (videoUUID: string, cleanupNow = false) {
473 try { 485 try {
474 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 486 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoUUID)
475 if (!fullVideo) return 487 if (!fullVideo) return
476 488
477 const live = await VideoLiveModel.loadByVideoId(videoId) 489 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
478 490
479 if (!live.permanentLive) { 491 if (!live.permanentLive) {
480 JobQueue.Instance.createJob({ 492 JobQueue.Instance.createJob({
@@ -495,13 +507,13 @@ class LiveManager {
495 507
496 await federateVideoIfNeeded(fullVideo, false) 508 await federateVideoIfNeeded(fullVideo, false)
497 } catch (err) { 509 } catch (err) {
498 logger.error('Cannot save/federate new video state of live streaming of video id %d.', videoId, { err }) 510 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoUUID, { err, ...lTags(videoUUID) })
499 } 511 }
500 } 512 }
501 513
502 private async addSegmentSha (videoUUID: string, segmentPath: string) { 514 private async addSegmentSha (videoUUID: string, segmentPath: string) {
503 const segmentName = basename(segmentPath) 515 const segmentName = basename(segmentPath)
504 logger.debug('Adding live sha segment %s.', segmentPath) 516 logger.debug('Adding live sha segment %s.', segmentPath, lTags(videoUUID))
505 517
506 const shaResult = await buildSha256Segment(segmentPath) 518 const shaResult = await buildSha256Segment(segmentPath)
507 519
@@ -516,16 +528,16 @@ class LiveManager {
516 private removeSegmentSha (videoUUID: string, segmentPath: string) { 528 private removeSegmentSha (videoUUID: string, segmentPath: string) {
517 const segmentName = basename(segmentPath) 529 const segmentName = basename(segmentPath)
518 530
519 logger.debug('Removing live sha segment %s.', segmentPath) 531 logger.debug('Removing live sha segment %s.', segmentPath, lTags(videoUUID))
520 532
521 const filesMap = this.segmentsSha256.get(videoUUID) 533 const filesMap = this.segmentsSha256.get(videoUUID)
522 if (!filesMap) { 534 if (!filesMap) {
523 logger.warn('Unknown files map to remove sha for %s.', videoUUID) 535 logger.warn('Unknown files map to remove sha for %s.', videoUUID, lTags(videoUUID))
524 return 536 return
525 } 537 }
526 538
527 if (!filesMap.has(segmentName)) { 539 if (!filesMap.has(segmentName)) {
528 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath) 540 logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath, lTags(videoUUID))
529 return 541 return
530 } 542 }
531 543
@@ -547,7 +559,7 @@ class LiveManager {
547 const rtmpSession = this.getContext().sessions.get(sessionId) 559 const rtmpSession = this.getContext().sessions.get(sessionId)
548 560
549 if (!rtmpSession) { 561 if (!rtmpSession) {
550 logger.warn('Cannot get session %s to check players socket health.', sessionId) 562 logger.warn('Cannot get session %s to check players socket health.', sessionId, lTags(sessionId))
551 return 563 return
552 } 564 }
553 565
@@ -555,7 +567,7 @@ class LiveManager {
555 const playerSession = this.getContext().sessions.get(playerSessionId) 567 const playerSession = this.getContext().sessions.get(playerSessionId)
556 568
557 if (!playerSession) { 569 if (!playerSession) {
558 logger.error('Cannot get player session %s to check socket health.', playerSession) 570 logger.error('Cannot get player session %s to check socket health.', playerSession, lTags(sessionId))
559 continue 571 continue
560 } 572 }
561 573
@@ -576,7 +588,7 @@ class LiveManager {
576 private async updateLiveViews () { 588 private async updateLiveViews () {
577 if (!this.isRunning()) return 589 if (!this.isRunning()) return
578 590
579 if (!isTestInstance()) logger.info('Updating live video views.') 591 if (!isTestInstance()) logger.info('Updating live video views.', lTags())
580 592
581 for (const videoId of this.watchersPerVideo.keys()) { 593 for (const videoId of this.watchersPerVideo.keys()) {
582 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE 594 const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE
@@ -597,15 +609,15 @@ class LiveManager {
597 const newWatchers = watchers.filter(w => w > notBefore) 609 const newWatchers = watchers.filter(w => w > notBefore)
598 this.watchersPerVideo.set(videoId, newWatchers) 610 this.watchersPerVideo.set(videoId, newWatchers)
599 611
600 logger.debug('New live video views for %s is %d.', video.url, numWatchers) 612 logger.debug('New live video views for %s is %d.', video.url, numWatchers, lTags())
601 } 613 }
602 } 614 }
603 615
604 private async handleBrokenLives () { 616 private async handleBrokenLives () {
605 const videoIds = await VideoModel.listPublishedLiveIds() 617 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
606 618
607 for (const id of videoIds) { 619 for (const uuid of videoUUIDs) {
608 await this.onEndTransmuxing(id, true) 620 await this.onEndTransmuxing(uuid, true)
609 } 621 }
610 } 622 }
611 623