diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/live/shared/transcoding-wrapper | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/live/shared/transcoding-wrapper')
4 files changed, 219 insertions, 0 deletions
diff --git a/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts new file mode 100644 index 000000000..226ba4573 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/abstract-transcoding-wrapper.ts | |||
@@ -0,0 +1,101 @@ | |||
1 | import EventEmitter from 'events' | ||
2 | import { LoggerTagsFn } from '@server/helpers/logger' | ||
3 | import { MStreamingPlaylistVideo, MVideoLiveVideo } from '@server/types/models' | ||
4 | import { LiveVideoError } from '@shared/models' | ||
5 | |||
6 | interface TranscodingWrapperEvents { | ||
7 | 'end': () => void | ||
8 | |||
9 | 'error': (options: { err: Error }) => void | ||
10 | } | ||
11 | |||
12 | declare interface AbstractTranscodingWrapper { | ||
13 | on<U extends keyof TranscodingWrapperEvents>( | ||
14 | event: U, listener: TranscodingWrapperEvents[U] | ||
15 | ): this | ||
16 | |||
17 | emit<U extends keyof TranscodingWrapperEvents>( | ||
18 | event: U, ...args: Parameters<TranscodingWrapperEvents[U]> | ||
19 | ): boolean | ||
20 | } | ||
21 | |||
22 | interface AbstractTranscodingWrapperOptions { | ||
23 | streamingPlaylist: MStreamingPlaylistVideo | ||
24 | videoLive: MVideoLiveVideo | ||
25 | |||
26 | lTags: LoggerTagsFn | ||
27 | |||
28 | inputUrl: string | ||
29 | fps: number | ||
30 | toTranscode: { | ||
31 | resolution: number | ||
32 | fps: number | ||
33 | }[] | ||
34 | |||
35 | bitrate: number | ||
36 | ratio: number | ||
37 | hasAudio: boolean | ||
38 | |||
39 | segmentListSize: number | ||
40 | segmentDuration: number | ||
41 | |||
42 | outDirectory: string | ||
43 | } | ||
44 | |||
45 | abstract class AbstractTranscodingWrapper extends EventEmitter { | ||
46 | protected readonly videoLive: MVideoLiveVideo | ||
47 | |||
48 | protected readonly toTranscode: { | ||
49 | resolution: number | ||
50 | fps: number | ||
51 | }[] | ||
52 | |||
53 | protected readonly inputUrl: string | ||
54 | protected readonly fps: number | ||
55 | protected readonly bitrate: number | ||
56 | protected readonly ratio: number | ||
57 | protected readonly hasAudio: boolean | ||
58 | |||
59 | protected readonly segmentListSize: number | ||
60 | protected readonly segmentDuration: number | ||
61 | |||
62 | protected readonly videoUUID: string | ||
63 | |||
64 | protected readonly outDirectory: string | ||
65 | |||
66 | protected readonly lTags: LoggerTagsFn | ||
67 | |||
68 | protected readonly streamingPlaylist: MStreamingPlaylistVideo | ||
69 | |||
70 | constructor (options: AbstractTranscodingWrapperOptions) { | ||
71 | super() | ||
72 | |||
73 | this.lTags = options.lTags | ||
74 | |||
75 | this.videoLive = options.videoLive | ||
76 | this.videoUUID = options.videoLive.Video.uuid | ||
77 | this.streamingPlaylist = options.streamingPlaylist | ||
78 | |||
79 | this.inputUrl = options.inputUrl | ||
80 | this.fps = options.fps | ||
81 | this.toTranscode = options.toTranscode | ||
82 | |||
83 | this.bitrate = options.bitrate | ||
84 | this.ratio = options.ratio | ||
85 | this.hasAudio = options.hasAudio | ||
86 | |||
87 | this.segmentListSize = options.segmentListSize | ||
88 | this.segmentDuration = options.segmentDuration | ||
89 | |||
90 | this.outDirectory = options.outDirectory | ||
91 | } | ||
92 | |||
93 | abstract run (): Promise<void> | ||
94 | |||
95 | abstract abort (error?: LiveVideoError): void | ||
96 | } | ||
97 | |||
98 | export { | ||
99 | AbstractTranscodingWrapper, | ||
100 | AbstractTranscodingWrapperOptions | ||
101 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts new file mode 100644 index 000000000..1f4c12bd4 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts | |||
@@ -0,0 +1,95 @@ | |||
1 | import { FfmpegCommand } from 'fluent-ffmpeg' | ||
2 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { CONFIG } from '@server/initializers/config' | ||
5 | import { VIDEO_LIVE } from '@server/initializers/constants' | ||
6 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | ||
7 | import { FFmpegLive } from '@shared/ffmpeg' | ||
8 | import { getLiveSegmentTime } from '../../live-utils' | ||
9 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
10 | |||
11 | export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { | ||
12 | private ffmpegCommand: FfmpegCommand | ||
13 | private ended = false | ||
14 | |||
15 | async run () { | ||
16 | this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED | ||
17 | ? await this.buildFFmpegLive().getLiveTranscodingCommand({ | ||
18 | inputUrl: this.inputUrl, | ||
19 | |||
20 | outPath: this.outDirectory, | ||
21 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
22 | |||
23 | segmentListSize: this.segmentListSize, | ||
24 | segmentDuration: this.segmentDuration, | ||
25 | |||
26 | toTranscode: this.toTranscode, | ||
27 | |||
28 | bitrate: this.bitrate, | ||
29 | ratio: this.ratio, | ||
30 | |||
31 | hasAudio: this.hasAudio | ||
32 | }) | ||
33 | : this.buildFFmpegLive().getLiveMuxingCommand({ | ||
34 | inputUrl: this.inputUrl, | ||
35 | outPath: this.outDirectory, | ||
36 | |||
37 | masterPlaylistName: this.streamingPlaylist.playlistFilename, | ||
38 | |||
39 | segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE, | ||
40 | segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode) | ||
41 | }) | ||
42 | |||
43 | logger.info('Running local live muxing/transcoding for %s.', this.videoUUID, this.lTags()) | ||
44 | |||
45 | this.ffmpegCommand.run() | ||
46 | |||
47 | let ffmpegShellCommand: string | ||
48 | this.ffmpegCommand.on('start', cmdline => { | ||
49 | ffmpegShellCommand = cmdline | ||
50 | |||
51 | logger.debug('Running ffmpeg command for live', { ffmpegShellCommand, ...this.lTags() }) | ||
52 | }) | ||
53 | |||
54 | this.ffmpegCommand.on('error', (err, stdout, stderr) => { | ||
55 | this.onFFmpegError({ err, stdout, stderr, ffmpegShellCommand }) | ||
56 | }) | ||
57 | |||
58 | this.ffmpegCommand.on('end', () => { | ||
59 | this.onFFmpegEnded() | ||
60 | }) | ||
61 | |||
62 | this.ffmpegCommand.run() | ||
63 | } | ||
64 | |||
65 | abort () { | ||
66 | // Nothing to do, ffmpeg will automatically exit | ||
67 | } | ||
68 | |||
69 | private onFFmpegError (options: { | ||
70 | err: any | ||
71 | stdout: string | ||
72 | stderr: string | ||
73 | ffmpegShellCommand: string | ||
74 | }) { | ||
75 | const { err, stdout, stderr, ffmpegShellCommand } = options | ||
76 | |||
77 | // Don't care that we killed the ffmpeg process | ||
78 | if (err?.message?.includes('Exiting normally')) return | ||
79 | |||
80 | logger.error('FFmpeg transcoding error.', { err, stdout, stderr, ffmpegShellCommand, ...this.lTags() }) | ||
81 | |||
82 | this.emit('error', { err }) | ||
83 | } | ||
84 | |||
85 | private onFFmpegEnded () { | ||
86 | if (this.ended) return | ||
87 | |||
88 | this.ended = true | ||
89 | this.emit('end') | ||
90 | } | ||
91 | |||
92 | private buildFFmpegLive () { | ||
93 | return new FFmpegLive(getFFmpegCommandWrapperOptions('live', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
94 | } | ||
95 | } | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/index.ts b/server/lib/live/shared/transcoding-wrapper/index.ts new file mode 100644 index 000000000..ae28fa1ca --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './abstract-transcoding-wrapper' | ||
2 | export * from './ffmpeg-transcoding-wrapper' | ||
3 | export * from './remote-transcoding-wrapper' | ||
diff --git a/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts new file mode 100644 index 000000000..345eaf442 --- /dev/null +++ b/server/lib/live/shared/transcoding-wrapper/remote-transcoding-wrapper.ts | |||
@@ -0,0 +1,20 @@ | |||
1 | import { LiveRTMPHLSTranscodingJobHandler } from '@server/lib/runners' | ||
2 | import { AbstractTranscodingWrapper } from './abstract-transcoding-wrapper' | ||
3 | |||
4 | export class RemoteTranscodingWrapper extends AbstractTranscodingWrapper { | ||
5 | async run () { | ||
6 | await new LiveRTMPHLSTranscodingJobHandler().create({ | ||
7 | rtmpUrl: this.inputUrl, | ||
8 | toTranscode: this.toTranscode, | ||
9 | video: this.videoLive.Video, | ||
10 | outputDirectory: this.outDirectory, | ||
11 | playlist: this.streamingPlaylist, | ||
12 | segmentListSize: this.segmentListSize, | ||
13 | segmentDuration: this.segmentDuration | ||
14 | }) | ||
15 | } | ||
16 | |||
17 | abort () { | ||
18 | this.emit('end') | ||
19 | } | ||
20 | } | ||