aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/runners
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 14:55:10 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch)
tree226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/runners
parent6bcb854cdea8688a32240bc5719c7d139806e00b (diff)
downloadPeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst
PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/runners')
-rw-r--r--server/lib/runners/index.ts3
-rw-r--r--server/lib/runners/job-handlers/abstract-job-handler.ts271
-rw-r--r--server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts71
-rw-r--r--server/lib/runners/job-handlers/index.ts6
-rw-r--r--server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts170
-rw-r--r--server/lib/runners/job-handlers/runner-job-handlers.ts18
-rw-r--r--server/lib/runners/job-handlers/shared/index.ts1
-rw-r--r--server/lib/runners/job-handlers/shared/vod-helpers.ts44
-rw-r--r--server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts97
-rw-r--r--server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts114
-rw-r--r--server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts84
-rw-r--r--server/lib/runners/runner-urls.ts9
-rw-r--r--server/lib/runners/runner.ts36
13 files changed, 924 insertions, 0 deletions
diff --git a/server/lib/runners/index.ts b/server/lib/runners/index.ts
new file mode 100644
index 000000000..a737c7b59
--- /dev/null
+++ b/server/lib/runners/index.ts
@@ -0,0 +1,3 @@
1export * from './job-handlers'
2export * from './runner'
3export * from './runner-urls'
diff --git a/server/lib/runners/job-handlers/abstract-job-handler.ts b/server/lib/runners/job-handlers/abstract-job-handler.ts
new file mode 100644
index 000000000..73fc14574
--- /dev/null
+++ b/server/lib/runners/job-handlers/abstract-job-handler.ts
@@ -0,0 +1,271 @@
1import { retryTransactionWrapper } from '@server/helpers/database-utils'
2import { logger, loggerTagsFactory } from '@server/helpers/logger'
3import { RUNNER_JOBS } from '@server/initializers/constants'
4import { sequelizeTypescript } from '@server/initializers/database'
5import { PeerTubeSocket } from '@server/lib/peertube-socket'
6import { RunnerJobModel } from '@server/models/runner/runner-job'
7import { setAsUpdated } from '@server/models/shared'
8import { MRunnerJob } from '@server/types/models/runners'
9import { pick } from '@shared/core-utils'
10import {
11 RunnerJobLiveRTMPHLSTranscodingPayload,
12 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
13 RunnerJobState,
14 RunnerJobSuccessPayload,
15 RunnerJobType,
16 RunnerJobUpdatePayload,
17 RunnerJobVODAudioMergeTranscodingPayload,
18 RunnerJobVODAudioMergeTranscodingPrivatePayload,
19 RunnerJobVODHLSTranscodingPayload,
20 RunnerJobVODHLSTranscodingPrivatePayload,
21 RunnerJobVODWebVideoTranscodingPayload,
22 RunnerJobVODWebVideoTranscodingPrivatePayload
23} from '@shared/models'
24
25type CreateRunnerJobArg =
26 {
27 type: Extract<RunnerJobType, 'vod-web-video-transcoding'>
28 payload: RunnerJobVODWebVideoTranscodingPayload
29 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload
30 } |
31 {
32 type: Extract<RunnerJobType, 'vod-hls-transcoding'>
33 payload: RunnerJobVODHLSTranscodingPayload
34 privatePayload: RunnerJobVODHLSTranscodingPrivatePayload
35 } |
36 {
37 type: Extract<RunnerJobType, 'vod-audio-merge-transcoding'>
38 payload: RunnerJobVODAudioMergeTranscodingPayload
39 privatePayload: RunnerJobVODAudioMergeTranscodingPrivatePayload
40 } |
41 {
42 type: Extract<RunnerJobType, 'live-rtmp-hls-transcoding'>
43 payload: RunnerJobLiveRTMPHLSTranscodingPayload
44 privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload
45 }
46
47export abstract class AbstractJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> {
48
49 protected readonly lTags = loggerTagsFactory('runner')
50
51 // ---------------------------------------------------------------------------
52
53 abstract create (options: C): Promise<MRunnerJob>
54
55 protected async createRunnerJob (options: CreateRunnerJobArg & {
56 jobUUID: string
57 priority: number
58 dependsOnRunnerJob?: MRunnerJob
59 }): Promise<MRunnerJob> {
60 const { priority, dependsOnRunnerJob } = options
61
62 const runnerJob = new RunnerJobModel({
63 ...pick(options, [ 'type', 'payload', 'privatePayload' ]),
64
65 uuid: options.jobUUID,
66
67 state: dependsOnRunnerJob
68 ? RunnerJobState.WAITING_FOR_PARENT_JOB
69 : RunnerJobState.PENDING,
70
71 dependsOnRunnerJobId: dependsOnRunnerJob?.id,
72
73 priority
74 })
75
76 const job = await sequelizeTypescript.transaction(async transaction => {
77 return runnerJob.save({ transaction })
78 })
79
80 if (runnerJob.state === RunnerJobState.PENDING) {
81 PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
82 }
83
84 return job
85 }
86
87 // ---------------------------------------------------------------------------
88
89 protected abstract specificUpdate (options: {
90 runnerJob: MRunnerJob
91 updatePayload?: U
92 }): Promise<void> | void
93
94 async update (options: {
95 runnerJob: MRunnerJob
96 progress?: number
97 updatePayload?: U
98 }) {
99 const { runnerJob, progress } = options
100
101 await this.specificUpdate(options)
102
103 if (progress) runnerJob.progress = progress
104
105 await retryTransactionWrapper(() => {
106 return sequelizeTypescript.transaction(async transaction => {
107 if (runnerJob.changed()) {
108 return runnerJob.save({ transaction })
109 }
110
111 // Don't update the job too often
112 if (new Date().getTime() - runnerJob.updatedAt.getTime() > 2000) {
113 await setAsUpdated({ sequelize: sequelizeTypescript, table: 'runnerJob', id: runnerJob.id, transaction })
114 }
115 })
116 })
117 }
118
119 // ---------------------------------------------------------------------------
120
121 async complete (options: {
122 runnerJob: MRunnerJob
123 resultPayload: S
124 }) {
125 const { runnerJob } = options
126
127 try {
128 await this.specificComplete(options)
129
130 runnerJob.state = RunnerJobState.COMPLETED
131 } catch (err) {
132 logger.error('Cannot complete runner job', { err, ...this.lTags(runnerJob.id, runnerJob.type) })
133
134 runnerJob.state = RunnerJobState.ERRORED
135 runnerJob.error = err.message
136 }
137
138 runnerJob.progress = null
139 runnerJob.finishedAt = new Date()
140
141 await retryTransactionWrapper(() => {
142 return sequelizeTypescript.transaction(async transaction => {
143 await runnerJob.save({ transaction })
144 })
145 })
146
147 const [ affectedCount ] = await RunnerJobModel.updateDependantJobsOf(runnerJob)
148
149 if (affectedCount !== 0) PeerTubeSocket.Instance.sendAvailableJobsPingToRunners()
150 }
151
152 protected abstract specificComplete (options: {
153 runnerJob: MRunnerJob
154 resultPayload: S
155 }): Promise<void> | void
156
157 // ---------------------------------------------------------------------------
158
159 async cancel (options: {
160 runnerJob: MRunnerJob
161 fromParent?: boolean
162 }) {
163 const { runnerJob, fromParent } = options
164
165 await this.specificCancel(options)
166
167 const cancelState = fromParent
168 ? RunnerJobState.PARENT_CANCELLED
169 : RunnerJobState.CANCELLED
170
171 runnerJob.setToErrorOrCancel(cancelState)
172
173 await retryTransactionWrapper(() => {
174 return sequelizeTypescript.transaction(async transaction => {
175 await runnerJob.save({ transaction })
176 })
177 })
178
179 const children = await RunnerJobModel.listChildrenOf(runnerJob)
180 for (const child of children) {
181 logger.info(`Cancelling child job ${child.uuid} of ${runnerJob.uuid} because of parent cancel`, this.lTags(child.uuid))
182
183 await this.cancel({ runnerJob: child, fromParent: true })
184 }
185 }
186
187 protected abstract specificCancel (options: {
188 runnerJob: MRunnerJob
189 }): Promise<void> | void
190
191 // ---------------------------------------------------------------------------
192
193 protected abstract isAbortSupported (): boolean
194
195 async abort (options: {
196 runnerJob: MRunnerJob
197 }) {
198 const { runnerJob } = options
199
200 if (this.isAbortSupported() !== true) {
201 return this.error({ runnerJob, message: 'Job has been aborted but it is not supported by this job type' })
202 }
203
204 await this.specificAbort(options)
205
206 runnerJob.resetToPending()
207
208 await retryTransactionWrapper(() => {
209 return sequelizeTypescript.transaction(async transaction => {
210 await runnerJob.save({ transaction })
211 })
212 })
213 }
214
215 protected setAbortState (runnerJob: MRunnerJob) {
216 runnerJob.resetToPending()
217 }
218
219 protected abstract specificAbort (options: {
220 runnerJob: MRunnerJob
221 }): Promise<void> | void
222
223 // ---------------------------------------------------------------------------
224
225 async error (options: {
226 runnerJob: MRunnerJob
227 message: string
228 fromParent?: boolean
229 }) {
230 const { runnerJob, message, fromParent } = options
231
232 const errorState = fromParent
233 ? RunnerJobState.PARENT_ERRORED
234 : RunnerJobState.ERRORED
235
236 const nextState = errorState === RunnerJobState.ERRORED && this.isAbortSupported() && runnerJob.failures < RUNNER_JOBS.MAX_FAILURES
237 ? RunnerJobState.PENDING
238 : errorState
239
240 await this.specificError({ ...options, nextState })
241
242 if (nextState === errorState) {
243 runnerJob.setToErrorOrCancel(nextState)
244 runnerJob.error = message
245 } else {
246 runnerJob.resetToPending()
247 }
248
249 await retryTransactionWrapper(() => {
250 return sequelizeTypescript.transaction(async transaction => {
251 await runnerJob.save({ transaction })
252 })
253 })
254
255 if (runnerJob.state === errorState) {
256 const children = await RunnerJobModel.listChildrenOf(runnerJob)
257
258 for (const child of children) {
259 logger.info(`Erroring child job ${child.uuid} of ${runnerJob.uuid} because of parent error`, this.lTags(child.uuid))
260
261 await this.error({ runnerJob: child, message: 'Parent error', fromParent: true })
262 }
263 }
264 }
265
266 protected abstract specificError (options: {
267 runnerJob: MRunnerJob
268 message: string
269 nextState: RunnerJobState
270 }): Promise<void> | void
271}
diff --git a/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
new file mode 100644
index 000000000..517645848
--- /dev/null
+++ b/server/lib/runners/job-handlers/abstract-vod-transcoding-job-handler.ts
@@ -0,0 +1,71 @@
1
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { logger } from '@server/helpers/logger'
4import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
5import { VideoJobInfoModel } from '@server/models/video/video-job-info'
6import { MRunnerJob } from '@server/types/models/runners'
7import {
8 LiveRTMPHLSTranscodingUpdatePayload,
9 RunnerJobSuccessPayload,
10 RunnerJobUpdatePayload,
11 RunnerJobVODPrivatePayload
12} from '@shared/models'
13import { AbstractJobHandler } from './abstract-job-handler'
14import { loadTranscodingRunnerVideo } from './shared'
15
16// eslint-disable-next-line max-len
17export abstract class AbstractVODTranscodingJobHandler <C, U extends RunnerJobUpdatePayload, S extends RunnerJobSuccessPayload> extends AbstractJobHandler<C, U, S> {
18
19 // ---------------------------------------------------------------------------
20
21 protected isAbortSupported () {
22 return true
23 }
24
25 protected specificUpdate (_options: {
26 runnerJob: MRunnerJob
27 updatePayload?: LiveRTMPHLSTranscodingUpdatePayload
28 }) {
29 // empty
30 }
31
32 protected specificAbort (_options: {
33 runnerJob: MRunnerJob
34 }) {
35 // empty
36 }
37
38 protected async specificError (options: {
39 runnerJob: MRunnerJob
40 }) {
41 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
42 if (!video) return
43
44 await moveToFailedTranscodingState(video)
45
46 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
47 }
48
49 protected async specificCancel (options: {
50 runnerJob: MRunnerJob
51 }) {
52 const { runnerJob } = options
53
54 const video = await loadTranscodingRunnerVideo(options.runnerJob, this.lTags)
55 if (!video) return
56
57 const pending = await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
58
59 logger.debug(`Pending transcode decreased to ${pending} after cancel`, this.lTags(video.uuid))
60
61 if (pending === 0) {
62 logger.info(
63 `All transcoding jobs of ${video.uuid} have been processed or canceled, moving it to its next state`,
64 this.lTags(video.uuid)
65 )
66
67 const privatePayload = runnerJob.privatePayload as RunnerJobVODPrivatePayload
68 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: privatePayload.isNewVideo })
69 }
70 }
71}
diff --git a/server/lib/runners/job-handlers/index.ts b/server/lib/runners/job-handlers/index.ts
new file mode 100644
index 000000000..0fca72b9a
--- /dev/null
+++ b/server/lib/runners/job-handlers/index.ts
@@ -0,0 +1,6 @@
1export * from './abstract-job-handler'
2export * from './live-rtmp-hls-transcoding-job-handler'
3export * from './vod-audio-merge-transcoding-job-handler'
4export * from './vod-hls-transcoding-job-handler'
5export * from './vod-web-video-transcoding-job-handler'
6export * from './runner-job-handlers'
diff --git a/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
new file mode 100644
index 000000000..c3d0e427d
--- /dev/null
+++ b/server/lib/runners/job-handlers/live-rtmp-hls-transcoding-job-handler.ts
@@ -0,0 +1,170 @@
1import { move, remove } from 'fs-extra'
2import { join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { JOB_PRIORITY } from '@server/initializers/constants'
5import { LiveManager } from '@server/lib/live'
6import { MStreamingPlaylist, MVideo } from '@server/types/models'
7import { MRunnerJob } from '@server/types/models/runners'
8import { buildUUID } from '@shared/extra-utils'
9import {
10 LiveRTMPHLSTranscodingSuccess,
11 LiveRTMPHLSTranscodingUpdatePayload,
12 LiveVideoError,
13 RunnerJobLiveRTMPHLSTranscodingPayload,
14 RunnerJobLiveRTMPHLSTranscodingPrivatePayload,
15 RunnerJobState
16} from '@shared/models'
17import { AbstractJobHandler } from './abstract-job-handler'
18
19type CreateOptions = {
20 video: MVideo
21 playlist: MStreamingPlaylist
22
23 rtmpUrl: string
24
25 toTranscode: {
26 resolution: number
27 fps: number
28 }[]
29
30 segmentListSize: number
31 segmentDuration: number
32
33 outputDirectory: string
34}
35
36// eslint-disable-next-line max-len
37export class LiveRTMPHLSTranscodingJobHandler extends AbstractJobHandler<CreateOptions, LiveRTMPHLSTranscodingUpdatePayload, LiveRTMPHLSTranscodingSuccess> {
38
39 async create (options: CreateOptions) {
40 const { video, rtmpUrl, toTranscode, playlist, segmentDuration, segmentListSize, outputDirectory } = options
41
42 const jobUUID = buildUUID()
43 const payload: RunnerJobLiveRTMPHLSTranscodingPayload = {
44 input: {
45 rtmpUrl
46 },
47 output: {
48 toTranscode,
49 segmentListSize,
50 segmentDuration
51 }
52 }
53
54 const privatePayload: RunnerJobLiveRTMPHLSTranscodingPrivatePayload = {
55 videoUUID: video.uuid,
56 masterPlaylistName: playlist.playlistFilename,
57 outputDirectory
58 }
59
60 const job = await this.createRunnerJob({
61 type: 'live-rtmp-hls-transcoding',
62 jobUUID,
63 payload,
64 privatePayload,
65 priority: JOB_PRIORITY.TRANSCODING
66 })
67
68 return job
69 }
70
71 // ---------------------------------------------------------------------------
72
73 async specificUpdate (options: {
74 runnerJob: MRunnerJob
75 updatePayload: LiveRTMPHLSTranscodingUpdatePayload
76 }) {
77 const { runnerJob, updatePayload } = options
78
79 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
80 const outputDirectory = privatePayload.outputDirectory
81 const videoUUID = privatePayload.videoUUID
82
83 if (updatePayload.type === 'add-chunk') {
84 await move(
85 updatePayload.videoChunkFile as string,
86 join(outputDirectory, updatePayload.videoChunkFilename),
87 { overwrite: true }
88 )
89 } else if (updatePayload.type === 'remove-chunk') {
90 await remove(join(outputDirectory, updatePayload.videoChunkFilename))
91 }
92
93 if (updatePayload.resolutionPlaylistFile && updatePayload.resolutionPlaylistFilename) {
94 await move(
95 updatePayload.resolutionPlaylistFile as string,
96 join(outputDirectory, updatePayload.resolutionPlaylistFilename),
97 { overwrite: true }
98 )
99 }
100
101 if (updatePayload.masterPlaylistFile) {
102 await move(updatePayload.masterPlaylistFile as string, join(outputDirectory, privatePayload.masterPlaylistName), { overwrite: true })
103 }
104
105 logger.info(
106 'Runner live RTMP to HLS job %s for %s updated.',
107 runnerJob.uuid, videoUUID, { updatePayload, ...this.lTags(videoUUID, runnerJob.uuid) }
108 )
109 }
110
111 // ---------------------------------------------------------------------------
112
113 protected specificComplete (options: {
114 runnerJob: MRunnerJob
115 }) {
116 return this.stopLive({
117 runnerJob: options.runnerJob,
118 type: 'ended'
119 })
120 }
121
122 // ---------------------------------------------------------------------------
123
124 protected isAbortSupported () {
125 return false
126 }
127
128 protected specificAbort () {
129 throw new Error('Not implemented')
130 }
131
132 protected specificError (options: {
133 runnerJob: MRunnerJob
134 nextState: RunnerJobState
135 }) {
136 return this.stopLive({
137 runnerJob: options.runnerJob,
138 type: 'errored'
139 })
140 }
141
142 protected specificCancel (options: {
143 runnerJob: MRunnerJob
144 }) {
145 return this.stopLive({
146 runnerJob: options.runnerJob,
147 type: 'cancelled'
148 })
149 }
150
151 private stopLive (options: {
152 runnerJob: MRunnerJob
153 type: 'ended' | 'errored' | 'cancelled'
154 }) {
155 const { runnerJob, type } = options
156
157 const privatePayload = runnerJob.privatePayload as RunnerJobLiveRTMPHLSTranscodingPrivatePayload
158 const videoUUID = privatePayload.videoUUID
159
160 const errorType = {
161 ended: null,
162 errored: LiveVideoError.RUNNER_JOB_ERROR,
163 cancelled: LiveVideoError.RUNNER_JOB_CANCEL
164 }
165
166 LiveManager.Instance.stopSessionOf(privatePayload.videoUUID, errorType[type])
167
168 logger.info('Runner live RTMP to HLS job %s for video %s %s.', runnerJob.uuid, videoUUID, type, this.lTags(runnerJob.uuid, videoUUID))
169 }
170}
diff --git a/server/lib/runners/job-handlers/runner-job-handlers.ts b/server/lib/runners/job-handlers/runner-job-handlers.ts
new file mode 100644
index 000000000..7bad1bc77
--- /dev/null
+++ b/server/lib/runners/job-handlers/runner-job-handlers.ts
@@ -0,0 +1,18 @@
1import { MRunnerJob } from '@server/types/models/runners'
2import { RunnerJobSuccessPayload, RunnerJobType, RunnerJobUpdatePayload } from '@shared/models'
3import { AbstractJobHandler } from './abstract-job-handler'
4import { LiveRTMPHLSTranscodingJobHandler } from './live-rtmp-hls-transcoding-job-handler'
5import { VODAudioMergeTranscodingJobHandler } from './vod-audio-merge-transcoding-job-handler'
6import { VODHLSTranscodingJobHandler } from './vod-hls-transcoding-job-handler'
7import { VODWebVideoTranscodingJobHandler } from './vod-web-video-transcoding-job-handler'
8
9const processors: Record<RunnerJobType, new() => AbstractJobHandler<unknown, RunnerJobUpdatePayload, RunnerJobSuccessPayload>> = {
10 'vod-web-video-transcoding': VODWebVideoTranscodingJobHandler,
11 'vod-hls-transcoding': VODHLSTranscodingJobHandler,
12 'vod-audio-merge-transcoding': VODAudioMergeTranscodingJobHandler,
13 'live-rtmp-hls-transcoding': LiveRTMPHLSTranscodingJobHandler
14}
15
16export function getRunnerJobHandlerClass (job: MRunnerJob) {
17 return processors[job.type]
18}
diff --git a/server/lib/runners/job-handlers/shared/index.ts b/server/lib/runners/job-handlers/shared/index.ts
new file mode 100644
index 000000000..348273ae2
--- /dev/null
+++ b/server/lib/runners/job-handlers/shared/index.ts
@@ -0,0 +1 @@
export * from './vod-helpers'
diff --git a/server/lib/runners/job-handlers/shared/vod-helpers.ts b/server/lib/runners/job-handlers/shared/vod-helpers.ts
new file mode 100644
index 000000000..93ae89ff8
--- /dev/null
+++ b/server/lib/runners/job-handlers/shared/vod-helpers.ts
@@ -0,0 +1,44 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger, LoggerTagsFn } from '@server/helpers/logger'
4import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
5import { onWebTorrentVideoFileTranscoding } from '@server/lib/transcoding/web-transcoding'
6import { buildNewFile } from '@server/lib/video-file'
7import { VideoModel } from '@server/models/video/video'
8import { MVideoFullLight } from '@server/types/models'
9import { MRunnerJob } from '@server/types/models/runners'
10import { RunnerJobVODAudioMergeTranscodingPrivatePayload, RunnerJobVODWebVideoTranscodingPrivatePayload } from '@shared/models'
11
12export async function onVODWebVideoOrAudioMergeTranscodingJob (options: {
13 video: MVideoFullLight
14 videoFilePath: string
15 privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload | RunnerJobVODAudioMergeTranscodingPrivatePayload
16}) {
17 const { video, videoFilePath, privatePayload } = options
18
19 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'web-video' })
20 videoFile.videoId = video.id
21
22 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
23 await move(videoFilePath, newVideoFilePath)
24
25 await onWebTorrentVideoFileTranscoding({
26 video,
27 videoFile,
28 videoOutputPath: newVideoFilePath
29 })
30
31 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
32}
33
34export async function loadTranscodingRunnerVideo (runnerJob: MRunnerJob, lTags: LoggerTagsFn) {
35 const videoUUID = runnerJob.privatePayload.videoUUID
36
37 const video = await VideoModel.loadFull(videoUUID)
38 if (!video) {
39 logger.info('Video %s does not exist anymore after transcoding runner job.', videoUUID, lTags(videoUUID))
40 return undefined
41 }
42
43 return video
44}
diff --git a/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
new file mode 100644
index 000000000..a7b33f87e
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-audio-merge-transcoding-job-handler.ts
@@ -0,0 +1,97 @@
1import { pick } from 'lodash'
2import { logger } from '@server/helpers/logger'
3import { VideoJobInfoModel } from '@server/models/video/video-job-info'
4import { MVideo } from '@server/types/models'
5import { MRunnerJob } from '@server/types/models/runners'
6import { buildUUID } from '@shared/extra-utils'
7import { getVideoStreamDuration } from '@shared/ffmpeg'
8import {
9 RunnerJobUpdatePayload,
10 RunnerJobVODAudioMergeTranscodingPayload,
11 RunnerJobVODWebVideoTranscodingPrivatePayload,
12 VODAudioMergeTranscodingSuccess
13} from '@shared/models'
14import { generateRunnerTranscodingVideoInputFileUrl, generateRunnerTranscodingVideoPreviewFileUrl } from '../runner-urls'
15import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
16import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
17
18type CreateOptions = {
19 video: MVideo
20 isNewVideo: boolean
21 resolution: number
22 fps: number
23 priority: number
24 dependsOnRunnerJob?: MRunnerJob
25}
26
27// eslint-disable-next-line max-len
28export class VODAudioMergeTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODAudioMergeTranscodingSuccess> {
29
30 async create (options: CreateOptions) {
31 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
32
33 const jobUUID = buildUUID()
34 const payload: RunnerJobVODAudioMergeTranscodingPayload = {
35 input: {
36 audioFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid),
37 previewFileUrl: generateRunnerTranscodingVideoPreviewFileUrl(jobUUID, video.uuid)
38 },
39 output: {
40 resolution,
41 fps
42 }
43 }
44
45 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
46 ...pick(options, [ 'isNewVideo' ]),
47
48 videoUUID: video.uuid
49 }
50
51 const job = await this.createRunnerJob({
52 type: 'vod-audio-merge-transcoding',
53 jobUUID,
54 payload,
55 privatePayload,
56 priority,
57 dependsOnRunnerJob
58 })
59
60 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
61
62 return job
63 }
64
65 // ---------------------------------------------------------------------------
66
67 async specificComplete (options: {
68 runnerJob: MRunnerJob
69 resultPayload: VODAudioMergeTranscodingSuccess
70 }) {
71 const { runnerJob, resultPayload } = options
72 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
73
74 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
75 if (!video) return
76
77 const videoFilePath = resultPayload.videoFile as string
78
79 // ffmpeg generated a new video file, so update the video duration
80 // See https://trac.ffmpeg.org/ticket/5456
81 video.duration = await getVideoStreamDuration(videoFilePath)
82 await video.save()
83
84 // We can remove the old audio file
85 const oldAudioFile = video.VideoFiles[0]
86 await video.removeWebTorrentFile(oldAudioFile)
87 await oldAudioFile.destroy()
88 video.VideoFiles = []
89
90 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
91
92 logger.info(
93 'Runner VOD audio merge transcoding job %s for %s ended.',
94 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
95 )
96 }
97}
diff --git a/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
new file mode 100644
index 000000000..02566b9d5
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-hls-transcoding-job-handler.ts
@@ -0,0 +1,114 @@
1import { move } from 'fs-extra'
2import { dirname, join } from 'path'
3import { logger } from '@server/helpers/logger'
4import { renameVideoFileInPlaylist } from '@server/lib/hls'
5import { getHlsResolutionPlaylistFilename } from '@server/lib/paths'
6import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding'
7import { onHLSVideoFileTranscoding } from '@server/lib/transcoding/hls-transcoding'
8import { buildNewFile, removeAllWebTorrentFiles } from '@server/lib/video-file'
9import { VideoJobInfoModel } from '@server/models/video/video-job-info'
10import { MVideo } from '@server/types/models'
11import { MRunnerJob } from '@server/types/models/runners'
12import { pick } from '@shared/core-utils'
13import { buildUUID } from '@shared/extra-utils'
14import {
15 RunnerJobUpdatePayload,
16 RunnerJobVODHLSTranscodingPayload,
17 RunnerJobVODHLSTranscodingPrivatePayload,
18 VODHLSTranscodingSuccess
19} from '@shared/models'
20import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
21import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
22import { loadTranscodingRunnerVideo } from './shared'
23
24type CreateOptions = {
25 video: MVideo
26 isNewVideo: boolean
27 deleteWebVideoFiles: boolean
28 resolution: number
29 fps: number
30 priority: number
31 dependsOnRunnerJob?: MRunnerJob
32}
33
34// eslint-disable-next-line max-len
35export class VODHLSTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODHLSTranscodingSuccess> {
36
37 async create (options: CreateOptions) {
38 const { video, resolution, fps, dependsOnRunnerJob, priority } = options
39
40 const jobUUID = buildUUID()
41
42 const payload: RunnerJobVODHLSTranscodingPayload = {
43 input: {
44 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
45 },
46 output: {
47 resolution,
48 fps
49 }
50 }
51
52 const privatePayload: RunnerJobVODHLSTranscodingPrivatePayload = {
53 ...pick(options, [ 'isNewVideo', 'deleteWebVideoFiles' ]),
54
55 videoUUID: video.uuid
56 }
57
58 const job = await this.createRunnerJob({
59 type: 'vod-hls-transcoding',
60 jobUUID,
61 payload,
62 privatePayload,
63 priority,
64 dependsOnRunnerJob
65 })
66
67 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
68
69 return job
70 }
71
72 // ---------------------------------------------------------------------------
73
74 async specificComplete (options: {
75 runnerJob: MRunnerJob
76 resultPayload: VODHLSTranscodingSuccess
77 }) {
78 const { runnerJob, resultPayload } = options
79 const privatePayload = runnerJob.privatePayload as RunnerJobVODHLSTranscodingPrivatePayload
80
81 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
82 if (!video) return
83
84 const videoFilePath = resultPayload.videoFile as string
85 const resolutionPlaylistFilePath = resultPayload.resolutionPlaylistFile as string
86
87 const videoFile = await buildNewFile({ path: videoFilePath, mode: 'hls' })
88 const newVideoFilePath = join(dirname(videoFilePath), videoFile.filename)
89 await move(videoFilePath, newVideoFilePath)
90
91 const resolutionPlaylistFilename = getHlsResolutionPlaylistFilename(videoFile.filename)
92 const newResolutionPlaylistFilePath = join(dirname(resolutionPlaylistFilePath), resolutionPlaylistFilename)
93 await move(resolutionPlaylistFilePath, newResolutionPlaylistFilePath)
94
95 await renameVideoFileInPlaylist(newResolutionPlaylistFilePath, videoFile.filename)
96
97 await onHLSVideoFileTranscoding({
98 video,
99 videoFile,
100 m3u8OutputPath: newResolutionPlaylistFilePath,
101 videoOutputPath: newVideoFilePath
102 })
103
104 await onTranscodingEnded({ isNewVideo: privatePayload.isNewVideo, moveVideoToNextState: true, video })
105
106 if (privatePayload.deleteWebVideoFiles === true) {
107 logger.info('Removing web video files of %s now we have a HLS version of it.', video.uuid, this.lTags(video.uuid))
108
109 await removeAllWebTorrentFiles(video)
110 }
111
112 logger.info('Runner VOD HLS job %s for %s ended.', runnerJob.uuid, video.uuid, this.lTags(runnerJob.uuid, video.uuid))
113 }
114}
diff --git a/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
new file mode 100644
index 000000000..57761a7a1
--- /dev/null
+++ b/server/lib/runners/job-handlers/vod-web-video-transcoding-job-handler.ts
@@ -0,0 +1,84 @@
1import { pick } from 'lodash'
2import { logger } from '@server/helpers/logger'
3import { VideoJobInfoModel } from '@server/models/video/video-job-info'
4import { MVideo } from '@server/types/models'
5import { MRunnerJob } from '@server/types/models/runners'
6import { buildUUID } from '@shared/extra-utils'
7import {
8 RunnerJobUpdatePayload,
9 RunnerJobVODWebVideoTranscodingPayload,
10 RunnerJobVODWebVideoTranscodingPrivatePayload,
11 VODWebVideoTranscodingSuccess
12} from '@shared/models'
13import { generateRunnerTranscodingVideoInputFileUrl } from '../runner-urls'
14import { AbstractVODTranscodingJobHandler } from './abstract-vod-transcoding-job-handler'
15import { loadTranscodingRunnerVideo, onVODWebVideoOrAudioMergeTranscodingJob } from './shared'
16
17type CreateOptions = {
18 video: MVideo
19 isNewVideo: boolean
20 resolution: number
21 fps: number
22 priority: number
23 dependsOnRunnerJob?: MRunnerJob
24}
25
26// eslint-disable-next-line max-len
27export class VODWebVideoTranscodingJobHandler extends AbstractVODTranscodingJobHandler<CreateOptions, RunnerJobUpdatePayload, VODWebVideoTranscodingSuccess> {
28
29 async create (options: CreateOptions) {
30 const { video, resolution, fps, priority, dependsOnRunnerJob } = options
31
32 const jobUUID = buildUUID()
33 const payload: RunnerJobVODWebVideoTranscodingPayload = {
34 input: {
35 videoFileUrl: generateRunnerTranscodingVideoInputFileUrl(jobUUID, video.uuid)
36 },
37 output: {
38 resolution,
39 fps
40 }
41 }
42
43 const privatePayload: RunnerJobVODWebVideoTranscodingPrivatePayload = {
44 ...pick(options, [ 'isNewVideo' ]),
45
46 videoUUID: video.uuid
47 }
48
49 const job = await this.createRunnerJob({
50 type: 'vod-web-video-transcoding',
51 jobUUID,
52 payload,
53 privatePayload,
54 dependsOnRunnerJob,
55 priority
56 })
57
58 await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingTranscode')
59
60 return job
61 }
62
63 // ---------------------------------------------------------------------------
64
65 async specificComplete (options: {
66 runnerJob: MRunnerJob
67 resultPayload: VODWebVideoTranscodingSuccess
68 }) {
69 const { runnerJob, resultPayload } = options
70 const privatePayload = runnerJob.privatePayload as RunnerJobVODWebVideoTranscodingPrivatePayload
71
72 const video = await loadTranscodingRunnerVideo(runnerJob, this.lTags)
73 if (!video) return
74
75 const videoFilePath = resultPayload.videoFile as string
76
77 await onVODWebVideoOrAudioMergeTranscodingJob({ video, videoFilePath, privatePayload })
78
79 logger.info(
80 'Runner VOD web video transcoding job %s for %s ended.',
81 runnerJob.uuid, video.uuid, this.lTags(video.uuid, runnerJob.uuid)
82 )
83 }
84}
diff --git a/server/lib/runners/runner-urls.ts b/server/lib/runners/runner-urls.ts
new file mode 100644
index 000000000..329fb1170
--- /dev/null
+++ b/server/lib/runners/runner-urls.ts
@@ -0,0 +1,9 @@
1import { WEBSERVER } from '@server/initializers/constants'
2
3export function generateRunnerTranscodingVideoInputFileUrl (jobUUID: string, videoUUID: string) {
4 return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/max-quality'
5}
6
7export function generateRunnerTranscodingVideoPreviewFileUrl (jobUUID: string, videoUUID: string) {
8 return WEBSERVER.URL + '/api/v1/runners/jobs/' + jobUUID + '/files/videos/' + videoUUID + '/previews/max-quality'
9}
diff --git a/server/lib/runners/runner.ts b/server/lib/runners/runner.ts
new file mode 100644
index 000000000..74c814ba1
--- /dev/null
+++ b/server/lib/runners/runner.ts
@@ -0,0 +1,36 @@
1import express from 'express'
2import { retryTransactionWrapper } from '@server/helpers/database-utils'
3import { logger, loggerTagsFactory } from '@server/helpers/logger'
4import { sequelizeTypescript } from '@server/initializers/database'
5import { MRunner } from '@server/types/models/runners'
6
7const lTags = loggerTagsFactory('runner')
8
9const updatingRunner = new Set<number>()
10
11function updateLastRunnerContact (req: express.Request, runner: MRunner) {
12 const now = new Date()
13
14 // Don't update last runner contact too often
15 if (now.getTime() - runner.lastContact.getTime() < 2000) return
16 if (updatingRunner.has(runner.id)) return
17
18 updatingRunner.add(runner.id)
19
20 runner.lastContact = now
21 runner.ip = req.ip
22
23 logger.debug('Updating last runner contact for %s', runner.name, lTags(runner.name))
24
25 retryTransactionWrapper(() => {
26 return sequelizeTypescript.transaction(async transaction => {
27 return runner.save({ transaction })
28 })
29 })
30 .catch(err => logger.error('Cannot update last runner contact for %s', runner.name, { err, ...lTags(runner.name) }))
31 .finally(() => updatingRunner.delete(runner.id))
32}
33
34export {
35 updateLastRunnerContact
36}