aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/runners/job-handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-07-31 14:34:36 +0200
committerChocobozzz <me@florianbigard.com>2023-08-11 15:02:33 +0200
commit3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch)
treee4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/runners/job-handlers
parent04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff)
downloadPeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst
PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge conflicts, but it's a major step forward: * Server can be faster at startup because imports() are async and we can easily lazy import big modules * Angular doesn't seem to support ES import (with .js extension), so we had to correctly organize peertube into a monorepo: * Use yarn workspace feature * Use typescript reference projects for dependencies * Shared projects have been moved into "packages", each one is now a node module (with a dedicated package.json/tsconfig.json) * server/tools have been moved into apps/ and is now a dedicated app bundled and published on NPM so users don't have to build peertube cli tools manually * server/tests have been moved into packages/ so we don't compile them every time we want to run the server * Use isolatedModule option: * Had to move from const enum to const (https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums) * Had to explictely specify "type" imports when used in decorators * Prefer tsx (that uses esbuild under the hood) instead of ts-node to load typescript files (tests with mocha or scripts): * To reduce test complexity as esbuild doesn't support decorator metadata, we only test server files that do not import server models * We still build tests files into js files for a faster CI * Remove unmaintained peertube CLI import script * Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/runners/job-handlers')
-rw-r--r--server/lib/runners/job-handlers/abstract-job-handler.ts269
-rw-r--r--server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts66
-rw-r--r--server/lib/runners/job-handlers/index.ts7
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts173
-rw-r--r--server/lib/runners/job-handlers/runner-job-handlers.ts20
-rw-r--r--server/lib/runners/job-handlers/shared/index.ts1
-rw-r--r--server/lib/runners/job-handlers/shared/vod-helpers.ts44
-rw-r--r--server/lib/runners/job-handlers/video-studio-transcoding-job-handler.ts157
-rw-r--r--server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts97
-rw-r--r--server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts114
-rw-r--r--server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts84
11 files changed, 0 insertions, 1032 deletions
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts
deleted file mode 100644
index 329977de1..000000000
--- a/server/lib/runners/job-handlers/abstract-job-handler.ts
+++ /dev/null
@@ -1,269 +0,0 @@
1import { throttle } from 'lodash'
2import { saveInTransactionWithRetries } from '@server/helpers/database-utils'
3import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { RUNNER_JOBS } from '@server/initializers/constants'
5import { sequelizeTypescript } from '@server/initializers/database'
6import { PeerTubeSocket } from '@server/lib/peertube-socket'
7import { RunnerJobModel } from '@server/models/runner/runner-job'
8import { setAsUpdated } from '@server/models/shared'
9import { MRunnerJob } from '@server/types/models/runners'
10import { pick } from '@shared/core-utils'
11import {
12 RunnerJobLiveRTMPHLSTranscodingPayload,
13 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
14 RunnerJobState,
15 RunnerJobStudioTranscodingPayload,
16 RunnerJobSuccessPayload,
17 RunnerJobType,
18 RunnerJobUpdatePayload,
19 RunnerJobVideoStudioTranscodingPrivatePayload,
20 RunnerJobVODAudioMergeTranscodingPayload,
21 RunnerJobVODAudioMergeTranscodingPrivatePayload,
22 RunnerJobVODHLSTranscodingPayload,
23 RunnerJobVODHLSTranscodingPrivatePayload,
24 RunnerJobVODWebVideoTranscodingPayload,
25 RunnerJobVODWebVideoTranscodingPrivatePayload
26} from '@shared/models'
27
28type CreateRunnerJobArg =
29 {
30 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
31 payload: RunnerJobVODWebVideoTranscodingPayload
32 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
33 } |
34 {
35 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
36 payload: RunnerJobVODHLSTranscodingPayload
37 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
38 } |
39 {
40 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
41 payload: RunnerJobVODAudioMergeTranscodingPayload
42 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
43 } |
44 {
45 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
46 payload: RunnerJobLiveRTMPHLSTranscodingPayload
47 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
48 } |
49 {
50 type: Extract<RunnerJobType, 'video-studio-transcoding'>
51 payload: RunnerJobStudioTranscodingPayload
52 privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload
53 }
54
55export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
56
57 protected readonly lTags = loggerTagsFactory('runner')
58
59 static setJobAsUpdatedThrottled = throttle(setAsUpdated, 2000)
60
61 // ---------------------------------------------------------------------------
62
63 abstract create (options: C): Promise<MRunnerJob>
64
65 protected async createRunnerJob (options: CreateRunnerJobArg & {
66 jobUUID: string
67 priority: number
68 dependsOnRunnerJob?: MRunnerJob
69 }): Promise<MRunnerJob> {
70 const { priority, dependsOnRunnerJob } = options
71
72 logger.debug('Creating runner job', { options, ...this.lTags(options.type) })
73
74 const runnerJob = new RunnerJobModel({
75 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
76
77 uuid: options.jobUUID,
78
79 state: dependsOnRunnerJob
80 ? RunnerJobState.WAITING_FOR_PARENT_JOB
81 : RunnerJobState.PENDING,
82
83 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
84
85 priority
86 })
87
88 const job = await sequelizeTypescript.transaction(async transaction => {
89 return runnerJob.save({ transaction })
90 })
91
92 if (runnerJob.state === RunnerJobState.PENDING) {
93 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
94 }
95
96 return job
97 }
98
99 // ---------------------------------------------------------------------------
100
101 protected abstract specificUpdate (options: {
102 runnerJob: MRunnerJob
103 updatePayload?: U
104 }): Promise<void> | void
105
106 async update (options: {
107 runnerJob: MRunnerJob
108 progress?: number
109 updatePayload?: U
110 }) {
111 const { runnerJob, progress } = options
112
113 await this.specificUpdate(options)
114
115 if (progress) runnerJob.progress = progress
116
117 if (!runnerJob.changed()) {
118 try {
119 await AbstractJobHandler.setJobAsUpdatedThrottled({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id })
120 } catch (err) {
121 logger.warn('Cannot set remote job as updated', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
122 }
123
124 return
125 }
126
127 await saveInTransactionWithRetries(runnerJob)
128 }
129
130 // ---------------------------------------------------------------------------
131
132 async complete (options: {
133 runnerJob: MRunnerJob
134 resultPayload: S
135 }) {
136 const { runnerJob } = options
137
138 runnerJob.state = RunnerJobState.COMPLETING
139 await saveInTransactionWithRetries(runnerJob)
140
141 try {
142 await this.specificComplete(options)
143
144 runnerJob.state = RunnerJobState.COMPLETED
145 } catch (err) {
146 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
147
148 runnerJob.state = RunnerJobState.ERRORED
149 runnerJob.error = err.message
150 }
151
152 runnerJob.progress = null
153 runnerJob.finishedAt = new Date()
154
155 await saveInTransactionWithRetries(runnerJob)
156
157 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
158
159 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
160 }
161
162 protected abstract specificComplete (options: {
163 runnerJob: MRunnerJob
164 resultPayload: S
165 }): Promise<void> | void
166
167 // ---------------------------------------------------------------------------
168
169 async cancel (options: {
170 runnerJob: MRunnerJob
171 fromParent?: boolean
172 }) {
173 const { runnerJob, fromParent } = options
174
175 await this.specificCancel(options)
176
177 const cancelState = fromParent
178 ? RunnerJobState.PARENT_CANCELLED
179 : RunnerJobState.CANCELLED
180
181 runnerJob.setToErrorOrCancel(cancelState)
182
183 await saveInTransactionWithRetries(runnerJob)
184
185 const children = await RunnerJobModel.listChildrenOf(runnerJob)
186 for (const child of children) {
187 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
188
189 await this.cancel({ runnerJob: child, fromParent: true })
190 }
191 }
192
193 protected abstract specificCancel (options: {
194 runnerJob: MRunnerJob
195 }): Promise<void> | void
196
197 // ---------------------------------------------------------------------------
198
199 protected abstract isAbortSupported (): boolean
200
201 async abort (options: {
202 runnerJob: MRunnerJob
203 }) {
204 const { runnerJob } = options
205
206 if (this.isAbortSupported() !== true) {
207 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
208 }
209
210 await this.specificAbort(options)
211
212 runnerJob.resetToPending()
213
214 await saveInTransactionWithRetries(runnerJob)
215 }
216
217 protected setAbortState (runnerJob: MRunnerJob) {
218 runnerJob.resetToPending()
219 }
220
221 protected abstract specificAbort (options: {
222 runnerJob: MRunnerJob
223 }): Promise<void> | void
224
225 // ---------------------------------------------------------------------------
226
227 async error (options: {
228 runnerJob: MRunnerJob
229 message: string
230 fromParent?: boolean
231 }) {
232 const { runnerJob, message, fromParent } = options
233
234 const errorState = fromParent
235 ? RunnerJobState.PARENT_ERRORED
236 : RunnerJobState.ERRORED
237
238 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
239 ? RunnerJobState.PENDING
240 : errorState
241
242 await this.specificError({ ...options, nextState })
243
244 if (nextState === errorState) {
245 runnerJob.setToErrorOrCancel(nextState)
246 runnerJob.error = message
247 } else {
248 runnerJob.resetToPending()
249 }
250
251 await saveInTransactionWithRetries(runnerJob)
252
253 if (runnerJob.state === errorState) {
254 const children = await RunnerJobModel.listChildrenOf(runnerJob)
255
256 for (const child of children) {
257 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
258
259 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
260 }
261 }
262 }
263
264 protected abstract specificError (options: {
265 runnerJob: MRunnerJob
266 message: string
267 nextState: RunnerJobState
268 }): Promise<void> | void
269}
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
deleted file mode 100644
index f425828d9..000000000
--- a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
+++ /dev/null
@@ -1,66 +0,0 @@
1
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { logger } from '@server/helpers/logger'
4import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { MRunnerJob } from '@server/types/models/runners'
7import { RunnerJobState, RunnerJobSuccessPayload, RunnerJobUpdatePayload, RunnerJobVODPrivatePayload } from '@shared/models'
8import { AbstractJobHandler } from './abstract-job-handler'
9import { loadTranscodingRunnerVideo } from './shared'
10
11// eslint-disable-next-line max-len
12export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> {
13
14 protected isAbortSupported () {
15 return true
16 }
17
18 protected specificUpdate (_options: {
19 runnerJob: MRunnerJob
20 }) {
21 // empty
22 }
23
24 protected specificAbort (_options: {
25 runnerJob: MRunnerJob
26 }) {
27 // empty
28 }
29
30 protected async specificError (options: {
31 runnerJob: MRunnerJob
32 nextState: RunnerJobState
33 }) {
34 if (options.nextState !== RunnerJobState.ERRORED) return
35
36 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
37 if (!video) return
38
39 await moveToFailedTranscodingState(video)
40
41 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
42 }
43
44 protected async specificCancel (options: {
45 runnerJob: MRunnerJob
46 }) {
47 const { runnerJob } = options
48
49 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
50 if (!video) return
51
52 const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
53
54 logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid))
55
56 if (pending === 0) {
57 logger.info(
58 `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`,
59 this.lTags(video.uuid)
60 )
61
62 const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload
63 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo })
64 }
65 }
66}
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts
deleted file mode 100644
index 40ad2f97a..000000000
--- a/server/lib/runners/job-handlers/index.ts
+++ /dev/null
@@ -1,7 +0,0 @@
1export * from './abstract-job-handler'
2export * from './live-rtmp-hls-transcoding-job-handler'
3export * from './runner-job-handlers'
4export * from './video-studio-transcoding-job-handler'
5export * from './vod-audio-merge-transcoding-job-handler'
6export * from './vod-hls-transcoding-job-handler'
7export * from './vod-web-video-transcoding-job-handler'
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
deleted file mode 100644
index 6b2894f8c..000000000
--- a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
+++ /dev/null
@@ -1,173 +0,0 @@
1import { move, remove } from 'fs-extra'
2import { join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { JOB_PRIORITY } from '@server/initializers/constants'
5import { LiveManager } from '@server/lib/live'
6import { MStreamingPlaylist, MVideo } from '@server/types/models'
7import { MRunnerJob } from '@server/types/models/runners'
8import { buildUUID } from '@shared/extra-utils'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 LiveVideoError,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
15 RunnerJobState
16} from '@shared/models'
17import { AbstractJobHandler } from './abstract-job-handler'
18
19type CreateOptions = {
20 video: MVideo
21 playlist: MStreamingPlaylist
22
23 sessionId: string
24 rtmpUrl: string
25
26 toTranscode: {
27 resolution: number
28 fps: number
29 }[]
30
31 segmentListSize: number
32 segmentDuration: number
33
34 outputDirectory: string
35}
36
37// eslint-disable-next-line max-len
38export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
39
40 async create (options: CreateOptions) {
41 const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory, sessionId } = options
42
43 const jobUUID = buildUUID()
44 const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
45 input: {
46 rtmpUrl
47 },
48 output: {
49 toTranscode,
50 segmentListSize,
51 segmentDuration
52 }
53 }
54
55 const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
56 videoUUID: video.uuid,
57 masterPlaylistName: playlist.playlistFilename,
58 sessionId,
59 outputDirectory
60 }
61
62 const job = await this.createRunnerJob({
63 type: 'live-rtmp-hls-transcoding',
64 jobUUID,
65 payload,
66 privatePayload,
67 priority: JOB_PRIORITY.TRANSCODING
68 })
69
70 return job
71 }
72
73 // ---------------------------------------------------------------------------
74
75 protected async specificUpdate (options: {
76 runnerJob: MRunnerJob
77 updatePayload: LiveRTMPHLSTranscodingUpdatePayload
78 }) {
79 const { runnerJob, updatePayload } = options
80
81 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
82 const outputDirectory = privatePayload.outputDirectory
83 const videoUUID = privatePayload.videoUUID
84
85 // Always process the chunk first before moving m3u8 that references this chunk
86 if (updatePayload.type === 'add-chunk') {
87 await move(
88 updatePayload.videoChunkFile as string,
89 join(outputDirectory, updatePayload.videoChunkFilename),
90 { overwrite: true }
91 )
92 } else if (updatePayload.type === 'remove-chunk') {
93 await remove(join(outputDirectory, updatePayload.videoChunkFilename))
94 }
95
96 if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) {
97 await move(
98 updatePayload.resolutionPlaylistFile as string,
99 join(outputDirectory, updatePayload.resolutionPlaylistFilename),
100 { overwrite: true }
101 )
102 }
103
104 if (updatePayload.masterPlaylistFile) {
105 await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
106 }
107
108 logger.debug(
109 'Runner live RTMP to HLS job %s for %s updated.',
110 runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
111 )
112 }
113
114 // ---------------------------------------------------------------------------
115
116 protected specificComplete (options: {
117 runnerJob: MRunnerJob
118 }) {
119 return this.stopLive({
120 runnerJob: options.runnerJob,
121 type: 'ended'
122 })
123 }
124
125 // ---------------------------------------------------------------------------
126
127 protected isAbortSupported () {
128 return false
129 }
130
131 protected specificAbort () {
132 throw new Error('Not implemented')
133 }
134
135 protected specificError (options: {
136 runnerJob: MRunnerJob
137 nextState: RunnerJobState
138 }) {
139 return this.stopLive({
140 runnerJob: options.runnerJob,
141 type: 'errored'
142 })
143 }
144
145 protected specificCancel (options: {
146 runnerJob: MRunnerJob
147 }) {
148 return this.stopLive({
149 runnerJob: options.runnerJob,
150 type: 'cancelled'
151 })
152 }
153
154 private stopLive (options: {
155 runnerJob: MRunnerJob
156 type: 'ended' | 'errored' | 'cancelled'
157 }) {
158 const { runnerJob, type } = options
159
160 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
161 const videoUUID = privatePayload.videoUUID
162
163 const errorType = {
164 ended: null,
165 errored: LiveVideoError.RUNNER_JOB_ERROR,
166 cancelled: LiveVideoError.RUNNER_JOB_CANCEL
167 }
168
169 LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type])
170
171 logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID))
172 }
173}
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts
deleted file mode 100644
index 85551c365..000000000
--- a/server/lib/runners/job-handlers/runner-job-handlers.ts
+++ /dev/null
@@ -1,20 +0,0 @@
1import { MRunnerJob } from '@server/types/models/runners'
2import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models'
3import { AbstractJobHandler } from './abstract-job-handler'
4import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler'
5import { VideoStudioTranscodingJobHandler } from './video-studio-transcoding-job-handler'
6import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler'
7import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler'
8import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler'
9
10const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, RunnerJobUpdatePayload, RunnerJobSuccessPayload>> = {
11 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler,
12 'vod-hls-transcoding': VODHLSTranscodingJobHandler,
13 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler,
14 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler,
15 'video-studio-transcoding': VideoStudioTranscodingJobHandler
16}
17
18export function getRunnerJobHandlerClass (job: MRunnerJob) {
19 return processors[job.type]
20}
diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts
deleted file mode 100644
index 348273ae2..000000000
--- a/server/lib/runners/job-handlers/shared/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './vod-helpers'
diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts
deleted file mode 100644
index 1a2ad02ca..000000000
--- a/server/lib/runners/job-handlers/shared/vod-helpers.ts
+++ /dev/null
@@ -1,44 +0,0 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger, LoggerTagsFn } from '@server/helpers/logger'
4import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
5import { onWebVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding'
6import { buildNewFile } from '@server/lib/video-file'
7import { VideoModel } from '@server/models/video/video'
8import { MVideoFullLight } from '@server/types/models'
9import { MRunnerJob } from '@server/types/models/runners'
10import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models'
11
12export async function onVODWebVideoOrAudioMergeTranscodingJob (options: {
13 video: MVideoFullLight
14 videoFilePath: string
15 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload
16}) {
17 const { video, videoFilePath, privatePayload } = options
18
19 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' })
20 videoFile.videoId = video.id
21
22 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
23 await move(videoFilePath, newVideoFilePath)
24
25 await onWebVideoFileTranscoding({
26 video,
27 videoFile,
28 videoOutputPath: newVideoFilePath
29 })
30
31 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
32}
33
34export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) {
35 const videoUUID = runnerJob.privatePayload.videoUUID
36
37 const video = await VideoModel.loadFull(videoUUID)
38 if (!video) {
39 logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID))
40 return undefined
41 }
42
43 return video
44}
diff --git a/server/lib/runners/job-handlers/video-studio-transcoding-job-handler.ts b/server/lib/runners/job-handlers/video-studio-transcoding-job-handler.ts
deleted file mode 100644
index f604382b7..000000000
--- a/server/lib/runners/job-handlers/video-studio-transcoding-job-handler.ts
+++ /dev/null
@@ -1,157 +0,0 @@
1
2import { basename } from 'path'
3import { logger } from '@server/helpers/logger'
4import { onVideoStudioEnded, safeCleanupStudioTMPFiles } from '@server/lib/video-studio'
5import { MVideo } from '@server/types/models'
6import { MRunnerJob } from '@server/types/models/runners'
7import { buildUUID } from '@shared/extra-utils'
8import {
9 isVideoStudioTaskIntro,
10 isVideoStudioTaskOutro,
11 isVideoStudioTaskWatermark,
12 RunnerJobState,
13 RunnerJobUpdatePayload,
14 RunnerJobStudioTranscodingPayload,
15 RunnerJobVideoStudioTranscodingPrivatePayload,
16 VideoStudioTranscodingSuccess,
17 VideoState,
18 VideoStudioTaskPayload
19} from '@shared/models'
20import { generateRunnerEditionTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
21import { AbstractJobHandler } from './abstract-job-handler'
22import { loadTranscodingRunnerVideo } from './shared'
23
24type CreateOptions = {
25 video: MVideo
26 tasks: VideoStudioTaskPayload[]
27 priority: number
28}
29
30// eslint-disable-next-line max-len
31export class VideoStudioTranscodingJobHandler extends AbstractJobHandler<CreateOptions, RunnerJobUpdatePayload, VideoStudioTranscodingSuccess> {
32
33 async create (options: CreateOptions) {
34 const { video, priority, tasks } = options
35
36 const jobUUID = buildUUID()
37 const payload: RunnerJobStudioTranscodingPayload = {
38 input: {
39 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
40 },
41 tasks: tasks.map(t => {
42 if (isVideoStudioTaskIntro(t) || isVideoStudioTaskOutro(t)) {
43 return {
44 ...t,
45
46 options: {
47 ...t.options,
48
49 file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file))
50 }
51 }
52 }
53
54 if (isVideoStudioTaskWatermark(t)) {
55 return {
56 ...t,
57
58 options: {
59 ...t.options,
60
61 file: generateRunnerEditionTranscodingVideoInputFileUrl(jobUUID, video.uuid, basename(t.options.file))
62 }
63 }
64 }
65
66 return t
67 })
68 }
69
70 const privatePayload: RunnerJobVideoStudioTranscodingPrivatePayload = {
71 videoUUID: video.uuid,
72 originalTasks: tasks
73 }
74
75 const job = await this.createRunnerJob({
76 type: 'video-studio-transcoding',
77 jobUUID,
78 payload,
79 privatePayload,
80 priority
81 })
82
83 return job
84 }
85
86 // ---------------------------------------------------------------------------
87
88 protected isAbortSupported () {
89 return true
90 }
91
92 protected specificUpdate (_options: {
93 runnerJob: MRunnerJob
94 }) {
95 // empty
96 }
97
98 protected specificAbort (_options: {
99 runnerJob: MRunnerJob
100 }) {
101 // empty
102 }
103
104 protected async specificComplete (options: {
105 runnerJob: MRunnerJob
106 resultPayload: VideoStudioTranscodingSuccess
107 }) {
108 const { runnerJob, resultPayload } = options
109 const privatePayload = runnerJob.privatePayload as RunnerJobVideoStudioTranscodingPrivatePayload
110
111 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
112 if (!video) {
113 await safeCleanupStudioTMPFiles(privatePayload.originalTasks)
114
115 }
116
117 const videoFilePath = resultPayload.videoFile as string
118
119 await onVideoStudioEnded({ video, editionResultPath: videoFilePath, tasks: privatePayload.originalTasks })
120
121 logger.info(
122 'Runner video edition transcoding job %s for %s ended.',
123 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
124 )
125 }
126
127 protected specificError (options: {
128 runnerJob: MRunnerJob
129 nextState: RunnerJobState
130 }) {
131 if (options.nextState === RunnerJobState.ERRORED) {
132 return this.specificErrorOrCancel(options)
133 }
134
135 return Promise.resolve()
136 }
137
138 protected specificCancel (options: {
139 runnerJob: MRunnerJob
140 }) {
141 return this.specificErrorOrCancel(options)
142 }
143
144 private async specificErrorOrCancel (options: {
145 runnerJob: MRunnerJob
146 }) {
147 const { runnerJob } = options
148
149 const payload = runnerJob.privatePayload as RunnerJobVideoStudioTranscodingPrivatePayload
150 await safeCleanupStudioTMPFiles(payload.originalTasks)
151
152 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
153 if (!video) return
154
155 return video.setNewState(VideoState.PUBLISHED, false, undefined)
156 }
157}
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
deleted file mode 100644
index 137a94535..000000000
--- a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
+++ /dev/null
@@ -1,97 +0,0 @@
1import { logger } from '@server/helpers/logger'
2import { VideoJobInfoModel } from '@server/models/video/video-job-info'
3import { MVideo } from '@server/types/models'
4import { MRunnerJob } from '@server/types/models/runners'
5import { pick } from '@shared/core-utils'
6import { buildUUID } from '@shared/extra-utils'
7import { getVideoStreamDuration } from '@shared/ffmpeg'
8import {
9 RunnerJobUpdatePayload,
10 RunnerJobVODAudioMergeTranscodingPayload,
11 RunnerJobVODWebVideoTranscodingPrivatePayload,
12 VODAudioMergeTranscodingSuccess
13} from '@shared/models'
14import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls'
15import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
16import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
17
18type CreateOptions = {
19 video: MVideo
20 isNewVideo: boolean
21 resolution: number
22 fps: number
23 priority: number
24 dependsOnRunnerJob?: MRunnerJob
25}
26
27// eslint-disable-next-line max-len
28export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODAudioMergeTranscodingSuccess> {
29
30 async create (options: CreateOptions) {
31 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
32
33 const jobUUID = buildUUID()
34 const payload: RunnerJobVODAudioMergeTranscodingPayload = {
35 input: {
36 audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid),
37 previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid)
38 },
39 output: {
40 resolution,
41 fps
42 }
43 }
44
45 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
46 ...pick(options, [ 'isNewVideo' ]),
47
48 videoUUID: video.uuid
49 }
50
51 const job = await this.createRunnerJob({
52 type: 'vod-audio-merge-transcoding',
53 jobUUID,
54 payload,
55 privatePayload,
56 priority,
57 dependsOnRunnerJob
58 })
59
60 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
61
62 return job
63 }
64
65 // ---------------------------------------------------------------------------
66
67 protected async specificComplete (options: {
68 runnerJob: MRunnerJob
69 resultPayload: VODAudioMergeTranscodingSuccess
70 }) {
71 const { runnerJob, resultPayload } = options
72 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
73
74 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
75 if (!video) return
76
77 const videoFilePath = resultPayload.videoFile as string
78
79 // ffmpeg generated a new video file, so update the video duration
80 // See https://trac.ffmpeg.org/ticket/5456
81 video.duration = await getVideoStreamDuration(videoFilePath)
82 await video.save()
83
84 // We can remove the old audio file
85 const oldAudioFile = video.VideoFiles[0]
86 await video.removeWebVideoFile(oldAudioFile)
87 await oldAudioFile.destroy()
88 video.VideoFiles = []
89
90 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
91
92 logger.info(
93 'Runner VOD audio merge transcoding job %s for %s ended.',
94 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
95 )
96 }
97}
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
deleted file mode 100644
index 02845952c..000000000
--- a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
+++ /dev/null
@@ -1,114 +0,0 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { renameVideoFileInPlaylist } from '@server/lib/hls'
5import { getHlsResolutionPlaylistFilename } from '@server/lib/paths'
6import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
7import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding'
8import { buildNewFile, removeAllWebVideoFiles } from '@server/lib/video-file'
9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
10import { MVideo } from '@server/types/models'
11import { MRunnerJob } from '@server/types/models/runners'
12import { pick } from '@shared/core-utils'
13import { buildUUID } from '@shared/extra-utils'
14import {
15 RunnerJobUpdatePayload,
16 RunnerJobVODHLSTranscodingPayload,
17 RunnerJobVODHLSTranscodingPrivatePayload,
18 VODHLSTranscodingSuccess
19} from '@shared/models'
20import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
21import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
22import { loadTranscodingRunnerVideo } from './shared'
23
24type CreateOptions = {
25 video: MVideo
26 isNewVideo: boolean
27 deleteWebVideoFiles: boolean
28 resolution: number
29 fps: number
30 priority: number
31 dependsOnRunnerJob?: MRunnerJob
32}
33
34// eslint-disable-next-line max-len
35export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODHLSTranscodingSuccess> {
36
37 async create (options: CreateOptions) {
38 const { video, resolution, fps, dependsOnRunnerJob, priority } = options
39
40 const jobUUID = buildUUID()
41
42 const payload: RunnerJobVODHLSTranscodingPayload = {
43 input: {
44 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
45 },
46 output: {
47 resolution,
48 fps
49 }
50 }
51
52 const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = {
53 ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]),
54
55 videoUUID: video.uuid
56 }
57
58 const job = await this.createRunnerJob({
59 type: 'vod-hls-transcoding',
60 jobUUID,
61 payload,
62 privatePayload,
63 priority,
64 dependsOnRunnerJob
65 })
66
67 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
68
69 return job
70 }
71
72 // ---------------------------------------------------------------------------
73
74 protected async specificComplete (options: {
75 runnerJob: MRunnerJob
76 resultPayload: VODHLSTranscodingSuccess
77 }) {
78 const { runnerJob, resultPayload } = options
79 const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload
80
81 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
82 if (!video) return
83
84 const videoFilePath = resultPayload.videoFile as string
85 const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string
86
87 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' })
88 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
89 await move(videoFilePath, newVideoFilePath)
90
91 const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename)
92 const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename)
93 await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath)
94
95 await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename)
96
97 await onHLSVideoFileTranscoding({
98 video,
99 videoFile,
100 m3u8OutputPath: newResolutionPlaylistFilePath,
101 videoOutputPath: newVideoFilePath
102 })
103
104 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
105
106 if (privatePayload.deleteWebVideoFiles === true) {
107 logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid))
108
109 await removeAllWebVideoFiles(video)
110 }
111
112 logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid))
113 }
114}
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
deleted file mode 100644
index 9ee8ab88e..000000000
--- a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
+++ /dev/null
@@ -1,84 +0,0 @@
1import { logger } from '@server/helpers/logger'
2import { VideoJobInfoModel } from '@server/models/video/video-job-info'
3import { MVideo } from '@server/types/models'
4import { MRunnerJob } from '@server/types/models/runners'
5import { pick } from '@shared/core-utils'
6import { buildUUID } from '@shared/extra-utils'
7import {
8 RunnerJobUpdatePayload,
9 RunnerJobVODWebVideoTranscodingPayload,
10 RunnerJobVODWebVideoTranscodingPrivatePayload,
11 VODWebVideoTranscodingSuccess
12} from '@shared/models'
13import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
14import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
15import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
16
17type CreateOptions = {
18 video: MVideo
19 isNewVideo: boolean
20 resolution: number
21 fps: number
22 priority: number
23 dependsOnRunnerJob?: MRunnerJob
24}
25
26// eslint-disable-next-line max-len
27export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODWebVideoTranscodingSuccess> {
28
29 async create (options: CreateOptions) {
30 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
31
32 const jobUUID = buildUUID()
33 const payload: RunnerJobVODWebVideoTranscodingPayload = {
34 input: {
35 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
36 },
37 output: {
38 resolution,
39 fps
40 }
41 }
42
43 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
44 ...pick(options, [ 'isNewVideo' ]),
45
46 videoUUID: video.uuid
47 }
48
49 const job = await this.createRunnerJob({
50 type: 'vod-web-video-transcoding',
51 jobUUID,
52 payload,
53 privatePayload,
54 dependsOnRunnerJob,
55 priority
56 })
57
58 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
59
60 return job
61 }
62
63 // ---------------------------------------------------------------------------
64
65 protected async specificComplete (options: {
66 runnerJob: MRunnerJob
67 resultPayload: VODWebVideoTranscodingSuccess
68 }) {
69 const { runnerJob, resultPayload } = options
70 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
71
72 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
73 if (!video) return
74
75 const videoFilePath = resultPayload.videoFile as string
76
77 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
78
79 logger.info(
80 'Runner VOD web video transcoding job %s for %s ended.',
81 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
82 )
83 }
84}