aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/live
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-05-03 11:38:07 +0200
committerChocobozzz <me@florianbigard.com>2022-05-03 14:49:15 +0200
commit26e3e98ff0e222a9fb9226938ac6902af77921bd (patch)
tree73d1c6f2524e380862d3365f12043fc319d40841 /server/lib/live
parent86c5229b4d726202378ef46854383bcafca22310 (diff)
downloadPeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.gz
PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.zst
PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.zip
Support live session in server
Diffstat (limited to 'server/lib/live')
-rw-r--r--server/lib/live/live-manager.ts64
-rw-r--r--server/lib/live/shared/muxing-session.ts10
2 files changed, 60 insertions, 14 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts
index da09aa05c..df2804a0e 100644
--- a/server/lib/live/live-manager.ts
+++ b/server/lib/live/live-manager.ts
@@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/
17import { UserModel } from '@server/models/user/user' 17import { UserModel } from '@server/models/user/user'
18import { VideoModel } from '@server/models/video/video' 18import { VideoModel } from '@server/models/video/video'
19import { VideoLiveModel } from '@server/models/video/video-live' 19import { VideoLiveModel } from '@server/models/video/video-live'
20import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
20import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' 21import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
21import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' 22import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
22import { wait } from '@shared/core-utils' 23import { wait } from '@shared/core-utils'
23import { VideoState, VideoStreamingPlaylistType } from '@shared/models' 24import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
24import { federateVideoIfNeeded } from '../activitypub/videos' 25import { federateVideoIfNeeded } from '../activitypub/videos'
25import { JobQueue } from '../job-queue' 26import { JobQueue } from '../job-queue'
26import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' 27import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
@@ -174,10 +175,13 @@ class LiveManager {
174 return !!this.rtmpServer 175 return !!this.rtmpServer
175 } 176 }
176 177
177 stopSessionOf (videoId: number) { 178 stopSessionOf (videoId: number, error: LiveVideoError | null) {
178 const sessionId = this.videoSessions.get(videoId) 179 const sessionId = this.videoSessions.get(videoId)
179 if (!sessionId) return 180 if (!sessionId) return
180 181
182 this.saveEndingSession(videoId, error)
183 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
184
181 this.videoSessions.delete(videoId) 185 this.videoSessions.delete(videoId)
182 this.abortSession(sessionId) 186 this.abortSession(sessionId)
183 } 187 }
@@ -274,6 +278,8 @@ class LiveManager {
274 const videoUUID = videoLive.Video.uuid 278 const videoUUID = videoLive.Video.uuid
275 const localLTags = lTags(sessionId, videoUUID) 279 const localLTags = lTags(sessionId, videoUUID)
276 280
281 const liveSession = await this.saveStartingSession(videoLive)
282
277 const user = await UserModel.loadByLiveId(videoLive.id) 283 const user = await UserModel.loadByLiveId(videoLive.id)
278 LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) 284 LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)
279 285
@@ -299,24 +305,27 @@ class LiveManager {
299 localLTags 305 localLTags
300 ) 306 )
301 307
302 this.stopSessionOf(videoId) 308 this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH)
303 }) 309 })
304 310
305 muxingSession.on('duration-exceeded', ({ videoId }) => { 311 muxingSession.on('duration-exceeded', ({ videoId }) => {
306 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) 312 logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)
307 313
308 this.stopSessionOf(videoId) 314 this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED)
309 }) 315 })
310 316
311 muxingSession.on('quota-exceeded', ({ videoId }) => { 317 muxingSession.on('quota-exceeded', ({ videoId }) => {
312 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) 318 logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)
313 319
314 this.stopSessionOf(videoId) 320 this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED)
321 })
322
323 muxingSession.on('ffmpeg-error', ({ videoId }) => {
324 this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR)
315 }) 325 })
316 326
317 muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId))
318 muxingSession.on('ffmpeg-end', ({ videoId }) => { 327 muxingSession.on('ffmpeg-end', ({ videoId }) => {
319 this.onMuxingFFmpegEnd(videoId) 328 this.onMuxingFFmpegEnd(videoId, sessionId)
320 }) 329 })
321 330
322 muxingSession.on('after-cleanup', ({ videoId }) => { 331 muxingSession.on('after-cleanup', ({ videoId }) => {
@@ -324,7 +333,7 @@ class LiveManager {
324 333
325 muxingSession.destroy() 334 muxingSession.destroy()
326 335
327 return this.onAfterMuxingCleanup({ videoId }) 336 return this.onAfterMuxingCleanup({ videoId, liveSession })
328 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) 337 .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
329 }) 338 })
330 339
@@ -365,15 +374,19 @@ class LiveManager {
365 } 374 }
366 } 375 }
367 376
368 private onMuxingFFmpegEnd (videoId: number) { 377 private onMuxingFFmpegEnd (videoId: number, sessionId: string) {
369 this.videoSessions.delete(videoId) 378 this.videoSessions.delete(videoId)
379
380 this.saveEndingSession(videoId, null)
381 .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
370 } 382 }
371 383
372 private async onAfterMuxingCleanup (options: { 384 private async onAfterMuxingCleanup (options: {
373 videoId: number | string 385 videoId: number | string
386 liveSession?: MVideoLiveSession
374 cleanupNow?: boolean // Default false 387 cleanupNow?: boolean // Default false
375 }) { 388 }) {
376 const { videoId, cleanupNow = false } = options 389 const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options
377 390
378 try { 391 try {
379 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) 392 const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
@@ -381,13 +394,25 @@ class LiveManager {
381 394
382 const live = await VideoLiveModel.loadByVideoId(fullVideo.id) 395 const live = await VideoLiveModel.loadByVideoId(fullVideo.id)
383 396
397 const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id)
398
399 // On server restart during a live
400 if (!liveSession.endDate) {
401 liveSession.endDate = new Date()
402 await liveSession.save()
403 }
404
384 JobQueue.Instance.createJob({ 405 JobQueue.Instance.createJob({
385 type: 'video-live-ending', 406 type: 'video-live-ending',
386 payload: { 407 payload: {
387 videoId: fullVideo.id, 408 videoId: fullVideo.id,
409
388 replayDirectory: live.saveReplay 410 replayDirectory: live.saveReplay
389 ? await this.findReplayDirectory(fullVideo) 411 ? await this.findReplayDirectory(fullVideo)
390 : undefined, 412 : undefined,
413
414 liveSessionId: liveSession.id,
415
391 publishedAt: fullVideo.publishedAt.toISOString() 416 publishedAt: fullVideo.publishedAt.toISOString()
392 } 417 }
393 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) 418 }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
@@ -445,6 +470,23 @@ class LiveManager {
445 return playlist.save() 470 return playlist.save()
446 } 471 }
447 472
473 private saveStartingSession (videoLive: MVideoLiveVideo) {
474 const liveSession = new VideoLiveSessionModel({
475 startDate: new Date(),
476 liveVideoId: videoLive.videoId
477 })
478
479 return liveSession.save()
480 }
481
482 private async saveEndingSession (videoId: number, error: LiveVideoError | null) {
483 const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId)
484 liveSession.endDate = new Date()
485 liveSession.error = error
486
487 return liveSession.save()
488 }
489
448 static get Instance () { 490 static get Instance () {
449 return this.instance || (this.instance = new this()) 491 return this.instance || (this.instance = new this())
450 } 492 }
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts
index 588ee8749..1ee9b430f 100644
--- a/server/lib/live/shared/muxing-session.ts
+++ b/server/lib/live/shared/muxing-session.ts
@@ -28,7 +28,7 @@ interface MuxingSessionEvents {
28 'quota-exceeded': ({ videoId: number }) => void 28 'quota-exceeded': ({ videoId: number }) => void
29 29
30 'ffmpeg-end': ({ videoId: number }) => void 30 'ffmpeg-end': ({ videoId: number }) => void
31 'ffmpeg-error': ({ sessionId: string }) => void 31 'ffmpeg-error': ({ videoId: string }) => void
32 32
33 'after-cleanup': ({ videoId: number }) => void 33 'after-cleanup': ({ videoId: number }) => void
34} 34}
@@ -164,7 +164,11 @@ class MuxingSession extends EventEmitter {
164 this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) 164 this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand })
165 }) 165 })
166 166
167 this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) 167 this.ffmpegCommand.on('end', () => {
168 this.emit('ffmpeg-end', ({ videoId: this.videoId }))
169
170 this.onFFmpegEnded(this.outDirectory)
171 })
168 172
169 this.ffmpegCommand.run() 173 this.ffmpegCommand.run()
170 } 174 }
@@ -197,7 +201,7 @@ class MuxingSession extends EventEmitter {
197 201
198 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) 202 logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() })
199 203
200 this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) 204 this.emit('ffmpeg-error', ({ videoId: this.videoId }))
201 } 205 }
202 206
203 private onFFmpegEnded (outPath: string) { 207 private onFFmpegEnded (outPath: string) {