diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/runners | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/runners')
13 files changed, 924 insertions, 0 deletions
diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts new file mode 100644 index 000000000..a737c7b59 --- /dev/null +++ b/server/lib/runners/index.ts | |||
@@ -0,0 +1,3 @@ | |||
1 | export * from './job-handlers' | ||
2 | export * from './runner' | ||
3 | export * from './runner-urls' | ||
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts new file mode 100644 index 000000000..73fc14574 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-job-handler.ts | |||
@@ -0,0 +1,271 @@ | |||
1 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
2 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
3 | import { RUNNER_JOBS } from '@server/initializers/constants' | ||
4 | import { sequelizeTypescript } from '@server/initializers/database' | ||
5 | import { PeerTubeSocket } from '@server/lib/peertube-socket' | ||
6 | import { RunnerJobModel } from '@server/models/runner/runner-job' | ||
7 | import { setAsUpdated } from '@server/models/shared' | ||
8 | import { MRunnerJob } from '@server/types/models/runners' | ||
9 | import { pick } from '@shared/core-utils' | ||
10 | import { | ||
11 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
12 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
13 | RunnerJobState, | ||
14 | RunnerJobSuccessPayload, | ||
15 | RunnerJobType, | ||
16 | RunnerJobUpdatePayload, | ||
17 | RunnerJobVODAudioMergeTranscodingPayload, | ||
18 | RunnerJobVODAudioMergeTranscodingPrivatePayload, | ||
19 | RunnerJobVODHLSTranscodingPayload, | ||
20 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
21 | RunnerJobVODWebVideoTranscodingPayload, | ||
22 | RunnerJobVODWebVideoTranscodingPrivatePayload | ||
23 | } from '@shared/models' | ||
24 | |||
25 | type CreateRunnerJobArg = | ||
26 | { | ||
27 | type: Extract<RunnerJobType, 'vod-web-video-transcoding'> | ||
28 | payload: RunnerJobVODWebVideoTranscodingPayload | ||
29 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | ||
30 | } | | ||
31 | { | ||
32 | type: Extract<RunnerJobType, 'vod-hls-transcoding'> | ||
33 | payload: RunnerJobVODHLSTranscodingPayload | ||
34 | privatePayload: RunnerJobVODHLSTranscodingPrivatePayload | ||
35 | } | | ||
36 | { | ||
37 | type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'> | ||
38 | payload: RunnerJobVODAudioMergeTranscodingPayload | ||
39 | privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
40 | } | | ||
41 | { | ||
42 | type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'> | ||
43 | payload: RunnerJobLiveRTMPHLSTranscodingPayload | ||
44 | privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
45 | } | ||
46 | |||
47 | export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> { | ||
48 | |||
49 | protected readonly lTags = loggerTagsFactory('runner') | ||
50 | |||
51 | // --------------------------------------------------------------------------- | ||
52 | |||
53 | abstract create (options: C): Promise<MRunnerJob> | ||
54 | |||
55 | protected async createRunnerJob (options: CreateRunnerJobArg & { | ||
56 | jobUUID: string | ||
57 | priority: number | ||
58 | dependsOnRunnerJob?: MRunnerJob | ||
59 | }): Promise<MRunnerJob> { | ||
60 | const { priority, dependsOnRunnerJob } = options | ||
61 | |||
62 | const runnerJob = new RunnerJobModel({ | ||
63 | ...pick(options, [ 'type', 'payload', 'privatePayload' ]), | ||
64 | |||
65 | uuid: options.jobUUID, | ||
66 | |||
67 | state: dependsOnRunnerJob | ||
68 | ? RunnerJobState.WAITING_FOR_PARENT_JOB | ||
69 | : RunnerJobState.PENDING, | ||
70 | |||
71 | dependsOnRunnerJobId: dependsOnRunnerJob?.id, | ||
72 | |||
73 | priority | ||
74 | }) | ||
75 | |||
76 | const job = await sequelizeTypescript.transaction(async transaction => { | ||
77 | return runnerJob.save({ transaction }) | ||
78 | }) | ||
79 | |||
80 | if (runnerJob.state === RunnerJobState.PENDING) { | ||
81 | PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
82 | } | ||
83 | |||
84 | return job | ||
85 | } | ||
86 | |||
87 | // --------------------------------------------------------------------------- | ||
88 | |||
89 | protected abstract specificUpdate (options: { | ||
90 | runnerJob: MRunnerJob | ||
91 | updatePayload?: U | ||
92 | }): Promise<void> | void | ||
93 | |||
94 | async update (options: { | ||
95 | runnerJob: MRunnerJob | ||
96 | progress?: number | ||
97 | updatePayload?: U | ||
98 | }) { | ||
99 | const { runnerJob, progress } = options | ||
100 | |||
101 | await this.specificUpdate(options) | ||
102 | |||
103 | if (progress) runnerJob.progress = progress | ||
104 | |||
105 | await retryTransactionWrapper(() => { | ||
106 | return sequelizeTypescript.transaction(async transaction => { | ||
107 | if (runnerJob.changed()) { | ||
108 | return runnerJob.save({ transaction }) | ||
109 | } | ||
110 | |||
111 | // Don't update the job too often | ||
112 | if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) { | ||
113 | await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction }) | ||
114 | } | ||
115 | }) | ||
116 | }) | ||
117 | } | ||
118 | |||
119 | // --------------------------------------------------------------------------- | ||
120 | |||
121 | async complete (options: { | ||
122 | runnerJob: MRunnerJob | ||
123 | resultPayload: S | ||
124 | }) { | ||
125 | const { runnerJob } = options | ||
126 | |||
127 | try { | ||
128 | await this.specificComplete(options) | ||
129 | |||
130 | runnerJob.state = RunnerJobState.COMPLETED | ||
131 | } catch (err) { | ||
132 | logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) }) | ||
133 | |||
134 | runnerJob.state = RunnerJobState.ERRORED | ||
135 | runnerJob.error = err.message | ||
136 | } | ||
137 | |||
138 | runnerJob.progress = null | ||
139 | runnerJob.finishedAt = new Date() | ||
140 | |||
141 | await retryTransactionWrapper(() => { | ||
142 | return sequelizeTypescript.transaction(async transaction => { | ||
143 | await runnerJob.save({ transaction }) | ||
144 | }) | ||
145 | }) | ||
146 | |||
147 | const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob) | ||
148 | |||
149 | if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners() | ||
150 | } | ||
151 | |||
152 | protected abstract specificComplete (options: { | ||
153 | runnerJob: MRunnerJob | ||
154 | resultPayload: S | ||
155 | }): Promise<void> | void | ||
156 | |||
157 | // --------------------------------------------------------------------------- | ||
158 | |||
159 | async cancel (options: { | ||
160 | runnerJob: MRunnerJob | ||
161 | fromParent?: boolean | ||
162 | }) { | ||
163 | const { runnerJob, fromParent } = options | ||
164 | |||
165 | await this.specificCancel(options) | ||
166 | |||
167 | const cancelState = fromParent | ||
168 | ? RunnerJobState.PARENT_CANCELLED | ||
169 | : RunnerJobState.CANCELLED | ||
170 | |||
171 | runnerJob.setToErrorOrCancel(cancelState) | ||
172 | |||
173 | await retryTransactionWrapper(() => { | ||
174 | return sequelizeTypescript.transaction(async transaction => { | ||
175 | await runnerJob.save({ transaction }) | ||
176 | }) | ||
177 | }) | ||
178 | |||
179 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
180 | for (const child of children) { | ||
181 | logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid)) | ||
182 | |||
183 | await this.cancel({ runnerJob: child, fromParent: true }) | ||
184 | } | ||
185 | } | ||
186 | |||
187 | protected abstract specificCancel (options: { | ||
188 | runnerJob: MRunnerJob | ||
189 | }): Promise<void> | void | ||
190 | |||
191 | // --------------------------------------------------------------------------- | ||
192 | |||
193 | protected abstract isAbortSupported (): boolean | ||
194 | |||
195 | async abort (options: { | ||
196 | runnerJob: MRunnerJob | ||
197 | }) { | ||
198 | const { runnerJob } = options | ||
199 | |||
200 | if (this.isAbortSupported() !== true) { | ||
201 | return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' }) | ||
202 | } | ||
203 | |||
204 | await this.specificAbort(options) | ||
205 | |||
206 | runnerJob.resetToPending() | ||
207 | |||
208 | await retryTransactionWrapper(() => { | ||
209 | return sequelizeTypescript.transaction(async transaction => { | ||
210 | await runnerJob.save({ transaction }) | ||
211 | }) | ||
212 | }) | ||
213 | } | ||
214 | |||
215 | protected setAbortState (runnerJob: MRunnerJob) { | ||
216 | runnerJob.resetToPending() | ||
217 | } | ||
218 | |||
219 | protected abstract specificAbort (options: { | ||
220 | runnerJob: MRunnerJob | ||
221 | }): Promise<void> | void | ||
222 | |||
223 | // --------------------------------------------------------------------------- | ||
224 | |||
225 | async error (options: { | ||
226 | runnerJob: MRunnerJob | ||
227 | message: string | ||
228 | fromParent?: boolean | ||
229 | }) { | ||
230 | const { runnerJob, message, fromParent } = options | ||
231 | |||
232 | const errorState = fromParent | ||
233 | ? RunnerJobState.PARENT_ERRORED | ||
234 | : RunnerJobState.ERRORED | ||
235 | |||
236 | const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES | ||
237 | ? RunnerJobState.PENDING | ||
238 | : errorState | ||
239 | |||
240 | await this.specificError({ ...options, nextState }) | ||
241 | |||
242 | if (nextState === errorState) { | ||
243 | runnerJob.setToErrorOrCancel(nextState) | ||
244 | runnerJob.error = message | ||
245 | } else { | ||
246 | runnerJob.resetToPending() | ||
247 | } | ||
248 | |||
249 | await retryTransactionWrapper(() => { | ||
250 | return sequelizeTypescript.transaction(async transaction => { | ||
251 | await runnerJob.save({ transaction }) | ||
252 | }) | ||
253 | }) | ||
254 | |||
255 | if (runnerJob.state === errorState) { | ||
256 | const children = await RunnerJobModel.listChildrenOf(runnerJob) | ||
257 | |||
258 | for (const child of children) { | ||
259 | logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid)) | ||
260 | |||
261 | await this.error({ runnerJob: child, message: 'Parent error', fromParent: true }) | ||
262 | } | ||
263 | } | ||
264 | } | ||
265 | |||
266 | protected abstract specificError (options: { | ||
267 | runnerJob: MRunnerJob | ||
268 | message: string | ||
269 | nextState: RunnerJobState | ||
270 | }): Promise<void> | void | ||
271 | } | ||
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts new file mode 100644 index 000000000..517645848 --- /dev/null +++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts | |||
@@ -0,0 +1,71 @@ | |||
1 | |||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | ||
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
6 | import { MRunnerJob } from '@server/types/models/runners' | ||
7 | import { | ||
8 | LiveRTMPHLSTranscodingUpdatePayload, | ||
9 | RunnerJobSuccessPayload, | ||
10 | RunnerJobUpdatePayload, | ||
11 | RunnerJobVODPrivatePayload | ||
12 | } from '@shared/models' | ||
13 | import { AbstractJobHandler } from './abstract-job-handler' | ||
14 | import { loadTranscodingRunnerVideo } from './shared' | ||
15 | |||
16 | // eslint-disable-next-line max-len | ||
17 | export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> { | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | protected isAbortSupported () { | ||
22 | return true | ||
23 | } | ||
24 | |||
25 | protected specificUpdate (_options: { | ||
26 | runnerJob: MRunnerJob | ||
27 | updatePayload?: LiveRTMPHLSTranscodingUpdatePayload | ||
28 | }) { | ||
29 | // empty | ||
30 | } | ||
31 | |||
32 | protected specificAbort (_options: { | ||
33 | runnerJob: MRunnerJob | ||
34 | }) { | ||
35 | // empty | ||
36 | } | ||
37 | |||
38 | protected async specificError (options: { | ||
39 | runnerJob: MRunnerJob | ||
40 | }) { | ||
41 | const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) | ||
42 | if (!video) return | ||
43 | |||
44 | await moveToFailedTranscodingState(video) | ||
45 | |||
46 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
47 | } | ||
48 | |||
49 | protected async specificCancel (options: { | ||
50 | runnerJob: MRunnerJob | ||
51 | }) { | ||
52 | const { runnerJob } = options | ||
53 | |||
54 | const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags) | ||
55 | if (!video) return | ||
56 | |||
57 | const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
58 | |||
59 | logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid)) | ||
60 | |||
61 | if (pending === 0) { | ||
62 | logger.info( | ||
63 | `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`, | ||
64 | this.lTags(video.uuid) | ||
65 | ) | ||
66 | |||
67 | const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload | ||
68 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo }) | ||
69 | } | ||
70 | } | ||
71 | } | ||
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts new file mode 100644 index 000000000..0fca72b9a --- /dev/null +++ b/server/lib/runners/job-handlers/index.ts | |||
@@ -0,0 +1,6 @@ | |||
1 | export * from './abstract-job-handler' | ||
2 | export * from './live-rtmp-hls-transcoding-job-handler' | ||
3 | export * from './vod-audio-merge-transcoding-job-handler' | ||
4 | export * from './vod-hls-transcoding-job-handler' | ||
5 | export * from './vod-web-video-transcoding-job-handler' | ||
6 | export * from './runner-job-handlers' | ||
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..c3d0e427d --- /dev/null +++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts | |||
@@ -0,0 +1,170 @@ | |||
1 | import { move, remove } from 'fs-extra' | ||
2 | import { join } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { JOB_PRIORITY } from '@server/initializers/constants' | ||
5 | import { LiveManager } from '@server/lib/live' | ||
6 | import { MStreamingPlaylist, MVideo } from '@server/types/models' | ||
7 | import { MRunnerJob } from '@server/types/models/runners' | ||
8 | import { buildUUID } from '@shared/extra-utils' | ||
9 | import { | ||
10 | LiveRTMPHLSTranscodingSuccess, | ||
11 | LiveRTMPHLSTranscodingUpdatePayload, | ||
12 | LiveVideoError, | ||
13 | RunnerJobLiveRTMPHLSTranscodingPayload, | ||
14 | RunnerJobLiveRTMPHLSTranscodingPrivatePayload, | ||
15 | RunnerJobState | ||
16 | } from '@shared/models' | ||
17 | import { AbstractJobHandler } from './abstract-job-handler' | ||
18 | |||
19 | type CreateOptions = { | ||
20 | video: MVideo | ||
21 | playlist: MStreamingPlaylist | ||
22 | |||
23 | rtmpUrl: string | ||
24 | |||
25 | toTranscode: { | ||
26 | resolution: number | ||
27 | fps: number | ||
28 | }[] | ||
29 | |||
30 | segmentListSize: number | ||
31 | segmentDuration: number | ||
32 | |||
33 | outputDirectory: string | ||
34 | } | ||
35 | |||
36 | // eslint-disable-next-line max-len | ||
37 | export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> { | ||
38 | |||
39 | async create (options: CreateOptions) { | ||
40 | const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options | ||
41 | |||
42 | const jobUUID = buildUUID() | ||
43 | const payload: RunnerJobLiveRTMPHLSTranscodingPayload = { | ||
44 | input: { | ||
45 | rtmpUrl | ||
46 | }, | ||
47 | output: { | ||
48 | toTranscode, | ||
49 | segmentListSize, | ||
50 | segmentDuration | ||
51 | } | ||
52 | } | ||
53 | |||
54 | const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = { | ||
55 | videoUUID: video.uuid, | ||
56 | masterPlaylistName: playlist.playlistFilename, | ||
57 | outputDirectory | ||
58 | } | ||
59 | |||
60 | const job = await this.createRunnerJob({ | ||
61 | type: 'live-rtmp-hls-transcoding', | ||
62 | jobUUID, | ||
63 | payload, | ||
64 | privatePayload, | ||
65 | priority: JOB_PRIORITY.TRANSCODING | ||
66 | }) | ||
67 | |||
68 | return job | ||
69 | } | ||
70 | |||
71 | // --------------------------------------------------------------------------- | ||
72 | |||
73 | async specificUpdate (options: { | ||
74 | runnerJob: MRunnerJob | ||
75 | updatePayload: LiveRTMPHLSTranscodingUpdatePayload | ||
76 | }) { | ||
77 | const { runnerJob, updatePayload } = options | ||
78 | |||
79 | const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
80 | const outputDirectory = privatePayload.outputDirectory | ||
81 | const videoUUID = privatePayload.videoUUID | ||
82 | |||
83 | if (updatePayload.type === 'add-chunk') { | ||
84 | await move( | ||
85 | updatePayload.videoChunkFile as string, | ||
86 | join(outputDirectory, updatePayload.videoChunkFilename), | ||
87 | { overwrite: true } | ||
88 | ) | ||
89 | } else if (updatePayload.type === 'remove-chunk') { | ||
90 | await remove(join(outputDirectory, updatePayload.videoChunkFilename)) | ||
91 | } | ||
92 | |||
93 | if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) { | ||
94 | await move( | ||
95 | updatePayload.resolutionPlaylistFile as string, | ||
96 | join(outputDirectory, updatePayload.resolutionPlaylistFilename), | ||
97 | { overwrite: true } | ||
98 | ) | ||
99 | } | ||
100 | |||
101 | if (updatePayload.masterPlaylistFile) { | ||
102 | await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true }) | ||
103 | } | ||
104 | |||
105 | logger.info( | ||
106 | 'Runner live RTMP to HLS job %s for %s updated.', | ||
107 | runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) } | ||
108 | ) | ||
109 | } | ||
110 | |||
111 | // --------------------------------------------------------------------------- | ||
112 | |||
113 | protected specificComplete (options: { | ||
114 | runnerJob: MRunnerJob | ||
115 | }) { | ||
116 | return this.stopLive({ | ||
117 | runnerJob: options.runnerJob, | ||
118 | type: 'ended' | ||
119 | }) | ||
120 | } | ||
121 | |||
122 | // --------------------------------------------------------------------------- | ||
123 | |||
124 | protected isAbortSupported () { | ||
125 | return false | ||
126 | } | ||
127 | |||
128 | protected specificAbort () { | ||
129 | throw new Error('Not implemented') | ||
130 | } | ||
131 | |||
132 | protected specificError (options: { | ||
133 | runnerJob: MRunnerJob | ||
134 | nextState: RunnerJobState | ||
135 | }) { | ||
136 | return this.stopLive({ | ||
137 | runnerJob: options.runnerJob, | ||
138 | type: 'errored' | ||
139 | }) | ||
140 | } | ||
141 | |||
142 | protected specificCancel (options: { | ||
143 | runnerJob: MRunnerJob | ||
144 | }) { | ||
145 | return this.stopLive({ | ||
146 | runnerJob: options.runnerJob, | ||
147 | type: 'cancelled' | ||
148 | }) | ||
149 | } | ||
150 | |||
151 | private stopLive (options: { | ||
152 | runnerJob: MRunnerJob | ||
153 | type: 'ended' | 'errored' | 'cancelled' | ||
154 | }) { | ||
155 | const { runnerJob, type } = options | ||
156 | |||
157 | const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload | ||
158 | const videoUUID = privatePayload.videoUUID | ||
159 | |||
160 | const errorType = { | ||
161 | ended: null, | ||
162 | errored: LiveVideoError.RUNNER_JOB_ERROR, | ||
163 | cancelled: LiveVideoError.RUNNER_JOB_CANCEL | ||
164 | } | ||
165 | |||
166 | LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type]) | ||
167 | |||
168 | logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID)) | ||
169 | } | ||
170 | } | ||
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts new file mode 100644 index 000000000..7bad1bc77 --- /dev/null +++ b/server/lib/runners/job-handlers/runner-job-handlers.ts | |||
@@ -0,0 +1,18 @@ | |||
1 | import { MRunnerJob } from '@server/types/models/runners' | ||
2 | import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models' | ||
3 | import { AbstractJobHandler } from './abstract-job-handler' | ||
4 | import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler' | ||
5 | import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler' | ||
6 | import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler' | ||
7 | import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler' | ||
8 | |||
9 | const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, RunnerJobUpdatePayload, RunnerJobSuccessPayload>> = { | ||
10 | 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler, | ||
11 | 'vod-hls-transcoding': VODHLSTranscodingJobHandler, | ||
12 | 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler, | ||
13 | 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler | ||
14 | } | ||
15 | |||
16 | export function getRunnerJobHandlerClass (job: MRunnerJob) { | ||
17 | return processors[job.type] | ||
18 | } | ||
diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts new file mode 100644 index 000000000..348273ae2 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './vod-helpers' | |||
diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts new file mode 100644 index 000000000..93ae89ff8 --- /dev/null +++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts | |||
@@ -0,0 +1,44 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { dirname, join } from 'path' | ||
3 | import { logger, LoggerTagsFn } from '@server/helpers/logger' | ||
4 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' | ||
5 | import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding' | ||
6 | import { buildNewFile } from '@server/lib/video-file' | ||
7 | import { VideoModel } from '@server/models/video/video' | ||
8 | import { MVideoFullLight } from '@server/types/models' | ||
9 | import { MRunnerJob } from '@server/types/models/runners' | ||
10 | import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models' | ||
11 | |||
12 | export async function onVODWebVideoOrAudioMergeTranscodingJob (options: { | ||
13 | video: MVideoFullLight | ||
14 | videoFilePath: string | ||
15 | privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload | ||
16 | }) { | ||
17 | const { video, videoFilePath, privatePayload } = options | ||
18 | |||
19 | const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' }) | ||
20 | videoFile.videoId = video.id | ||
21 | |||
22 | const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) | ||
23 | await move(videoFilePath, newVideoFilePath) | ||
24 | |||
25 | await onWebTorrentVideoFileTranscoding({ | ||
26 | video, | ||
27 | videoFile, | ||
28 | videoOutputPath: newVideoFilePath | ||
29 | }) | ||
30 | |||
31 | await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) | ||
32 | } | ||
33 | |||
34 | export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) { | ||
35 | const videoUUID = runnerJob.privatePayload.videoUUID | ||
36 | |||
37 | const video = await VideoModel.loadFull(videoUUID) | ||
38 | if (!video) { | ||
39 | logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID)) | ||
40 | return undefined | ||
41 | } | ||
42 | |||
43 | return video | ||
44 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts new file mode 100644 index 000000000..a7b33f87e --- /dev/null +++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts | |||
@@ -0,0 +1,97 @@ | |||
1 | import { pick } from 'lodash' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { MRunnerJob } from '@server/types/models/runners' | ||
6 | import { buildUUID } from '@shared/extra-utils' | ||
7 | import { getVideoStreamDuration } from '@shared/ffmpeg' | ||
8 | import { | ||
9 | RunnerJobUpdatePayload, | ||
10 | RunnerJobVODAudioMergeTranscodingPayload, | ||
11 | RunnerJobVODWebVideoTranscodingPrivatePayload, | ||
12 | VODAudioMergeTranscodingSuccess | ||
13 | } from '@shared/models' | ||
14 | import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls' | ||
15 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
16 | import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' | ||
17 | |||
18 | type CreateOptions = { | ||
19 | video: MVideo | ||
20 | isNewVideo: boolean | ||
21 | resolution: number | ||
22 | fps: number | ||
23 | priority: number | ||
24 | dependsOnRunnerJob?: MRunnerJob | ||
25 | } | ||
26 | |||
27 | // eslint-disable-next-line max-len | ||
28 | export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODAudioMergeTranscodingSuccess> { | ||
29 | |||
30 | async create (options: CreateOptions) { | ||
31 | const { video, resolution, fps, priority, dependsOnRunnerJob } = options | ||
32 | |||
33 | const jobUUID = buildUUID() | ||
34 | const payload: RunnerJobVODAudioMergeTranscodingPayload = { | ||
35 | input: { | ||
36 | audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid), | ||
37 | previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid) | ||
38 | }, | ||
39 | output: { | ||
40 | resolution, | ||
41 | fps | ||
42 | } | ||
43 | } | ||
44 | |||
45 | const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { | ||
46 | ...pick(options, [ 'isNewVideo' ]), | ||
47 | |||
48 | videoUUID: video.uuid | ||
49 | } | ||
50 | |||
51 | const job = await this.createRunnerJob({ | ||
52 | type: 'vod-audio-merge-transcoding', | ||
53 | jobUUID, | ||
54 | payload, | ||
55 | privatePayload, | ||
56 | priority, | ||
57 | dependsOnRunnerJob | ||
58 | }) | ||
59 | |||
60 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
61 | |||
62 | return job | ||
63 | } | ||
64 | |||
65 | // --------------------------------------------------------------------------- | ||
66 | |||
67 | async specificComplete (options: { | ||
68 | runnerJob: MRunnerJob | ||
69 | resultPayload: VODAudioMergeTranscodingSuccess | ||
70 | }) { | ||
71 | const { runnerJob, resultPayload } = options | ||
72 | const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload | ||
73 | |||
74 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
75 | if (!video) return | ||
76 | |||
77 | const videoFilePath = resultPayload.videoFile as string | ||
78 | |||
79 | // ffmpeg generated a new video file, so update the video duration | ||
80 | // See https://trac.ffmpeg.org/ticket/5456 | ||
81 | video.duration = await getVideoStreamDuration(videoFilePath) | ||
82 | await video.save() | ||
83 | |||
84 | // We can remove the old audio file | ||
85 | const oldAudioFile = video.VideoFiles[0] | ||
86 | await video.removeWebTorrentFile(oldAudioFile) | ||
87 | await oldAudioFile.destroy() | ||
88 | video.VideoFiles = [] | ||
89 | |||
90 | await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) | ||
91 | |||
92 | logger.info( | ||
93 | 'Runner VOD audio merge transcoding job %s for %s ended.', | ||
94 | runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) | ||
95 | ) | ||
96 | } | ||
97 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts new file mode 100644 index 000000000..02566b9d5 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts | |||
@@ -0,0 +1,114 @@ | |||
1 | import { move } from 'fs-extra' | ||
2 | import { dirname, join } from 'path' | ||
3 | import { logger } from '@server/helpers/logger' | ||
4 | import { renameVideoFileInPlaylist } from '@server/lib/hls' | ||
5 | import { getHlsResolutionPlaylistFilename } from '@server/lib/paths' | ||
6 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' | ||
7 | import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding' | ||
8 | import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file' | ||
9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
10 | import { MVideo } from '@server/types/models' | ||
11 | import { MRunnerJob } from '@server/types/models/runners' | ||
12 | import { pick } from '@shared/core-utils' | ||
13 | import { buildUUID } from '@shared/extra-utils' | ||
14 | import { | ||
15 | RunnerJobUpdatePayload, | ||
16 | RunnerJobVODHLSTranscodingPayload, | ||
17 | RunnerJobVODHLSTranscodingPrivatePayload, | ||
18 | VODHLSTranscodingSuccess | ||
19 | } from '@shared/models' | ||
20 | import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' | ||
21 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
22 | import { loadTranscodingRunnerVideo } from './shared' | ||
23 | |||
24 | type CreateOptions = { | ||
25 | video: MVideo | ||
26 | isNewVideo: boolean | ||
27 | deleteWebVideoFiles: boolean | ||
28 | resolution: number | ||
29 | fps: number | ||
30 | priority: number | ||
31 | dependsOnRunnerJob?: MRunnerJob | ||
32 | } | ||
33 | |||
34 | // eslint-disable-next-line max-len | ||
35 | export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODHLSTranscodingSuccess> { | ||
36 | |||
37 | async create (options: CreateOptions) { | ||
38 | const { video, resolution, fps, dependsOnRunnerJob, priority } = options | ||
39 | |||
40 | const jobUUID = buildUUID() | ||
41 | |||
42 | const payload: RunnerJobVODHLSTranscodingPayload = { | ||
43 | input: { | ||
44 | videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) | ||
45 | }, | ||
46 | output: { | ||
47 | resolution, | ||
48 | fps | ||
49 | } | ||
50 | } | ||
51 | |||
52 | const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = { | ||
53 | ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]), | ||
54 | |||
55 | videoUUID: video.uuid | ||
56 | } | ||
57 | |||
58 | const job = await this.createRunnerJob({ | ||
59 | type: 'vod-hls-transcoding', | ||
60 | jobUUID, | ||
61 | payload, | ||
62 | privatePayload, | ||
63 | priority, | ||
64 | dependsOnRunnerJob | ||
65 | }) | ||
66 | |||
67 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
68 | |||
69 | return job | ||
70 | } | ||
71 | |||
72 | // --------------------------------------------------------------------------- | ||
73 | |||
74 | async specificComplete (options: { | ||
75 | runnerJob: MRunnerJob | ||
76 | resultPayload: VODHLSTranscodingSuccess | ||
77 | }) { | ||
78 | const { runnerJob, resultPayload } = options | ||
79 | const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload | ||
80 | |||
81 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
82 | if (!video) return | ||
83 | |||
84 | const videoFilePath = resultPayload.videoFile as string | ||
85 | const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string | ||
86 | |||
87 | const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' }) | ||
88 | const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename) | ||
89 | await move(videoFilePath, newVideoFilePath) | ||
90 | |||
91 | const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename) | ||
92 | const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename) | ||
93 | await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath) | ||
94 | |||
95 | await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename) | ||
96 | |||
97 | await onHLSVideoFileTranscoding({ | ||
98 | video, | ||
99 | videoFile, | ||
100 | m3u8OutputPath: newResolutionPlaylistFilePath, | ||
101 | videoOutputPath: newVideoFilePath | ||
102 | }) | ||
103 | |||
104 | await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video }) | ||
105 | |||
106 | if (privatePayload.deleteWebVideoFiles === true) { | ||
107 | logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid)) | ||
108 | |||
109 | await removeAllWebTorrentFiles(video) | ||
110 | } | ||
111 | |||
112 | logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid)) | ||
113 | } | ||
114 | } | ||
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts new file mode 100644 index 000000000..57761a7a1 --- /dev/null +++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts | |||
@@ -0,0 +1,84 @@ | |||
1 | import { pick } from 'lodash' | ||
2 | import { logger } from '@server/helpers/logger' | ||
3 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
4 | import { MVideo } from '@server/types/models' | ||
5 | import { MRunnerJob } from '@server/types/models/runners' | ||
6 | import { buildUUID } from '@shared/extra-utils' | ||
7 | import { | ||
8 | RunnerJobUpdatePayload, | ||
9 | RunnerJobVODWebVideoTranscodingPayload, | ||
10 | RunnerJobVODWebVideoTranscodingPrivatePayload, | ||
11 | VODWebVideoTranscodingSuccess | ||
12 | } from '@shared/models' | ||
13 | import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls' | ||
14 | import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler' | ||
15 | import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared' | ||
16 | |||
17 | type CreateOptions = { | ||
18 | video: MVideo | ||
19 | isNewVideo: boolean | ||
20 | resolution: number | ||
21 | fps: number | ||
22 | priority: number | ||
23 | dependsOnRunnerJob?: MRunnerJob | ||
24 | } | ||
25 | |||
26 | // eslint-disable-next-line max-len | ||
27 | export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODWebVideoTranscodingSuccess> { | ||
28 | |||
29 | async create (options: CreateOptions) { | ||
30 | const { video, resolution, fps, priority, dependsOnRunnerJob } = options | ||
31 | |||
32 | const jobUUID = buildUUID() | ||
33 | const payload: RunnerJobVODWebVideoTranscodingPayload = { | ||
34 | input: { | ||
35 | videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid) | ||
36 | }, | ||
37 | output: { | ||
38 | resolution, | ||
39 | fps | ||
40 | } | ||
41 | } | ||
42 | |||
43 | const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = { | ||
44 | ...pick(options, [ 'isNewVideo' ]), | ||
45 | |||
46 | videoUUID: video.uuid | ||
47 | } | ||
48 | |||
49 | const job = await this.createRunnerJob({ | ||
50 | type: 'vod-web-video-transcoding', | ||
51 | jobUUID, | ||
52 | payload, | ||
53 | privatePayload, | ||
54 | dependsOnRunnerJob, | ||
55 | priority | ||
56 | }) | ||
57 | |||
58 | await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode') | ||
59 | |||
60 | return job | ||
61 | } | ||
62 | |||
63 | // --------------------------------------------------------------------------- | ||
64 | |||
65 | async specificComplete (options: { | ||
66 | runnerJob: MRunnerJob | ||
67 | resultPayload: VODWebVideoTranscodingSuccess | ||
68 | }) { | ||
69 | const { runnerJob, resultPayload } = options | ||
70 | const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload | ||
71 | |||
72 | const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags) | ||
73 | if (!video) return | ||
74 | |||
75 | const videoFilePath = resultPayload.videoFile as string | ||
76 | |||
77 | await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload }) | ||
78 | |||
79 | logger.info( | ||
80 | 'Runner VOD web video transcoding job %s for %s ended.', | ||
81 | runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid) | ||
82 | ) | ||
83 | } | ||
84 | } | ||
diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts new file mode 100644 index 000000000..329fb1170 --- /dev/null +++ b/server/lib/runners/runner-urls.ts | |||
@@ -0,0 +1,9 @@ | |||
1 | import { WEBSERVER } from '@server/initializers/constants' | ||
2 | |||
3 | export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) { | ||
4 | return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality' | ||
5 | } | ||
6 | |||
7 | export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) { | ||
8 | return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality' | ||
9 | } | ||
diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts new file mode 100644 index 000000000..74c814ba1 --- /dev/null +++ b/server/lib/runners/runner.ts | |||
@@ -0,0 +1,36 @@ | |||
1 | import express from 'express' | ||
2 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | ||
4 | import { sequelizeTypescript } from '@server/initializers/database' | ||
5 | import { MRunner } from '@server/types/models/runners' | ||
6 | |||
7 | const lTags = loggerTagsFactory('runner') | ||
8 | |||
9 | const updatingRunner = new Set<number>() | ||
10 | |||
11 | function updateLastRunnerContact (req: express.Request, runner: MRunner) { | ||
12 | const now = new Date() | ||
13 | |||
14 | // Don't update last runner contact too often | ||
15 | if (now.getTime() - runner.lastContact.getTime() < 2000) return | ||
16 | if (updatingRunner.has(runner.id)) return | ||
17 | |||
18 | updatingRunner.add(runner.id) | ||
19 | |||
20 | runner.lastContact = now | ||
21 | runner.ip = req.ip | ||
22 | |||
23 | logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name)) | ||
24 | |||
25 | retryTransactionWrapper(() => { | ||
26 | return sequelizeTypescript.transaction(async transaction => { | ||
27 | return runner.save({ transaction }) | ||
28 | }) | ||
29 | }) | ||
30 | .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) })) | ||
31 | .finally(() => updatingRunner.delete(runner.id)) | ||
32 | } | ||
33 | |||
34 | export { | ||
35 | updateLastRunnerContact | ||
36 | } | ||