aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live/live-manager.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/live/live-manager.ts
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/live/live-manager.ts')
-rw-r--r--server/lib/live/live-manager.ts94
1 files changed, 51 insertions, 43 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index 05274955d..aa32a9d52 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra'
2import { createServer, Server } from 'net' 2import { createServer, Server } from 'net'
3import { join } from 'path' 3import { join } from 'path'
4import { createServer as createServerTLS, Server as ServerTLS } from 'tls' 4import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
5import {
6 computeResolutionsToTranscode,
7 ffprobePromise,
8 getLiveSegmentTime,
9 getVideoStreamBitrate,
10 getVideoStreamDimensionsInfo,
11 getVideoStreamFPS,
12 hasAudioStream
13} from '@server/helpers/ffmpeg'
14import { logger, loggerTagsFactory } from '@server/helpers/logger' 5import { logger, loggerTagsFactory } from '@server/helpers/logger'
15import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' 6import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
16import { VIDEO_LIVE } from '@server/initializers/constants' 7import { VIDEO_LIVE } from '@server/initializers/constants'
8import { sequelizeTypescript } from '@server/initializers/database'
17import { UserModel } from '@server/models/user/user' 9import { UserModel } from '@server/models/user/user'
18import { VideoModel } from '@server/models/video/video' 10import { VideoModel } from '@server/models/video/video'
19import { VideoLiveModel } from '@server/models/video/video-live' 11import { VideoLiveModel } from '@server/models/video/video-live'
12import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
20import { VideoLiveSessionModel } from '@server/models/video/video-live-session' 13import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 14import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
22import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' 15import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models'
23import { pick, wait } from '@shared/core-utils' 16import { pick, wait } from '@shared/core-utils'
17import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg'
24import { LiveVideoError, VideoState } from '@shared/models' 18import { LiveVideoError, VideoState } from '@shared/models'
25import { federateVideoIfNeeded } from '../activitypub/videos' 19import { federateVideoIfNeeded } from '../activitypub/videos'
26import { JobQueue } from '../job-queue' 20import { JobQueue } from '../job-queue'
27import { getLiveReplayBaseDirectory } from '../paths' 21import { getLiveReplayBaseDirectory } from '../paths'
28import { PeerTubeSocket } from '../peertube-socket' 22import { PeerTubeSocket } from '../peertube-socket'
29import { Hooks } from '../plugins/hooks' 23import { Hooks } from '../plugins/hooks'
24import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions'
30import { LiveQuotaStore } from './live-quota-store' 25import { LiveQuotaStore } from './live-quota-store'
31import { cleanupAndDestroyPermanentLive } from './live-utils' 26import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils'
32import { MuxingSession } from './shared' 27import { MuxingSession } from './shared'
33import { sequelizeTypescript } from '@server/initializers/database' 28import { RunnerJobModel } from '@server/models/runner/runner-job'
34import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting'
35 29
36const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') 30const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
37const context = require('node-media-server/src/node_core_ctx') 31const context = require('node-media-server/src/node_core_ctx')
@@ -57,7 +51,7 @@ class LiveManager {
57 private static instance: LiveManager 51 private static instance: LiveManager
58 52
59 private readonly muxingSessions = new Map<string, MuxingSession>() 53 private readonly muxingSessions = new Map<string, MuxingSession>()
60 private readonly videoSessions = new Map<number, string>() 54 private readonly videoSessions = new Map<string, string>()
61 55
62 private rtmpServer: Server 56 private rtmpServer: Server
63 private rtmpsServer: ServerTLS 57 private rtmpsServer: ServerTLS
@@ -177,14 +171,19 @@ class LiveManager {
177 return !!this.rtmpServer 171 return !!this.rtmpServer
178 } 172 }
179 173
180 stopSessionOf (videoId: number, error: LiveVideoError | null) { 174 stopSessionOf (videoUUID: string, error: LiveVideoError | null) {
181 const sessionId = this.videoSessions.get(videoId) 175 const sessionId = this.videoSessions.get(videoUUID)
182 if (!sessionId) return 176 if (!sessionId) {
177 logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID))
178 return
179 }
183 180
184 this.saveEndingSession(videoId, error) 181 logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })
185 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
186 182
187 this.videoSessions.delete(videoId) 183 this.saveEndingSession(videoUUID, error)
184 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))
185
186 this.videoSessions.delete(videoUUID)
188 this.abortSession(sessionId) 187 this.abortSession(sessionId)
189 } 188 }
190 189
@@ -221,6 +220,11 @@ class LiveManager {
221 return this.abortSession(sessionId) 220 return this.abortSession(sessionId)
222 } 221 }
223 222
223 if (this.videoSessions.has(video.uuid)) {
224 logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
225 return this.abortSession(sessionId)
226 }
227
224 // Cleanup old potential live (could happen with a permanent live) 228 // Cleanup old potential live (could happen with a permanent live)
225 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) 229 const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
226 if (oldStreamingPlaylist) { 230 if (oldStreamingPlaylist) {
@@ -229,7 +233,7 @@ class LiveManager {
229 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) 233 await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
230 } 234 }
231 235
232 this.videoSessions.set(video.id, sessionId) 236 this.videoSessions.set(video.uuid, sessionId)
233 237
234 const now = Date.now() 238 const now = Date.now()
235 const probe = await ffprobePromise(inputUrl) 239 const probe = await ffprobePromise(inputUrl)
@@ -253,7 +257,7 @@ class LiveManager {
253 ) 257 )
254 258
255 logger.info( 259 logger.info(
256 'Will mux/transcode live video of original resolution %d.', resolution, 260 'Handling live video of original resolution %d.', resolution,
257 { allResolutions, ...lTags(sessionId, video.uuid) } 261 { allResolutions, ...lTags(sessionId, video.uuid) }
258 ) 262 )
259 263
@@ -301,44 +305,44 @@ class LiveManager {
301 305
302 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) 306 muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))
303 307
304 muxingSession.on('bad-socket-health', ({ videoId }) => { 308 muxingSession.on('bad-socket-health', ({ videoUUID }) => {
305 logger.error( 309 logger.error(
306 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + 310 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
307 ' Stopping session of video %s.', videoUUID, 311 ' Stopping session of video %s.', videoUUID,
308 localLTags 312 localLTags
309 ) 313 )
310 314
311 this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) 315 this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH)
312 }) 316 })
313 317
314 muxingSession.on('duration-exceeded', ({ videoId }) => { 318 muxingSession.on('duration-exceeded', ({ videoUUID }) => {
315 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) 319 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
316 320
317 this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) 321 this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED)
318 }) 322 })
319 323
320 muxingSession.on('quota-exceeded', ({ videoId }) => { 324 muxingSession.on('quota-exceeded', ({ videoUUID }) => {
321 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) 325 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
322 326
323 this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) 327 this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED)
324 }) 328 })
325 329
326 muxingSession.on('ffmpeg-error', ({ videoId }) => { 330 muxingSession.on('transcoding-error', ({ videoUUID }) => {
327 this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) 331 this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR)
328 }) 332 })
329 333
330 muxingSession.on('ffmpeg-end', ({ videoId }) => { 334 muxingSession.on('transcoding-end', ({ videoUUID }) => {
331 this.onMuxingFFmpegEnd(videoId, sessionId) 335 this.onMuxingFFmpegEnd(videoUUID, sessionId)
332 }) 336 })
333 337
334 muxingSession.on('after-cleanup', ({ videoId }) => { 338 muxingSession.on('after-cleanup', ({ videoUUID }) => {
335 this.muxingSessions.delete(sessionId) 339 this.muxingSessions.delete(sessionId)
336 340
337 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) 341 LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)
338 342
339 muxingSession.destroy() 343 muxingSession.destroy()
340 344
341 return this.onAfterMuxingCleanup({ videoId, liveSession }) 345 return this.onAfterMuxingCleanup({ videoUUID, liveSession })
342 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 346 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
343 }) 347 })
344 348
@@ -379,22 +383,24 @@ class LiveManager {
379 } 383 }
380 } 384 }
381 385
382 private onMuxingFFmpegEnd (videoId: number, sessionId: string) { 386 private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
383 this.videoSessions.delete(videoId) 387 this.videoSessions.delete(videoUUID)
384 388
385 this.saveEndingSession(videoId, null) 389 this.saveEndingSession(videoUUID, null)
386 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) 390 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
387 } 391 }
388 392
389 private async onAfterMuxingCleanup (options: { 393 private async onAfterMuxingCleanup (options: {
390 videoId: number | string 394 videoUUID: string
391 liveSession?: MVideoLiveSession 395 liveSession?: MVideoLiveSession
392 cleanupNow?: boolean // Default false 396 cleanupNow?: boolean // Default false
393 }) { 397 }) {
394 const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options 398 const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options
399
400 logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))
395 401
396 try { 402 try {
397 const fullVideo = await VideoModel.loadFull(videoId) 403 const fullVideo = await VideoModel.loadFull(videoUUID)
398 if (!fullVideo) return 404 if (!fullVideo) return
399 405
400 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 406 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
@@ -437,15 +443,17 @@ class LiveManager {
437 443
438 await federateVideoIfNeeded(fullVideo, false) 444 await federateVideoIfNeeded(fullVideo, false)
439 } catch (err) { 445 } catch (err) {
440 logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) 446 logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
441 } 447 }
442 } 448 }
443 449
444 private async handleBrokenLives () { 450 private async handleBrokenLives () {
451 await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })
452
445 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() 453 const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()
446 454
447 for (const uuid of videoUUIDs) { 455 for (const uuid of videoUUIDs) {
448 await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) 456 await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
449 } 457 }
450 } 458 }
451 459
@@ -494,8 +502,8 @@ class LiveManager {
494 }) 502 })
495 } 503 }
496 504
497 private async saveEndingSession (videoId: number, error: LiveVideoError | null) { 505 private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) {
498 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) 506 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
499 if (!liveSession) return 507 if (!liveSession) return
500 508
501 liveSession.endDate = new Date() 509 liveSession.endDate = new Date()