diff options
author | Chocobozzz <me@florianbigard.com> | 2022-05-03 11:38:07 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-05-03 14:49:15 +0200 |
commit | 26e3e98ff0e222a9fb9226938ac6902af77921bd (patch) | |
tree | 73d1c6f2524e380862d3365f12043fc319d40841 /server/lib/live | |
parent | 86c5229b4d726202378ef46854383bcafca22310 (diff) | |
download | PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.gz PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.tar.zst PeerTube-26e3e98ff0e222a9fb9226938ac6902af77921bd.zip |
Support live session in server
Diffstat (limited to 'server/lib/live')
-rw-r--r-- | server/lib/live/live-manager.ts | 64 | ||||
-rw-r--r-- | server/lib/live/shared/muxing-session.ts | 10 |
2 files changed, 60 insertions, 14 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index da09aa05c..df2804a0e 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -17,10 +17,11 @@ import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/ | |||
17 | import { UserModel } from '@server/models/user/user' | 17 | import { UserModel } from '@server/models/user/user' |
18 | import { VideoModel } from '@server/models/video/video' | 18 | import { VideoModel } from '@server/models/video/video' |
19 | import { VideoLiveModel } from '@server/models/video/video-live' | 19 | import { VideoLiveModel } from '@server/models/video/video-live' |
20 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | ||
20 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 21 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
21 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveVideo } from '@server/types/models' | 22 | import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models' |
22 | import { wait } from '@shared/core-utils' | 23 | import { wait } from '@shared/core-utils' |
23 | import { VideoState, VideoStreamingPlaylistType } from '@shared/models' | 24 | import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models' |
24 | import { federateVideoIfNeeded } from '../activitypub/videos' | 25 | import { federateVideoIfNeeded } from '../activitypub/videos' |
25 | import { JobQueue } from '../job-queue' | 26 | import { JobQueue } from '../job-queue' |
26 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' | 27 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths' |
@@ -174,10 +175,13 @@ class LiveManager { | |||
174 | return !!this.rtmpServer | 175 | return !!this.rtmpServer |
175 | } | 176 | } |
176 | 177 | ||
177 | stopSessionOf (videoId: number) { | 178 | stopSessionOf (videoId: number, error: LiveVideoError | null) { |
178 | const sessionId = this.videoSessions.get(videoId) | 179 | const sessionId = this.videoSessions.get(videoId) |
179 | if (!sessionId) return | 180 | if (!sessionId) return |
180 | 181 | ||
182 | this.saveEndingSession(videoId, error) | ||
183 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | ||
184 | |||
181 | this.videoSessions.delete(videoId) | 185 | this.videoSessions.delete(videoId) |
182 | this.abortSession(sessionId) | 186 | this.abortSession(sessionId) |
183 | } | 187 | } |
@@ -274,6 +278,8 @@ class LiveManager { | |||
274 | const videoUUID = videoLive.Video.uuid | 278 | const videoUUID = videoLive.Video.uuid |
275 | const localLTags = lTags(sessionId, videoUUID) | 279 | const localLTags = lTags(sessionId, videoUUID) |
276 | 280 | ||
281 | const liveSession = await this.saveStartingSession(videoLive) | ||
282 | |||
277 | const user = await UserModel.loadByLiveId(videoLive.id) | 283 | const user = await UserModel.loadByLiveId(videoLive.id) |
278 | LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) | 284 | LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id) |
279 | 285 | ||
@@ -299,24 +305,27 @@ class LiveManager { | |||
299 | localLTags | 305 | localLTags |
300 | ) | 306 | ) |
301 | 307 | ||
302 | this.stopSessionOf(videoId) | 308 | this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) |
303 | }) | 309 | }) |
304 | 310 | ||
305 | muxingSession.on('duration-exceeded', ({ videoId }) => { | 311 | muxingSession.on('duration-exceeded', ({ videoId }) => { |
306 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) | 312 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) |
307 | 313 | ||
308 | this.stopSessionOf(videoId) | 314 | this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) |
309 | }) | 315 | }) |
310 | 316 | ||
311 | muxingSession.on('quota-exceeded', ({ videoId }) => { | 317 | muxingSession.on('quota-exceeded', ({ videoId }) => { |
312 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) | 318 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) |
313 | 319 | ||
314 | this.stopSessionOf(videoId) | 320 | this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) |
321 | }) | ||
322 | |||
323 | muxingSession.on('ffmpeg-error', ({ videoId }) => { | ||
324 | this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) | ||
315 | }) | 325 | }) |
316 | 326 | ||
317 | muxingSession.on('ffmpeg-error', ({ sessionId }) => this.abortSession(sessionId)) | ||
318 | muxingSession.on('ffmpeg-end', ({ videoId }) => { | 327 | muxingSession.on('ffmpeg-end', ({ videoId }) => { |
319 | this.onMuxingFFmpegEnd(videoId) | 328 | this.onMuxingFFmpegEnd(videoId, sessionId) |
320 | }) | 329 | }) |
321 | 330 | ||
322 | muxingSession.on('after-cleanup', ({ videoId }) => { | 331 | muxingSession.on('after-cleanup', ({ videoId }) => { |
@@ -324,7 +333,7 @@ class LiveManager { | |||
324 | 333 | ||
325 | muxingSession.destroy() | 334 | muxingSession.destroy() |
326 | 335 | ||
327 | return this.onAfterMuxingCleanup({ videoId }) | 336 | return this.onAfterMuxingCleanup({ videoId, liveSession }) |
328 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) | 337 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) |
329 | }) | 338 | }) |
330 | 339 | ||
@@ -365,15 +374,19 @@ class LiveManager { | |||
365 | } | 374 | } |
366 | } | 375 | } |
367 | 376 | ||
368 | private onMuxingFFmpegEnd (videoId: number) { | 377 | private onMuxingFFmpegEnd (videoId: number, sessionId: string) { |
369 | this.videoSessions.delete(videoId) | 378 | this.videoSessions.delete(videoId) |
379 | |||
380 | this.saveEndingSession(videoId, null) | ||
381 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | ||
370 | } | 382 | } |
371 | 383 | ||
372 | private async onAfterMuxingCleanup (options: { | 384 | private async onAfterMuxingCleanup (options: { |
373 | videoId: number | string | 385 | videoId: number | string |
386 | liveSession?: MVideoLiveSession | ||
374 | cleanupNow?: boolean // Default false | 387 | cleanupNow?: boolean // Default false |
375 | }) { | 388 | }) { |
376 | const { videoId, cleanupNow = false } = options | 389 | const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options |
377 | 390 | ||
378 | try { | 391 | try { |
379 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) | 392 | const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId) |
@@ -381,13 +394,25 @@ class LiveManager { | |||
381 | 394 | ||
382 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) | 395 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) |
383 | 396 | ||
397 | const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findCurrentSessionOf(fullVideo.id) | ||
398 | |||
399 | // On server restart during a live | ||
400 | if (!liveSession.endDate) { | ||
401 | liveSession.endDate = new Date() | ||
402 | await liveSession.save() | ||
403 | } | ||
404 | |||
384 | JobQueue.Instance.createJob({ | 405 | JobQueue.Instance.createJob({ |
385 | type: 'video-live-ending', | 406 | type: 'video-live-ending', |
386 | payload: { | 407 | payload: { |
387 | videoId: fullVideo.id, | 408 | videoId: fullVideo.id, |
409 | |||
388 | replayDirectory: live.saveReplay | 410 | replayDirectory: live.saveReplay |
389 | ? await this.findReplayDirectory(fullVideo) | 411 | ? await this.findReplayDirectory(fullVideo) |
390 | : undefined, | 412 | : undefined, |
413 | |||
414 | liveSessionId: liveSession.id, | ||
415 | |||
391 | publishedAt: fullVideo.publishedAt.toISOString() | 416 | publishedAt: fullVideo.publishedAt.toISOString() |
392 | } | 417 | } |
393 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) | 418 | }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) |
@@ -445,6 +470,23 @@ class LiveManager { | |||
445 | return playlist.save() | 470 | return playlist.save() |
446 | } | 471 | } |
447 | 472 | ||
473 | private saveStartingSession (videoLive: MVideoLiveVideo) { | ||
474 | const liveSession = new VideoLiveSessionModel({ | ||
475 | startDate: new Date(), | ||
476 | liveVideoId: videoLive.videoId | ||
477 | }) | ||
478 | |||
479 | return liveSession.save() | ||
480 | } | ||
481 | |||
482 | private async saveEndingSession (videoId: number, error: LiveVideoError | null) { | ||
483 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) | ||
484 | liveSession.endDate = new Date() | ||
485 | liveSession.error = error | ||
486 | |||
487 | return liveSession.save() | ||
488 | } | ||
489 | |||
448 | static get Instance () { | 490 | static get Instance () { |
449 | return this.instance || (this.instance = new this()) | 491 | return this.instance || (this.instance = new this()) |
450 | } | 492 | } |
diff --git a/server/lib/live/shared/muxing-session.ts b/server/lib/live/shared/muxing-session.ts index 588ee8749..1ee9b430f 100644 --- a/server/lib/live/shared/muxing-session.ts +++ b/server/lib/live/shared/muxing-session.ts | |||
@@ -28,7 +28,7 @@ interface MuxingSessionEvents { | |||
28 | 'quota-exceeded': ({ videoId: number }) => void | 28 | 'quota-exceeded': ({ videoId: number }) => void |
29 | 29 | ||
30 | 'ffmpeg-end': ({ videoId: number }) => void | 30 | 'ffmpeg-end': ({ videoId: number }) => void |
31 | 'ffmpeg-error': ({ sessionId: string }) => void | 31 | 'ffmpeg-error': ({ videoId: string }) => void |
32 | 32 | ||
33 | 'after-cleanup': ({ videoId: number }) => void | 33 | 'after-cleanup': ({ videoId: number }) => void |
34 | } | 34 | } |
@@ -164,7 +164,11 @@ class MuxingSession extends EventEmitter { | |||
164 | this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) | 164 | this.onFFmpegError({ err, stdout, stderr, outPath: this.outDirectory, ffmpegShellCommand }) |
165 | }) | 165 | }) |
166 | 166 | ||
167 | this.ffmpegCommand.on('end', () => this.onFFmpegEnded(this.outDirectory)) | 167 | this.ffmpegCommand.on('end', () => { |
168 | this.emit('ffmpeg-end', ({ videoId: this.videoId })) | ||
169 | |||
170 | this.onFFmpegEnded(this.outDirectory) | ||
171 | }) | ||
168 | 172 | ||
169 | this.ffmpegCommand.run() | 173 | this.ffmpegCommand.run() |
170 | } | 174 | } |
@@ -197,7 +201,7 @@ class MuxingSession extends EventEmitter { | |||
197 | 201 | ||
198 | logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) | 202 | logger.error('Live transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) |
199 | 203 | ||
200 | this.emit('ffmpeg-error', ({ sessionId: this.sessionId })) | 204 | this.emit('ffmpeg-error', ({ videoId: this.videoId })) |
201 | } | 205 | } |
202 | 206 | ||
203 | private onFFmpegEnded (outPath: string) { | 207 | private onFFmpegEnded (outPath: string) { |