diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/live/live-manager.ts | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/live/live-manager.ts')
-rw-r--r-- | server/lib/live/live-manager.ts | 94 |
1 files changed, 51 insertions, 43 deletions
diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 05274955d..aa32a9d52 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts | |||
@@ -2,36 +2,30 @@ import { readdir, readFile } from 'fs-extra' | |||
2 | import { createServer, Server } from 'net' | 2 | import { createServer, Server } from 'net' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' | 4 | import { createServer as createServerTLS, Server as ServerTLS } from 'tls' |
5 | import { | ||
6 | computeResolutionsToTranscode, | ||
7 | ffprobePromise, | ||
8 | getLiveSegmentTime, | ||
9 | getVideoStreamBitrate, | ||
10 | getVideoStreamDimensionsInfo, | ||
11 | getVideoStreamFPS, | ||
12 | hasAudioStream | ||
13 | } from '@server/helpers/ffmpeg' | ||
14 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 5 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
15 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' | 6 | import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config' |
16 | import { VIDEO_LIVE } from '@server/initializers/constants' | 7 | import { VIDEO_LIVE } from '@server/initializers/constants' |
8 | import { sequelizeTypescript } from '@server/initializers/database' | ||
17 | import { UserModel } from '@server/models/user/user' | 9 | import { UserModel } from '@server/models/user/user' |
18 | import { VideoModel } from '@server/models/video/video' | 10 | import { VideoModel } from '@server/models/video/video' |
19 | import { VideoLiveModel } from '@server/models/video/video-live' | 11 | import { VideoLiveModel } from '@server/models/video/video-live' |
12 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
20 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | 13 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' |
21 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 14 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
22 | import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' | 15 | import { MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models' |
23 | import { pick, wait } from '@shared/core-utils' | 16 | import { pick, wait } from '@shared/core-utils' |
17 | import { ffprobePromise, getVideoStreamBitrate, getVideoStreamDimensionsInfo, getVideoStreamFPS, hasAudioStream } from '@shared/ffmpeg' | ||
24 | import { LiveVideoError, VideoState } from '@shared/models' | 18 | import { LiveVideoError, VideoState } from '@shared/models' |
25 | import { federateVideoIfNeeded } from '../activitypub/videos' | 19 | import { federateVideoIfNeeded } from '../activitypub/videos' |
26 | import { JobQueue } from '../job-queue' | 20 | import { JobQueue } from '../job-queue' |
27 | import { getLiveReplayBaseDirectory } from '../paths' | 21 | import { getLiveReplayBaseDirectory } from '../paths' |
28 | import { PeerTubeSocket } from '../peertube-socket' | 22 | import { PeerTubeSocket } from '../peertube-socket' |
29 | import { Hooks } from '../plugins/hooks' | 23 | import { Hooks } from '../plugins/hooks' |
24 | import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions' | ||
30 | import { LiveQuotaStore } from './live-quota-store' | 25 | import { LiveQuotaStore } from './live-quota-store' |
31 | import { cleanupAndDestroyPermanentLive } from './live-utils' | 26 | import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils' |
32 | import { MuxingSession } from './shared' | 27 | import { MuxingSession } from './shared' |
33 | import { sequelizeTypescript } from '@server/initializers/database' | 28 | import { RunnerJobModel } from '@server/models/runner/runner-job' |
34 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
35 | 29 | ||
36 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') | 30 | const NodeRtmpSession = require('node-media-server/src/node_rtmp_session') |
37 | const context = require('node-media-server/src/node_core_ctx') | 31 | const context = require('node-media-server/src/node_core_ctx') |
@@ -57,7 +51,7 @@ class LiveManager { | |||
57 | private static instance: LiveManager | 51 | private static instance: LiveManager |
58 | 52 | ||
59 | private readonly muxingSessions = new Map<string, MuxingSession>() | 53 | private readonly muxingSessions = new Map<string, MuxingSession>() |
60 | private readonly videoSessions = new Map<number, string>() | 54 | private readonly videoSessions = new Map<string, string>() |
61 | 55 | ||
62 | private rtmpServer: Server | 56 | private rtmpServer: Server |
63 | private rtmpsServer: ServerTLS | 57 | private rtmpsServer: ServerTLS |
@@ -177,14 +171,19 @@ class LiveManager { | |||
177 | return !!this.rtmpServer | 171 | return !!this.rtmpServer |
178 | } | 172 | } |
179 | 173 | ||
180 | stopSessionOf (videoId: number, error: LiveVideoError | null) { | 174 | stopSessionOf (videoUUID: string, error: LiveVideoError | null) { |
181 | const sessionId = this.videoSessions.get(videoId) | 175 | const sessionId = this.videoSessions.get(videoUUID) |
182 | if (!sessionId) return | 176 | if (!sessionId) { |
177 | logger.debug('No live session to stop for video %s', videoUUID, lTags(sessionId, videoUUID)) | ||
178 | return | ||
179 | } | ||
183 | 180 | ||
184 | this.saveEndingSession(videoId, error) | 181 | logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) }) |
185 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | ||
186 | 182 | ||
187 | this.videoSessions.delete(videoId) | 183 | this.saveEndingSession(videoUUID, error) |
184 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) })) | ||
185 | |||
186 | this.videoSessions.delete(videoUUID) | ||
188 | this.abortSession(sessionId) | 187 | this.abortSession(sessionId) |
189 | } | 188 | } |
190 | 189 | ||
@@ -221,6 +220,11 @@ class LiveManager { | |||
221 | return this.abortSession(sessionId) | 220 | return this.abortSession(sessionId) |
222 | } | 221 | } |
223 | 222 | ||
223 | if (this.videoSessions.has(video.uuid)) { | ||
224 | logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid)) | ||
225 | return this.abortSession(sessionId) | ||
226 | } | ||
227 | |||
224 | // Cleanup old potential live (could happen with a permanent live) | 228 | // Cleanup old potential live (could happen with a permanent live) |
225 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) | 229 | const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id) |
226 | if (oldStreamingPlaylist) { | 230 | if (oldStreamingPlaylist) { |
@@ -229,7 +233,7 @@ class LiveManager { | |||
229 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) | 233 | await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist) |
230 | } | 234 | } |
231 | 235 | ||
232 | this.videoSessions.set(video.id, sessionId) | 236 | this.videoSessions.set(video.uuid, sessionId) |
233 | 237 | ||
234 | const now = Date.now() | 238 | const now = Date.now() |
235 | const probe = await ffprobePromise(inputUrl) | 239 | const probe = await ffprobePromise(inputUrl) |
@@ -253,7 +257,7 @@ class LiveManager { | |||
253 | ) | 257 | ) |
254 | 258 | ||
255 | logger.info( | 259 | logger.info( |
256 | 'Will mux/transcode live video of original resolution %d.', resolution, | 260 | 'Handling live video of original resolution %d.', resolution, |
257 | { allResolutions, ...lTags(sessionId, video.uuid) } | 261 | { allResolutions, ...lTags(sessionId, video.uuid) } |
258 | ) | 262 | ) |
259 | 263 | ||
@@ -301,44 +305,44 @@ class LiveManager { | |||
301 | 305 | ||
302 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) | 306 | muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags)) |
303 | 307 | ||
304 | muxingSession.on('bad-socket-health', ({ videoId }) => { | 308 | muxingSession.on('bad-socket-health', ({ videoUUID }) => { |
305 | logger.error( | 309 | logger.error( |
306 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + | 310 | 'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' + |
307 | ' Stopping session of video %s.', videoUUID, | 311 | ' Stopping session of video %s.', videoUUID, |
308 | localLTags | 312 | localLTags |
309 | ) | 313 | ) |
310 | 314 | ||
311 | this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH) | 315 | this.stopSessionOf(videoUUID, LiveVideoError.BAD_SOCKET_HEALTH) |
312 | }) | 316 | }) |
313 | 317 | ||
314 | muxingSession.on('duration-exceeded', ({ videoId }) => { | 318 | muxingSession.on('duration-exceeded', ({ videoUUID }) => { |
315 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) | 319 | logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags) |
316 | 320 | ||
317 | this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED) | 321 | this.stopSessionOf(videoUUID, LiveVideoError.DURATION_EXCEEDED) |
318 | }) | 322 | }) |
319 | 323 | ||
320 | muxingSession.on('quota-exceeded', ({ videoId }) => { | 324 | muxingSession.on('quota-exceeded', ({ videoUUID }) => { |
321 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) | 325 | logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags) |
322 | 326 | ||
323 | this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED) | 327 | this.stopSessionOf(videoUUID, LiveVideoError.QUOTA_EXCEEDED) |
324 | }) | 328 | }) |
325 | 329 | ||
326 | muxingSession.on('ffmpeg-error', ({ videoId }) => { | 330 | muxingSession.on('transcoding-error', ({ videoUUID }) => { |
327 | this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR) | 331 | this.stopSessionOf(videoUUID, LiveVideoError.FFMPEG_ERROR) |
328 | }) | 332 | }) |
329 | 333 | ||
330 | muxingSession.on('ffmpeg-end', ({ videoId }) => { | 334 | muxingSession.on('transcoding-end', ({ videoUUID }) => { |
331 | this.onMuxingFFmpegEnd(videoId, sessionId) | 335 | this.onMuxingFFmpegEnd(videoUUID, sessionId) |
332 | }) | 336 | }) |
333 | 337 | ||
334 | muxingSession.on('after-cleanup', ({ videoId }) => { | 338 | muxingSession.on('after-cleanup', ({ videoUUID }) => { |
335 | this.muxingSessions.delete(sessionId) | 339 | this.muxingSessions.delete(sessionId) |
336 | 340 | ||
337 | LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) | 341 | LiveQuotaStore.Instance.removeLive(user.id, videoLive.id) |
338 | 342 | ||
339 | muxingSession.destroy() | 343 | muxingSession.destroy() |
340 | 344 | ||
341 | return this.onAfterMuxingCleanup({ videoId, liveSession }) | 345 | return this.onAfterMuxingCleanup({ videoUUID, liveSession }) |
342 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) | 346 | .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags })) |
343 | }) | 347 | }) |
344 | 348 | ||
@@ -379,22 +383,24 @@ class LiveManager { | |||
379 | } | 383 | } |
380 | } | 384 | } |
381 | 385 | ||
382 | private onMuxingFFmpegEnd (videoId: number, sessionId: string) { | 386 | private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) { |
383 | this.videoSessions.delete(videoId) | 387 | this.videoSessions.delete(videoUUID) |
384 | 388 | ||
385 | this.saveEndingSession(videoId, null) | 389 | this.saveEndingSession(videoUUID, null) |
386 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) | 390 | .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) })) |
387 | } | 391 | } |
388 | 392 | ||
389 | private async onAfterMuxingCleanup (options: { | 393 | private async onAfterMuxingCleanup (options: { |
390 | videoId: number | string | 394 | videoUUID: string |
391 | liveSession?: MVideoLiveSession | 395 | liveSession?: MVideoLiveSession |
392 | cleanupNow?: boolean // Default false | 396 | cleanupNow?: boolean // Default false |
393 | }) { | 397 | }) { |
394 | const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options | 398 | const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options |
399 | |||
400 | logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID)) | ||
395 | 401 | ||
396 | try { | 402 | try { |
397 | const fullVideo = await VideoModel.loadFull(videoId) | 403 | const fullVideo = await VideoModel.loadFull(videoUUID) |
398 | if (!fullVideo) return | 404 | if (!fullVideo) return |
399 | 405 | ||
400 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) | 406 | const live = await VideoLiveModel.loadByVideoId(fullVideo.id) |
@@ -437,15 +443,17 @@ class LiveManager { | |||
437 | 443 | ||
438 | await federateVideoIfNeeded(fullVideo, false) | 444 | await federateVideoIfNeeded(fullVideo, false) |
439 | } catch (err) { | 445 | } catch (err) { |
440 | logger.error('Cannot save/federate new video state of live streaming of video %d.', videoId, { err, ...lTags(videoId + '') }) | 446 | logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) }) |
441 | } | 447 | } |
442 | } | 448 | } |
443 | 449 | ||
444 | private async handleBrokenLives () { | 450 | private async handleBrokenLives () { |
451 | await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' }) | ||
452 | |||
445 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() | 453 | const videoUUIDs = await VideoModel.listPublishedLiveUUIDs() |
446 | 454 | ||
447 | for (const uuid of videoUUIDs) { | 455 | for (const uuid of videoUUIDs) { |
448 | await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true }) | 456 | await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true }) |
449 | } | 457 | } |
450 | } | 458 | } |
451 | 459 | ||
@@ -494,8 +502,8 @@ class LiveManager { | |||
494 | }) | 502 | }) |
495 | } | 503 | } |
496 | 504 | ||
497 | private async saveEndingSession (videoId: number, error: LiveVideoError | null) { | 505 | private async saveEndingSession (videoUUID: string, error: LiveVideoError | null) { |
498 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId) | 506 | const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID) |
499 | if (!liveSession) return | 507 | if (!liveSession) return |
500 | 508 | ||
501 | liveSession.endDate = new Date() | 509 | liveSession.endDate = new Date() |