diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-04 15:29:34 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 5e47f6ab984a7d00782e4c7030afffa1ba480add (patch) | |
tree | 1ce586b591a8d71acbc301eba29b9a5e6490439e /server/lib/runners/job-handlers | |
parent | 6a4905602636afd6650c9e6f4d0fcc2105d91100 (diff) | |
download | PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.gz PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.zst PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.zip |
Support studio transcoding in peertube runner
Diffstat (limited to 'server/lib/runners/job-handlers')
9 files changed, 177 insertions, 16 deletions
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts index 74b455107..76fd1c5ac 100644 --- a/server/lib/runners/job-handlers/abstract-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | import { throttle } from 'lodash' | ||
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | 2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
3 | import { RUNNER_JOBS } from '@server/initializers/constants' | 4 | import { RUNNER_JOBS } from '@server/initializers/constants' |
@@ -14,6 +15,8 @@ import { | |||
14 | RunnerJobSuccessPayload, | 15 | RunnerJobSuccessPayload, |
15 | RunnerJobType, | 16 | RunnerJobType, |
16 | RunnerJobUpdatePayload, | 17 | RunnerJobUpdatePayload, |
18 | RunnerJobVideoEditionTranscodingPayload, | ||
19 | RunnerJobVideoEditionTranscodingPrivatePayload, | ||
17 | RunnerJobVODAudioMergeTranscodingPayload, | 20 | RunnerJobVODAudioMergeTranscodingPayload, |
18 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | 21 | RunnerJobVODAudioMergeTranscodingPrivatePayload, |
19 | RunnerJobVODHLSTranscodingPayload, | 22 | RunnerJobVODHLSTranscodingPayload, |
@@ -21,7 +24,6 @@ import { | |||
21 | RunnerJobVODWebVideoTranscodingPayload, | 24 | RunnerJobVODWebVideoTranscodingPayload, |
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | 25 | RunnerJobVODWebVideoTranscodingPrivatePayload |
23 | } from '@shared/models' | 26 | } from '@shared/models' |
24 | import { throttle } from 'lodash' | ||
25 | 27 | ||
26 | type CreateRunnerJobArg = | 28 | type CreateRunnerJobArg = |
27 | { | 29 | { |
@@ -43,6 +45,11 @@ type CreateRunnerJobArg = | |||
43 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | 45 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> |
44 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | 46 | payload: RunnerJobLiveRTMPHLSTranscodingPayload |
45 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | 47 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload |
48 | } | | ||
49 | { | ||
50 | type: Extract<RunnerJobType, 'video-edition-transcoding'> | ||
51 | payload: RunnerJobVideoEditionTranscodingPayload | ||
52 | privatePayload: RunnerJobVideoEditionTranscodingPrivatePayload | ||
46 | } | 53 | } |
47 | 54 | ||
48 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | 55 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { |
@@ -62,6 +69,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S | |||
62 | }): Promise<MRunnerJob> { | 69 | }): Promise<MRunnerJob> { |
63 | const { priority, dependsOnRunnerJob } = options | 70 | const { priority, dependsOnRunnerJob } = options |
64 | 71 | ||
72 | logger.debug('Creating runner job', { options, ...this.lTags(options.type) }) | ||
73 | |||
65 | const runnerJob = new RunnerJobModel({ | 74 | const runnerJob = new RunnerJobModel({ |
66 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | 75 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), |
67 | 76 | ||
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts index 517645848..a910ae383 100644 --- a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts | |||
@@ -4,27 +4,19 @@ import { logger } from '@server/helpers/logger' | |||
4 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 4 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' |
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
6 | import { MRunnerJob } from '@server/types/models/runners' | 6 | import { MRunnerJob } from '@server/types/models/runners' |
7 | import { | 7 | import { RunnerJobSuccessPayload, RunnerJobUpdatePayload, RunnerJobVODPrivatePayload } from '@shared/models' |
8 | LiveRTMPHLSTranscodingUpdatePayload, | ||
9 | RunnerJobSuccessPayload, | ||
10 | RunnerJobUpdatePayload, | ||
11 | RunnerJobVODPrivatePayload | ||
12 | } from '@shared/models' | ||
13 | import { AbstractJobHandler } from './abstract-job-handler' | 8 | import { AbstractJobHandler } from './abstract-job-handler' |
14 | import { loadTranscodingRunnerVideo } from './shared' | 9 | import { loadTranscodingRunnerVideo } from './shared' |
15 | 10 | ||
16 | // eslint-disable-next-line max-len | 11 | // eslint-disable-next-line max-len |
17 | export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> { | 12 | export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> { |
18 | 13 | ||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | protected isAbortSupported () { | 14 | protected isAbortSupported () { |
22 | return true | 15 | return true |
23 | } | 16 | } |
24 | 17 | ||
25 | protected specificUpdate (_options: { | 18 | protected specificUpdate (_options: { |
26 | runnerJob: MRunnerJob | 19 | runnerJob: MRunnerJob |
27 | updatePayload?: LiveRTMPHLSTranscodingUpdatePayload | ||
28 | }) { | 20 | }) { |
29 | // empty | 21 | // empty |
30 | } | 22 | } |
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts index 0fca72b9a..a40cee865 100644 --- a/server/lib/runners/job-handlers/index.ts +++ b/server/lib/runners/job-handlers/index.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | export * from './abstract-job-handler' | 1 | export * from './abstract-job-handler' |
2 | export * from './live-rtmp-hls-transcoding-job-handler' | 2 | export * from './live-rtmp-hls-transcoding-job-handler' |
3 | export * from './runner-job-handlers' | ||
4 | export * from './video-edition-transcoding-job-handler' | ||
3 | export * from './vod-audio-merge-transcoding-job-handler' | 5 | export * from './vod-audio-merge-transcoding-job-handler' |
4 | export * from './vod-hls-transcoding-job-handler' | 6 | export * from './vod-hls-transcoding-job-handler' |
5 | export * from './vod-web-video-transcoding-job-handler' | 7 | export * from './vod-web-video-transcoding-job-handler' |
6 | export * from './runner-job-handlers' | ||
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts index c3d0e427d..48a70d891 100644 --- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts | |||
@@ -70,7 +70,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO | |||
70 | 70 | ||
71 | // --------------------------------------------------------------------------- | 71 | // --------------------------------------------------------------------------- |
72 | 72 | ||
73 | async specificUpdate (options: { | 73 | protected async specificUpdate (options: { |
74 | runnerJob: MRunnerJob | 74 | runnerJob: MRunnerJob |
75 | updatePayload: LiveRTMPHLSTranscodingUpdatePayload | 75 | updatePayload: LiveRTMPHLSTranscodingUpdatePayload |
76 | }) { | 76 | }) { |
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts index 7bad1bc77..4ea6684ea 100644 --- a/server/lib/runners/job-handlers/runner-job-handlers.ts +++ b/server/lib/runners/job-handlers/runner-job-handlers.ts | |||
@@ -2,6 +2,7 @@ import { MRunnerJob } from '@server/types/models/runners' | |||
2 | import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' | 2 | import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' |
3 | import { AbstractJobHandler } from './abstract-job-handler' | 3 | import { AbstractJobHandler } from './abstract-job-handler' |
4 | import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' | 4 | import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' |
5 | import { VideoEditionTranscodingJobHandler } from './video-edition-transcoding-job-handler' | ||
5 | import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' | 6 | import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' |
6 | import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' | 7 | import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' |
7 | import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' | 8 | import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' |
@@ -10,7 +11,8 @@ const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, Run | |||
10 | 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, | 11 | 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, |
11 | 'vod-hls-transcoding': VODHLSTranscodingJobHandler, | 12 | 'vod-hls-transcoding': VODHLSTranscodingJobHandler, |
12 | 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, | 13 | 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, |
13 | 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler | 14 | 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler, |
15 | 'video-edition-transcoding': VideoEditionTranscodingJobHandler | ||
14 | } | 16 | } |
15 | 17 | ||
16 | export function getRunnerJobHandlerClass (job: MRunnerJob) { | 18 | export function getRunnerJobHandlerClass (job: MRunnerJob) { |
diff --git a/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts b/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts new file mode 100644 index 000000000..39a755c48 --- /dev/null +++ b/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts | |||
@@ -0,0 +1,157 @@ | |||
1 | |||
2 | import { basename } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { onVideoEditionEnded, safeCleanupStudioTMPFiles } from '@server/lib/video-studio' | ||
5 | import { MVideo } from '@server/types/models' | ||
6 | import { MRunnerJob } from '@server/types/models/runners' | ||
7 | import { buildUUID } from '@shared/extra-utils' | ||
8 | import { | ||
9 | isVideoStudioTaskIntro, | ||
10 | isVideoStudioTaskOutro, | ||
11 | isVideoStudioTaskWatermark, | ||
12 | RunnerJobState, | ||
13 | RunnerJobUpdatePayload, | ||
14 | RunnerJobVideoEditionTranscodingPayload, | ||
15 | RunnerJobVideoEditionTranscodingPrivatePayload, | ||
16 | VideoEditionTranscodingSuccess, | ||
17 | VideoState, | ||
18 | VideoStudioTaskPayload | ||
19 | } from '@shared/models' | ||
20 | import { generateRunnerEditionTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' | ||
21 | import { AbstractJobHandler } from './abstract-job-handler' | ||
22 | import { loadTranscodingRunnerVideo } from './shared' | ||
23 | |||
24 | type CreateOptions = { | ||
25 | video: MVideo | ||
26 | tasks: VideoStudioTaskPayload[] | ||
27 | priority: number | ||
28 | } | ||
29 | |||
30 | // eslint-disable-next-line max-len | ||
31 | export class VideoEditionTranscodingJobHandler extends AbstractJobHandler<CreateOptions, RunnerJobUpdatePayload, VideoEditionTranscodingSuccess> { | ||
32 | |||
33 | async create (options: CreateOptions) { | ||
34 | const { video, priority, tasks } = options | ||
35 | |||
36 | const jobUUID = buildUUID() | ||
37 | const payload: RunnerJobVideoEditionTranscodingPayload = { | ||
38 | input: { | ||
39 | videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) | ||
40 | }, | ||
41 | tasks: tasks.map(t => { | ||
42 | if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) { | ||
43 | return { | ||
44 | ...t, | ||
45 | |||
46 | options: { | ||
47 | ...t.options, | ||
48 | |||
49 | file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file)) | ||
50 | } | ||
51 | } | ||
52 | } | ||
53 | |||
54 | if (isVideoStudioTaskWatermark(t)) { | ||
55 | return { | ||
56 | ...t, | ||
57 | |||
58 | options: { | ||
59 | ...t.options, | ||
60 | |||
61 | file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file)) | ||
62 | } | ||
63 | } | ||
64 | } | ||
65 | |||
66 | return t | ||
67 | }) | ||
68 | } | ||
69 | |||
70 | const privatePayload: RunnerJobVideoEditionTranscodingPrivatePayload = { | ||
71 | videoUUID: video.uuid, | ||
72 | originalTasks: tasks | ||
73 | } | ||
74 | |||
75 | const job = await this.createRunnerJob({ | ||
76 | type: 'video-edition-transcoding', | ||
77 | jobUUID, | ||
78 | payload, | ||
79 | privatePayload, | ||
80 | priority | ||
81 | }) | ||
82 | |||
83 | return job | ||
84 | } | ||
85 | |||
86 | // --------------------------------------------------------------------------- | ||
87 | |||
88 | protected isAbortSupported () { | ||
89 | return true | ||
90 | } | ||
91 | |||
92 | protected specificUpdate (_options: { | ||
93 | runnerJob: MRunnerJob | ||
94 | }) { | ||
95 | // empty | ||
96 | } | ||
97 | |||
98 | protected specificAbort (_options: { | ||
99 | runnerJob: MRunnerJob | ||
100 | }) { | ||
101 | // empty | ||
102 | } | ||
103 | |||
104 | protected async specificComplete (options: { | ||
105 | runnerJob: MRunnerJob | ||
106 | resultPayload: VideoEditionTranscodingSuccess | ||
107 | }) { | ||
108 | const { runnerJob, resultPayload } = options | ||
109 | const privatePayload = runnerJob.privatePayload as RunnerJobVideoEditionTranscodingPrivatePayload | ||
110 | |||
111 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
112 | if (!video) { | ||
113 | await safeCleanupStudioTMPFiles(privatePayload.originalTasks) | ||
114 | |||
115 | } | ||
116 | |||
117 | const videoFilePath = resultPayload.videoFile as string | ||
118 | |||
119 | await onVideoEditionEnded({ video, editionResultPath: videoFilePath, tasks: privatePayload.originalTasks }) | ||
120 | |||
121 | logger.info( | ||
122 | 'Runner video edition transcoding job %s for %s ended.', | ||
123 | runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) | ||
124 | ) | ||
125 | } | ||
126 | |||
127 | protected specificError (options: { | ||
128 | runnerJob: MRunnerJob | ||
129 | nextState: RunnerJobState | ||
130 | }) { | ||
131 | if (options.nextState === RunnerJobState.ERRORED) { | ||
132 | return this.specificErrorOrCancel(options) | ||
133 | } | ||
134 | |||
135 | return Promise.resolve() | ||
136 | } | ||
137 | |||
138 | protected specificCancel (options: { | ||
139 | runnerJob: MRunnerJob | ||
140 | }) { | ||
141 | return this.specificErrorOrCancel(options) | ||
142 | } | ||
143 | |||
144 | private async specificErrorOrCancel (options: { | ||
145 | runnerJob: MRunnerJob | ||
146 | }) { | ||
147 | const { runnerJob } = options | ||
148 | |||
149 | const payload = runnerJob.privatePayload as RunnerJobVideoEditionTranscodingPrivatePayload | ||
150 | await safeCleanupStudioTMPFiles(payload.originalTasks) | ||
151 | |||
152 | const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) | ||
153 | if (!video) return | ||
154 | |||
155 | return video.setNewState(VideoState.PUBLISHED, false, undefined) | ||
156 | } | ||
157 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts index a7b33f87e..5f247d792 100644 --- a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts | |||
@@ -64,7 +64,7 @@ export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJo | |||
64 | 64 | ||
65 | // --------------------------------------------------------------------------- | 65 | // --------------------------------------------------------------------------- |
66 | 66 | ||
67 | async specificComplete (options: { | 67 | protected async specificComplete (options: { |
68 | runnerJob: MRunnerJob | 68 | runnerJob: MRunnerJob |
69 | resultPayload: VODAudioMergeTranscodingSuccess | 69 | resultPayload: VODAudioMergeTranscodingSuccess |
70 | }) { | 70 | }) { |
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts index 02566b9d5..cc94bcbda 100644 --- a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts | |||
@@ -71,7 +71,7 @@ export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandle | |||
71 | 71 | ||
72 | // --------------------------------------------------------------------------- | 72 | // --------------------------------------------------------------------------- |
73 | 73 | ||
74 | async specificComplete (options: { | 74 | protected async specificComplete (options: { |
75 | runnerJob: MRunnerJob | 75 | runnerJob: MRunnerJob |
76 | resultPayload: VODHLSTranscodingSuccess | 76 | resultPayload: VODHLSTranscodingSuccess |
77 | }) { | 77 | }) { |
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts index 57761a7a1..663d3306e 100644 --- a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts +++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts | |||
@@ -62,7 +62,7 @@ export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobH | |||
62 | 62 | ||
63 | // --------------------------------------------------------------------------- | 63 | // --------------------------------------------------------------------------- |
64 | 64 | ||
65 | async specificComplete (options: { | 65 | protected async specificComplete (options: { |
66 | runnerJob: MRunnerJob | 66 | runnerJob: MRunnerJob |
67 | resultPayload: VODWebVideoTranscodingSuccess | 67 | resultPayload: VODWebVideoTranscodingSuccess |
68 | }) { | 68 | }) { |