diff options
author | Chocobozzz <me@florianbigard.com> | 2023-06-22 15:25:39 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-06-29 10:19:55 +0200 |
commit | d68b88bac463e202ebd9575ee1b2a8da693639b9 (patch) | |
tree | 449bc66f3c33e2b386f89d59fa713d61c1bd03b1 /packages/peertube-runner/server | |
parent | bc3918b2aed033e1c7617c0610e2e363c9e605db (diff) | |
download | PeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.tar.gz PeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.tar.zst PeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.zip |
Prevent stalled jobs
Diffstat (limited to 'packages/peertube-runner/server')
3 files changed, 125 insertions, 71 deletions
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts index dbeb9dfc1..a9b37bbc4 100644 --- a/packages/peertube-runner/server/process/shared/common.ts +++ b/packages/peertube-runner/server/process/shared/common.ts | |||
@@ -35,49 +35,48 @@ export async function downloadInputFile (options: { | |||
35 | return destination | 35 | return destination |
36 | } | 36 | } |
37 | 37 | ||
38 | export async function updateTranscodingProgress (options: { | 38 | export function scheduleTranscodingProgress (options: { |
39 | server: PeerTubeServer | 39 | server: PeerTubeServer |
40 | runnerToken: string | 40 | runnerToken: string |
41 | job: JobWithToken | 41 | job: JobWithToken |
42 | progress: number | 42 | progressGetter: () => number |
43 | }) { | 43 | }) { |
44 | const { server, job, runnerToken, progress } = options | 44 | const { job, server, progressGetter, runnerToken } = options |
45 | |||
46 | return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress }) | ||
47 | } | ||
48 | |||
49 | // --------------------------------------------------------------------------- | ||
50 | |||
51 | export function buildFFmpegVOD (options: { | ||
52 | server: PeerTubeServer | ||
53 | runnerToken: string | ||
54 | job: JobWithToken | ||
55 | }) { | ||
56 | const { server, job, runnerToken } = options | ||
57 | 45 | ||
58 | const updateInterval = ConfigManager.Instance.isTestInstance() | 46 | const updateInterval = ConfigManager.Instance.isTestInstance() |
59 | ? 500 | 47 | ? 500 |
60 | : 60000 | 48 | : 60000 |
61 | 49 | ||
62 | let progress: number | 50 | const update = () => { |
51 | server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() }) | ||
52 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
53 | } | ||
63 | 54 | ||
64 | const interval = setInterval(() => { | 55 | const interval = setInterval(() => { |
65 | updateTranscodingProgress({ server, job, runnerToken, progress }) | 56 | update() |
66 | .catch(err => logger.error({ err }, 'Cannot send job progress')) | ||
67 | }, updateInterval) | 57 | }, updateInterval) |
68 | 58 | ||
59 | update() | ||
60 | |||
61 | return interval | ||
62 | } | ||
63 | |||
64 | // --------------------------------------------------------------------------- | ||
65 | |||
66 | export function buildFFmpegVOD (options: { | ||
67 | onJobProgress: (progress: number) => void | ||
68 | }) { | ||
69 | const { onJobProgress } = options | ||
70 | |||
69 | return new FFmpegVOD({ | 71 | return new FFmpegVOD({ |
70 | ...getCommonFFmpegOptions(), | 72 | ...getCommonFFmpegOptions(), |
71 | 73 | ||
72 | onError: () => clearInterval(interval), | ||
73 | onEnd: () => clearInterval(interval), | ||
74 | |||
75 | updateJobProgress: arg => { | 74 | updateJobProgress: arg => { |
76 | if (arg < 0 || arg > 100) { | 75 | const progress = arg < 0 || arg > 100 |
77 | progress = undefined | 76 | ? undefined |
78 | } else { | 77 | : arg |
79 | progress = arg | 78 | |
80 | } | 79 | onJobProgress(progress) |
81 | } | 80 | } |
82 | }) | 81 | }) |
83 | } | 82 | } |
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts index ce014495e..afd9347fe 100644 --- a/packages/peertube-runner/server/process/shared/process-studio.ts +++ b/packages/peertube-runner/server/process/shared/process-studio.ts | |||
@@ -5,30 +5,42 @@ import { join } from 'path' | |||
5 | import { buildUUID } from '@shared/extra-utils' | 5 | import { buildUUID } from '@shared/extra-utils' |
6 | import { | 6 | import { |
7 | RunnerJobStudioTranscodingPayload, | 7 | RunnerJobStudioTranscodingPayload, |
8 | VideoStudioTranscodingSuccess, | ||
9 | VideoStudioTask, | 8 | VideoStudioTask, |
10 | VideoStudioTaskCutPayload, | 9 | VideoStudioTaskCutPayload, |
11 | VideoStudioTaskIntroPayload, | 10 | VideoStudioTaskIntroPayload, |
12 | VideoStudioTaskOutroPayload, | 11 | VideoStudioTaskOutroPayload, |
13 | VideoStudioTaskPayload, | 12 | VideoStudioTaskPayload, |
14 | VideoStudioTaskWatermarkPayload | 13 | VideoStudioTaskWatermarkPayload, |
14 | VideoStudioTranscodingSuccess | ||
15 | } from '@shared/models' | 15 | } from '@shared/models' |
16 | import { ConfigManager } from '../../../shared/config-manager' | 16 | import { ConfigManager } from '../../../shared/config-manager' |
17 | import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' | 17 | import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common' |
18 | 18 | ||
19 | export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { | 19 | export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { |
20 | const { server, job, runnerToken } = options | 20 | const { server, job, runnerToken } = options |
21 | const payload = job.payload | 21 | const payload = job.payload |
22 | 22 | ||
23 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) | 23 | let inputPath: string |
24 | |||
25 | let outputPath: string | 24 | let outputPath: string |
26 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | 25 | let tmpInputFilePath: string |
27 | let tmpInputFilePath = inputPath | 26 | |
27 | let tasksProgress = 0 | ||
28 | 28 | ||
29 | logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) | 29 | const updateProgressInterval = scheduleTranscodingProgress({ |
30 | job, | ||
31 | server, | ||
32 | runnerToken, | ||
33 | progressGetter: () => tasksProgress | ||
34 | }) | ||
30 | 35 | ||
31 | try { | 36 | try { |
37 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) | ||
38 | |||
39 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
40 | tmpInputFilePath = inputPath | ||
41 | |||
42 | logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) | ||
43 | |||
32 | for (const task of payload.tasks) { | 44 | for (const task of payload.tasks) { |
33 | const outputFilename = 'output-edition-' + buildUUID() + '.mp4' | 45 | const outputFilename = 'output-edition-' + buildUUID() + '.mp4' |
34 | outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) | 46 | outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) |
@@ -45,6 +57,8 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo | |||
45 | 57 | ||
46 | // For the next iteration | 58 | // For the next iteration |
47 | tmpInputFilePath = outputPath | 59 | tmpInputFilePath = outputPath |
60 | |||
61 | tasksProgress += Math.floor(100 / payload.tasks.length) | ||
48 | } | 62 | } |
49 | 63 | ||
50 | const successBody: VideoStudioTranscodingSuccess = { | 64 | const successBody: VideoStudioTranscodingSuccess = { |
@@ -58,8 +72,9 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo | |||
58 | payload: successBody | 72 | payload: successBody |
59 | }) | 73 | }) |
60 | } finally { | 74 | } finally { |
61 | await remove(tmpInputFilePath) | 75 | if (tmpInputFilePath) await remove(tmpInputFilePath) |
62 | await remove(outputPath) | 76 | if (outputPath) await remove(outputPath) |
77 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
63 | } | 78 | } |
64 | } | 79 | } |
65 | 80 | ||
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts index 7c1119b50..f7c076b27 100644 --- a/packages/peertube-runner/server/process/shared/process-vod.ts +++ b/packages/peertube-runner/server/process/shared/process-vod.ts | |||
@@ -11,23 +11,36 @@ import { | |||
11 | VODWebVideoTranscodingSuccess | 11 | VODWebVideoTranscodingSuccess |
12 | } from '@shared/models' | 12 | } from '@shared/models' |
13 | import { ConfigManager } from '../../../shared/config-manager' | 13 | import { ConfigManager } from '../../../shared/config-manager' |
14 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common' | 14 | import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common' |
15 | 15 | ||
16 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { | 16 | export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { |
17 | const { server, job, runnerToken } = options | 17 | const { server, job, runnerToken } = options |
18 | |||
18 | const payload = job.payload | 19 | const payload = job.payload |
19 | 20 | ||
20 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) | 21 | let ffmpegProgress: number |
22 | let inputPath: string | ||
23 | |||
24 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | ||
21 | 25 | ||
22 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | 26 | const updateProgressInterval = scheduleTranscodingProgress({ |
27 | job, | ||
28 | server, | ||
29 | runnerToken, | ||
30 | progressGetter: () => ffmpegProgress | ||
31 | }) | ||
23 | 32 | ||
24 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) | 33 | try { |
34 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) | ||
25 | 35 | ||
26 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | 36 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) |
27 | 37 | ||
28 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | 38 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) |
39 | |||
40 | const ffmpegVod = buildFFmpegVOD({ | ||
41 | onJobProgress: progress => { ffmpegProgress = progress } | ||
42 | }) | ||
29 | 43 | ||
30 | try { | ||
31 | await ffmpegVod.transcode({ | 44 | await ffmpegVod.transcode({ |
32 | type: 'video', | 45 | type: 'video', |
33 | 46 | ||
@@ -52,8 +65,9 @@ export async function processWebVideoTranscoding (options: ProcessOptions<Runner | |||
52 | payload: successBody | 65 | payload: successBody |
53 | }) | 66 | }) |
54 | } finally { | 67 | } finally { |
55 | await remove(inputPath) | 68 | if (inputPath) await remove(inputPath) |
56 | await remove(outputPath) | 69 | if (outputPath) await remove(outputPath) |
70 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
57 | } | 71 | } |
58 | } | 72 | } |
59 | 73 | ||
@@ -61,21 +75,32 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO | |||
61 | const { server, job, runnerToken } = options | 75 | const { server, job, runnerToken } = options |
62 | const payload = job.payload | 76 | const payload = job.payload |
63 | 77 | ||
64 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) | 78 | let ffmpegProgress: number |
65 | 79 | let inputPath: string | |
66 | const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
67 | |||
68 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`) | ||
69 | 80 | ||
70 | const uuid = buildUUID() | 81 | const uuid = buildUUID() |
71 | |||
72 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) | 82 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) |
73 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` | 83 | const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` |
74 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) | 84 | const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) |
75 | 85 | ||
76 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | 86 | const updateProgressInterval = scheduleTranscodingProgress({ |
87 | job, | ||
88 | server, | ||
89 | runnerToken, | ||
90 | progressGetter: () => ffmpegProgress | ||
91 | }) | ||
77 | 92 | ||
78 | try { | 93 | try { |
94 | logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) | ||
95 | |||
96 | inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) | ||
97 | |||
98 | logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`) | ||
99 | |||
100 | const ffmpegVod = buildFFmpegVOD({ | ||
101 | onJobProgress: progress => { ffmpegProgress = progress } | ||
102 | }) | ||
103 | |||
79 | await ffmpegVod.transcode({ | 104 | await ffmpegVod.transcode({ |
80 | type: 'hls', | 105 | type: 'hls', |
81 | copyCodecs: false, | 106 | copyCodecs: false, |
@@ -101,9 +126,10 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO | |||
101 | payload: successBody | 126 | payload: successBody |
102 | }) | 127 | }) |
103 | } finally { | 128 | } finally { |
104 | await remove(inputPath) | 129 | if (inputPath) await remove(inputPath) |
105 | await remove(outputPath) | 130 | if (outputPath) await remove(outputPath) |
106 | await remove(videoPath) | 131 | if (videoPath) await remove(videoPath) |
132 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
107 | } | 133 | } |
108 | } | 134 | } |
109 | 135 | ||
@@ -111,24 +137,37 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn | |||
111 | const { server, job, runnerToken } = options | 137 | const { server, job, runnerToken } = options |
112 | const payload = job.payload | 138 | const payload = job.payload |
113 | 139 | ||
114 | logger.info( | 140 | let ffmpegProgress: number |
115 | `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | 141 | let audioPath: string |
116 | `for audio merge transcoding job ${job.jobToken}` | 142 | let inputPath: string |
117 | ) | ||
118 | |||
119 | const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
120 | const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
121 | |||
122 | logger.info( | ||
123 | `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
124 | `for job ${job.jobToken}. Running audio merge transcoding.` | ||
125 | ) | ||
126 | 143 | ||
127 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) | 144 | const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) |
128 | 145 | ||
129 | const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) | 146 | const updateProgressInterval = scheduleTranscodingProgress({ |
147 | job, | ||
148 | server, | ||
149 | runnerToken, | ||
150 | progressGetter: () => ffmpegProgress | ||
151 | }) | ||
130 | 152 | ||
131 | try { | 153 | try { |
154 | logger.info( | ||
155 | `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
156 | `for audio merge transcoding job ${job.jobToken}` | ||
157 | ) | ||
158 | |||
159 | audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job }) | ||
160 | inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job }) | ||
161 | |||
162 | logger.info( | ||
163 | `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + | ||
164 | `for job ${job.jobToken}. Running audio merge transcoding.` | ||
165 | ) | ||
166 | |||
167 | const ffmpegVod = buildFFmpegVOD({ | ||
168 | onJobProgress: progress => { ffmpegProgress = progress } | ||
169 | }) | ||
170 | |||
132 | await ffmpegVod.transcode({ | 171 | await ffmpegVod.transcode({ |
133 | type: 'merge-audio', | 172 | type: 'merge-audio', |
134 | 173 | ||
@@ -154,8 +193,9 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn | |||
154 | payload: successBody | 193 | payload: successBody |
155 | }) | 194 | }) |
156 | } finally { | 195 | } finally { |
157 | await remove(audioPath) | 196 | if (audioPath) await remove(audioPath) |
158 | await remove(inputPath) | 197 | if (inputPath) await remove(inputPath) |
159 | await remove(outputPath) | 198 | if (outputPath) await remove(outputPath) |
199 | if (updateProgressInterval) clearInterval(updateProgressInterval) | ||
160 | } | 200 | } |
161 | } | 201 | } |