aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/runners/job-handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-04 15:29:34 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit5e47f6ab984a7d00782e4c7030afffa1ba480add (patch)
tree1ce586b591a8d71acbc301eba29b9a5e6490439e /server/lib/runners/job-handlers
parent6a4905602636afd6650c9e6f4d0fcc2105d91100 (diff)
downloadPeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.gz
PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.tar.zst
PeerTube-5e47f6ab984a7d00782e4c7030afffa1ba480add.zip
Support studio transcoding in peertube runner
Diffstat (limited to 'server/lib/runners/job-handlers')
-rw-r--r--server/lib/runners/job-handlers/abstract-job-handler.ts11
-rw-r--r--server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts10
-rw-r--r--server/lib/runners/job-handlers/index.ts3
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts2
-rw-r--r--server/lib/runners/job-handlers/runner-job-handlers.ts4
-rw-r--r--server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts157
-rw-r--r--server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts2
-rw-r--r--server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts2
-rw-r--r--server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts2
9 files changed, 177 insertions, 16 deletions
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts
index 74b455107..76fd1c5ac 100644
--- a/server/lib/runners/job-handlers/abstract-job-handler.ts
+++ b/server/lib/runners/job-handlers/abstract-job-handler.ts
@@ -1,3 +1,4 @@
1import { throttle } from 'lodash'
1import { retryTransactionWrapper } from '@server/helpers/database-utils' 2import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger' 3import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { RUNNER_JOBS } from '@server/initializers/constants' 4import { RUNNER_JOBS } from '@server/initializers/constants'
@@ -14,6 +15,8 @@ import {
14 RunnerJobSuccessPayload, 15 RunnerJobSuccessPayload,
15 RunnerJobType, 16 RunnerJobType,
16 RunnerJobUpdatePayload, 17 RunnerJobUpdatePayload,
18 RunnerJobVideoEditionTranscodingPayload,
19 RunnerJobVideoEditionTranscodingPrivatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload, 20 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload, 21 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload, 22 RunnerJobVODHLSTranscodingPayload,
@@ -21,7 +24,6 @@ import {
21 RunnerJobVODWebVideoTranscodingPayload, 24 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload 25 RunnerJobVODWebVideoTranscodingPrivatePayload
23} from '@shared/models' 26} from '@shared/models'
24import { throttle } from 'lodash'
25 27
26type CreateRunnerJobArg = 28type CreateRunnerJobArg =
27 { 29 {
@@ -43,6 +45,11 @@ type CreateRunnerJobArg =
43 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> 45 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
44 payload: RunnerJobLiveRTMPHLSTranscodingPayload 46 payload: RunnerJobLiveRTMPHLSTranscodingPayload
45 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload 47 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
48 } |
49 {
50 type: Extract<RunnerJobType, 'video-edition-transcoding'>
51 payload: RunnerJobVideoEditionTranscodingPayload
52 privatePayload: RunnerJobVideoEditionTranscodingPrivatePayload
46 } 53 }
47 54
48export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { 55export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
@@ -62,6 +69,8 @@ export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S
62 }): Promise<MRunnerJob> { 69 }): Promise<MRunnerJob> {
63 const { priority, dependsOnRunnerJob } = options 70 const { priority, dependsOnRunnerJob } = options
64 71
72 logger.debug('Creating runner job', { options, ...this.lTags(options.type) })
73
65 const runnerJob = new RunnerJobModel({ 74 const runnerJob = new RunnerJobModel({
66 ...pick(options, [ 'type', 'payload', 'privatePayload' ]), 75 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
67 76
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
index 517645848..a910ae383 100644
--- a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
@@ -4,27 +4,19 @@ import { logger } from '@server/helpers/logger'
4import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' 4import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info' 5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { MRunnerJob } from '@server/types/models/runners' 6import { MRunnerJob } from '@server/types/models/runners'
7import { 7import { RunnerJobSuccessPayload, RunnerJobUpdatePayload, RunnerJobVODPrivatePayload } from '@shared/models'
8 LiveRTMPHLSTranscodingUpdatePayload,
9 RunnerJobSuccessPayload,
10 RunnerJobUpdatePayload,
11 RunnerJobVODPrivatePayload
12} from '@shared/models'
13import { AbstractJobHandler } from './abstract-job-handler' 8import { AbstractJobHandler } from './abstract-job-handler'
14import { loadTranscodingRunnerVideo } from './shared' 9import { loadTranscodingRunnerVideo } from './shared'
15 10
16// eslint-disable-next-line max-len 11// eslint-disable-next-line max-len
17export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> { 12export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> {
18 13
19 // ---------------------------------------------------------------------------
20
21 protected isAbortSupported () { 14 protected isAbortSupported () {
22 return true 15 return true
23 } 16 }
24 17
25 protected specificUpdate (_options: { 18 protected specificUpdate (_options: {
26 runnerJob: MRunnerJob 19 runnerJob: MRunnerJob
27 updatePayload?: LiveRTMPHLSTranscodingUpdatePayload
28 }) { 20 }) {
29 // empty 21 // empty
30 } 22 }
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts
index 0fca72b9a..a40cee865 100644
--- a/server/lib/runners/job-handlers/index.ts
+++ b/server/lib/runners/job-handlers/index.ts
@@ -1,6 +1,7 @@
1export * from './abstract-job-handler' 1export * from './abstract-job-handler'
2export * from './live-rtmp-hls-transcoding-job-handler' 2export * from './live-rtmp-hls-transcoding-job-handler'
3export * from './runner-job-handlers'
4export * from './video-edition-transcoding-job-handler'
3export * from './vod-audio-merge-transcoding-job-handler' 5export * from './vod-audio-merge-transcoding-job-handler'
4export * from './vod-hls-transcoding-job-handler' 6export * from './vod-hls-transcoding-job-handler'
5export * from './vod-web-video-transcoding-job-handler' 7export * from './vod-web-video-transcoding-job-handler'
6export * from './runner-job-handlers'
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
index c3d0e427d..48a70d891 100644
--- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
@@ -70,7 +70,7 @@ export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateO
70 70
71 // --------------------------------------------------------------------------- 71 // ---------------------------------------------------------------------------
72 72
73 async specificUpdate (options: { 73 protected async specificUpdate (options: {
74 runnerJob: MRunnerJob 74 runnerJob: MRunnerJob
75 updatePayload: LiveRTMPHLSTranscodingUpdatePayload 75 updatePayload: LiveRTMPHLSTranscodingUpdatePayload
76 }) { 76 }) {
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts
index 7bad1bc77..4ea6684ea 100644
--- a/server/lib/runners/job-handlers/runner-job-handlers.ts
+++ b/server/lib/runners/job-handlers/runner-job-handlers.ts
@@ -2,6 +2,7 @@ import { MRunnerJob } from '@server/types/models/runners'
2import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' 2import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models'
3import { AbstractJobHandler } from './abstract-job-handler' 3import { AbstractJobHandler } from './abstract-job-handler'
4import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' 4import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler'
5import { VideoEditionTranscodingJobHandler } from './video-edition-transcoding-job-handler'
5import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' 6import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler'
6import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' 7import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler'
7import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' 8import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler'
@@ -10,7 +11,8 @@ const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, Run
10 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, 11 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler,
11 'vod-hls-transcoding': VODHLSTranscodingJobHandler, 12 'vod-hls-transcoding': VODHLSTranscodingJobHandler,
12 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, 13 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler,
13 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler 14 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler,
15 'video-edition-transcoding': VideoEditionTranscodingJobHandler
14} 16}
15 17
16export function getRunnerJobHandlerClass (job: MRunnerJob) { 18export function getRunnerJobHandlerClass (job: MRunnerJob) {
diff --git a/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts b/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts
new file mode 100644
index 000000000..39a755c48
--- /dev/null
+++ b/server/lib/runners/job-handlers/video-edition-transcoding-job-handler.ts
@@ -0,0 +1,157 @@
1
2import { basename } from 'path'
3import { logger } from '@server/helpers/logger'
4import { onVideoEditionEnded, safeCleanupStudioTMPFiles } from '@server/lib/video-studio'
5import { MVideo } from '@server/types/models'
6import { MRunnerJob } from '@server/types/models/runners'
7import { buildUUID } from '@shared/extra-utils'
8import {
9 isVideoStudioTaskIntro,
10 isVideoStudioTaskOutro,
11 isVideoStudioTaskWatermark,
12 RunnerJobState,
13 RunnerJobUpdatePayload,
14 RunnerJobVideoEditionTranscodingPayload,
15 RunnerJobVideoEditionTranscodingPrivatePayload,
16 VideoEditionTranscodingSuccess,
17 VideoState,
18 VideoStudioTaskPayload
19} from '@shared/models'
20import { generateRunnerEditionTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
21import { AbstractJobHandler } from './abstract-job-handler'
22import { loadTranscodingRunnerVideo } from './shared'
23
24type CreateOptions = {
25 video: MVideo
26 tasks: VideoStudioTaskPayload[]
27 priority: number
28}
29
30// eslint-disable-next-line max-len
31export class VideoEditionTranscodingJobHandler extends AbstractJobHandler<CreateOptions, RunnerJobUpdatePayload, VideoEditionTranscodingSuccess> {
32
33 async create (options: CreateOptions) {
34 const { video, priority, tasks } = options
35
36 const jobUUID = buildUUID()
37 const payload: RunnerJobVideoEditionTranscodingPayload = {
38 input: {
39 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
40 },
41 tasks: tasks.map(t => {
42 if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) {
43 return {
44 ...t,
45
46 options: {
47 ...t.options,
48
49 file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file))
50 }
51 }
52 }
53
54 if (isVideoStudioTaskWatermark(t)) {
55 return {
56 ...t,
57
58 options: {
59 ...t.options,
60
61 file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file))
62 }
63 }
64 }
65
66 return t
67 })
68 }
69
70 const privatePayload: RunnerJobVideoEditionTranscodingPrivatePayload = {
71 videoUUID: video.uuid,
72 originalTasks: tasks
73 }
74
75 const job = await this.createRunnerJob({
76 type: 'video-edition-transcoding',
77 jobUUID,
78 payload,
79 privatePayload,
80 priority
81 })
82
83 return job
84 }
85
86 // ---------------------------------------------------------------------------
87
88 protected isAbortSupported () {
89 return true
90 }
91
92 protected specificUpdate (_options: {
93 runnerJob: MRunnerJob
94 }) {
95 // empty
96 }
97
98 protected specificAbort (_options: {
99 runnerJob: MRunnerJob
100 }) {
101 // empty
102 }
103
104 protected async specificComplete (options: {
105 runnerJob: MRunnerJob
106 resultPayload: VideoEditionTranscodingSuccess
107 }) {
108 const { runnerJob, resultPayload } = options
109 const privatePayload = runnerJob.privatePayload as RunnerJobVideoEditionTranscodingPrivatePayload
110
111 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
112 if (!video) {
113 await safeCleanupStudioTMPFiles(privatePayload.originalTasks)
114
115 }
116
117 const videoFilePath = resultPayload.videoFile as string
118
119 await onVideoEditionEnded({ video, editionResultPath: videoFilePath, tasks: privatePayload.originalTasks })
120
121 logger.info(
122 'Runner video edition transcoding job %s for %s ended.',
123 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
124 )
125 }
126
127 protected specificError (options: {
128 runnerJob: MRunnerJob
129 nextState: RunnerJobState
130 }) {
131 if (options.nextState === RunnerJobState.ERRORED) {
132 return this.specificErrorOrCancel(options)
133 }
134
135 return Promise.resolve()
136 }
137
138 protected specificCancel (options: {
139 runnerJob: MRunnerJob
140 }) {
141 return this.specificErrorOrCancel(options)
142 }
143
144 private async specificErrorOrCancel (options: {
145 runnerJob: MRunnerJob
146 }) {
147 const { runnerJob } = options
148
149 const payload = runnerJob.privatePayload as RunnerJobVideoEditionTranscodingPrivatePayload
150 await safeCleanupStudioTMPFiles(payload.originalTasks)
151
152 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
153 if (!video) return
154
155 return video.setNewState(VideoState.PUBLISHED, false, undefined)
156 }
157}
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
index a7b33f87e..5f247d792 100644
--- a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
@@ -64,7 +64,7 @@ export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJo
64 64
65 // --------------------------------------------------------------------------- 65 // ---------------------------------------------------------------------------
66 66
67 async specificComplete (options: { 67 protected async specificComplete (options: {
68 runnerJob: MRunnerJob 68 runnerJob: MRunnerJob
69 resultPayload: VODAudioMergeTranscodingSuccess 69 resultPayload: VODAudioMergeTranscodingSuccess
70 }) { 70 }) {
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
index 02566b9d5..cc94bcbda 100644
--- a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
@@ -71,7 +71,7 @@ export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandle
71 71
72 // --------------------------------------------------------------------------- 72 // ---------------------------------------------------------------------------
73 73
74 async specificComplete (options: { 74 protected async specificComplete (options: {
75 runnerJob: MRunnerJob 75 runnerJob: MRunnerJob
76 resultPayload: VODHLSTranscodingSuccess 76 resultPayload: VODHLSTranscodingSuccess
77 }) { 77 }) {
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
index 57761a7a1..663d3306e 100644
--- a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
+++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
@@ -62,7 +62,7 @@ export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobH
62 62
63 // --------------------------------------------------------------------------- 63 // ---------------------------------------------------------------------------
64 64
65 async specificComplete (options: { 65 protected async specificComplete (options: {
66 runnerJob: MRunnerJob 66 runnerJob: MRunnerJob
67 resultPayload: VODWebVideoTranscodingSuccess 67 resultPayload: VODWebVideoTranscodingSuccess
68 }) { 68 }) {