aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-06-22 15:25:39 +0200
committerChocobozzz <me@florianbigard.com>2023-06-29 10:19:55 +0200
commitd68b88bac463e202ebd9575ee1b2a8da693639b9 (patch)
tree449bc66f3c33e2b386f89d59fa713d61c1bd03b1 /packages/peertube-runner/server
parentbc3918b2aed033e1c7617c0610e2e363c9e605db (diff)
downloadPeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.tar.gz
PeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.tar.zst
PeerTube-d68b88bac463e202ebd9575ee1b2a8da693639b9.zip
Prevent stalled jobs
Diffstat (limited to 'packages/peertube-runner/server')
-rw-r--r--packages/peertube-runner/server/process/shared/common.ts51
-rw-r--r--packages/peertube-runner/server/process/shared/process-studio.ts35
-rw-r--r--packages/peertube-runner/server/process/shared/process-vod.ts110
3 files changed, 125 insertions, 71 deletions
diff --git a/packages/peertube-runner/server/process/shared/common.ts b/packages/peertube-runner/server/process/shared/common.ts
index dbeb9dfc1..a9b37bbc4 100644
--- a/packages/peertube-runner/server/process/shared/common.ts
+++ b/packages/peertube-runner/server/process/shared/common.ts
@@ -35,49 +35,48 @@ export async function downloadInputFile (options: {
35 return destination 35 return destination
36} 36}
37 37
38export async function updateTranscodingProgress (options: { 38export function scheduleTranscodingProgress (options: {
39 server: PeerTubeServer 39 server: PeerTubeServer
40 runnerToken: string 40 runnerToken: string
41 job: JobWithToken 41 job: JobWithToken
42 progress: number 42 progressGetter: () => number
43}) { 43}) {
44 const { server, job, runnerToken, progress } = options 44 const { job, server, progressGetter, runnerToken } = options
45
46 return server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress })
47}
48
49// ---------------------------------------------------------------------------
50
51export function buildFFmpegVOD (options: {
52 server: PeerTubeServer
53 runnerToken: string
54 job: JobWithToken
55}) {
56 const { server, job, runnerToken } = options
57 45
58 const updateInterval = ConfigManager.Instance.isTestInstance() 46 const updateInterval = ConfigManager.Instance.isTestInstance()
59 ? 500 47 ? 500
60 : 60000 48 : 60000
61 49
62 let progress: number 50 const update = () => {
51 server.runnerJobs.update({ jobToken: job.jobToken, jobUUID: job.uuid, runnerToken, progress: progressGetter() })
52 .catch(err => logger.error({ err }, 'Cannot send job progress'))
53 }
63 54
64 const interval = setInterval(() => { 55 const interval = setInterval(() => {
65 updateTranscodingProgress({ server, job, runnerToken, progress }) 56 update()
66 .catch(err => logger.error({ err }, 'Cannot send job progress'))
67 }, updateInterval) 57 }, updateInterval)
68 58
59 update()
60
61 return interval
62}
63
64// ---------------------------------------------------------------------------
65
66export function buildFFmpegVOD (options: {
67 onJobProgress: (progress: number) => void
68}) {
69 const { onJobProgress } = options
70
69 return new FFmpegVOD({ 71 return new FFmpegVOD({
70 ...getCommonFFmpegOptions(), 72 ...getCommonFFmpegOptions(),
71 73
72 onError: () => clearInterval(interval),
73 onEnd: () => clearInterval(interval),
74
75 updateJobProgress: arg => { 74 updateJobProgress: arg => {
76 if (arg < 0 || arg > 100) { 75 const progress = arg < 0 || arg > 100
77 progress = undefined 76 ? undefined
78 } else { 77 : arg
79 progress = arg 78
80 } 79 onJobProgress(progress)
81 } 80 }
82 }) 81 })
83} 82}
diff --git a/packages/peertube-runner/server/process/shared/process-studio.ts b/packages/peertube-runner/server/process/shared/process-studio.ts
index ce014495e..afd9347fe 100644
--- a/packages/peertube-runner/server/process/shared/process-studio.ts
+++ b/packages/peertube-runner/server/process/shared/process-studio.ts
@@ -5,30 +5,42 @@ import { join } from 'path'
5import { buildUUID } from '@shared/extra-utils' 5import { buildUUID } from '@shared/extra-utils'
6import { 6import {
7 RunnerJobStudioTranscodingPayload, 7 RunnerJobStudioTranscodingPayload,
8 VideoStudioTranscodingSuccess,
9 VideoStudioTask, 8 VideoStudioTask,
10 VideoStudioTaskCutPayload, 9 VideoStudioTaskCutPayload,
11 VideoStudioTaskIntroPayload, 10 VideoStudioTaskIntroPayload,
12 VideoStudioTaskOutroPayload, 11 VideoStudioTaskOutroPayload,
13 VideoStudioTaskPayload, 12 VideoStudioTaskPayload,
14 VideoStudioTaskWatermarkPayload 13 VideoStudioTaskWatermarkPayload,
14 VideoStudioTranscodingSuccess
15} from '@shared/models' 15} from '@shared/models'
16import { ConfigManager } from '../../../shared/config-manager' 16import { ConfigManager } from '../../../shared/config-manager'
17import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions } from './common' 17import { buildFFmpegEdition, downloadInputFile, JobWithToken, ProcessOptions, scheduleTranscodingProgress } from './common'
18 18
19export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) { 19export async function processStudioTranscoding (options: ProcessOptions<RunnerJobStudioTranscodingPayload>) {
20 const { server, job, runnerToken } = options 20 const { server, job, runnerToken } = options
21 const payload = job.payload 21 const payload = job.payload
22 22
23 logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`) 23 let inputPath: string
24
25 let outputPath: string 24 let outputPath: string
26 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) 25 let tmpInputFilePath: string
27 let tmpInputFilePath = inputPath 26
27 let tasksProgress = 0
28 28
29 logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`) 29 const updateProgressInterval = scheduleTranscodingProgress({
30 job,
31 server,
32 runnerToken,
33 progressGetter: () => tasksProgress
34 })
30 35
31 try { 36 try {
37 logger.info(`Downloading input file ${payload.input.videoFileUrl} for job ${job.jobToken}`)
38
39 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
40 tmpInputFilePath = inputPath
41
42 logger.info(`Input file ${payload.input.videoFileUrl} downloaded for job ${job.jobToken}. Running studio transcoding tasks.`)
43
32 for (const task of payload.tasks) { 44 for (const task of payload.tasks) {
33 const outputFilename = 'output-edition-' + buildUUID() + '.mp4' 45 const outputFilename = 'output-edition-' + buildUUID() + '.mp4'
34 outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename) 46 outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), outputFilename)
@@ -45,6 +57,8 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
45 57
46 // For the next iteration 58 // For the next iteration
47 tmpInputFilePath = outputPath 59 tmpInputFilePath = outputPath
60
61 tasksProgress += Math.floor(100 / payload.tasks.length)
48 } 62 }
49 63
50 const successBody: VideoStudioTranscodingSuccess = { 64 const successBody: VideoStudioTranscodingSuccess = {
@@ -58,8 +72,9 @@ export async function processStudioTranscoding (options: ProcessOptions<RunnerJo
58 payload: successBody 72 payload: successBody
59 }) 73 })
60 } finally { 74 } finally {
61 await remove(tmpInputFilePath) 75 if (tmpInputFilePath) await remove(tmpInputFilePath)
62 await remove(outputPath) 76 if (outputPath) await remove(outputPath)
77 if (updateProgressInterval) clearInterval(updateProgressInterval)
63 } 78 }
64} 79}
65 80
diff --git a/packages/peertube-runner/server/process/shared/process-vod.ts b/packages/peertube-runner/server/process/shared/process-vod.ts
index 7c1119b50..f7c076b27 100644
--- a/packages/peertube-runner/server/process/shared/process-vod.ts
+++ b/packages/peertube-runner/server/process/shared/process-vod.ts
@@ -11,23 +11,36 @@ import {
11 VODWebVideoTranscodingSuccess 11 VODWebVideoTranscodingSuccess
12} from '@shared/models' 12} from '@shared/models'
13import { ConfigManager } from '../../../shared/config-manager' 13import { ConfigManager } from '../../../shared/config-manager'
14import { buildFFmpegVOD, downloadInputFile, ProcessOptions } from './common' 14import { buildFFmpegVOD, downloadInputFile, ProcessOptions, scheduleTranscodingProgress } from './common'
15 15
16export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) { 16export async function processWebVideoTranscoding (options: ProcessOptions<RunnerJobVODWebVideoTranscodingPayload>) {
17 const { server, job, runnerToken } = options 17 const { server, job, runnerToken } = options
18
18 const payload = job.payload 19 const payload = job.payload
19 20
20 logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`) 21 let ffmpegProgress: number
22 let inputPath: string
23
24 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
21 25
22 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job }) 26 const updateProgressInterval = scheduleTranscodingProgress({
27 job,
28 server,
29 runnerToken,
30 progressGetter: () => ffmpegProgress
31 })
23 32
24 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`) 33 try {
34 logger.info(`Downloading input file ${payload.input.videoFileUrl} for web video transcoding job ${job.jobToken}`)
25 35
26 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) 36 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
27 37
28 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) 38 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running web video transcoding.`)
39
40 const ffmpegVod = buildFFmpegVOD({
41 onJobProgress: progress => { ffmpegProgress = progress }
42 })
29 43
30 try {
31 await ffmpegVod.transcode({ 44 await ffmpegVod.transcode({
32 type: 'video', 45 type: 'video',
33 46
@@ -52,8 +65,9 @@ export async function processWebVideoTranscoding (options: ProcessOptions<Runner
52 payload: successBody 65 payload: successBody
53 }) 66 })
54 } finally { 67 } finally {
55 await remove(inputPath) 68 if (inputPath) await remove(inputPath)
56 await remove(outputPath) 69 if (outputPath) await remove(outputPath)
70 if (updateProgressInterval) clearInterval(updateProgressInterval)
57 } 71 }
58} 72}
59 73
@@ -61,21 +75,32 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
61 const { server, job, runnerToken } = options 75 const { server, job, runnerToken } = options
62 const payload = job.payload 76 const payload = job.payload
63 77
64 logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`) 78 let ffmpegProgress: number
65 79 let inputPath: string
66 const inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
67
68 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
69 80
70 const uuid = buildUUID() 81 const uuid = buildUUID()
71
72 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`) 82 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `${uuid}-${payload.output.resolution}.m3u8`)
73 const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4` 83 const videoFilename = `${uuid}-${payload.output.resolution}-fragmented.mp4`
74 const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename)) 84 const videoPath = join(join(ConfigManager.Instance.getTranscodingDirectory(), videoFilename))
75 85
76 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) 86 const updateProgressInterval = scheduleTranscodingProgress({
87 job,
88 server,
89 runnerToken,
90 progressGetter: () => ffmpegProgress
91 })
77 92
78 try { 93 try {
94 logger.info(`Downloading input file ${payload.input.videoFileUrl} for HLS transcoding job ${job.jobToken}`)
95
96 inputPath = await downloadInputFile({ url: payload.input.videoFileUrl, runnerToken, job })
97
98 logger.info(`Downloaded input file ${payload.input.videoFileUrl} for job ${job.jobToken}. Running HLS transcoding.`)
99
100 const ffmpegVod = buildFFmpegVOD({
101 onJobProgress: progress => { ffmpegProgress = progress }
102 })
103
79 await ffmpegVod.transcode({ 104 await ffmpegVod.transcode({
80 type: 'hls', 105 type: 'hls',
81 copyCodecs: false, 106 copyCodecs: false,
@@ -101,9 +126,10 @@ export async function processHLSTranscoding (options: ProcessOptions<RunnerJobVO
101 payload: successBody 126 payload: successBody
102 }) 127 })
103 } finally { 128 } finally {
104 await remove(inputPath) 129 if (inputPath) await remove(inputPath)
105 await remove(outputPath) 130 if (outputPath) await remove(outputPath)
106 await remove(videoPath) 131 if (videoPath) await remove(videoPath)
132 if (updateProgressInterval) clearInterval(updateProgressInterval)
107 } 133 }
108} 134}
109 135
@@ -111,24 +137,37 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
111 const { server, job, runnerToken } = options 137 const { server, job, runnerToken } = options
112 const payload = job.payload 138 const payload = job.payload
113 139
114 logger.info( 140 let ffmpegProgress: number
115 `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` + 141 let audioPath: string
116 `for audio merge transcoding job ${job.jobToken}` 142 let inputPath: string
117 )
118
119 const audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
120 const inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
121
122 logger.info(
123 `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
124 `for job ${job.jobToken}. Running audio merge transcoding.`
125 )
126 143
127 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`) 144 const outputPath = join(ConfigManager.Instance.getTranscodingDirectory(), `output-${buildUUID()}.mp4`)
128 145
129 const ffmpegVod = buildFFmpegVOD({ job, server, runnerToken }) 146 const updateProgressInterval = scheduleTranscodingProgress({
147 job,
148 server,
149 runnerToken,
150 progressGetter: () => ffmpegProgress
151 })
130 152
131 try { 153 try {
154 logger.info(
155 `Downloading input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
156 `for audio merge transcoding job ${job.jobToken}`
157 )
158
159 audioPath = await downloadInputFile({ url: payload.input.audioFileUrl, runnerToken, job })
160 inputPath = await downloadInputFile({ url: payload.input.previewFileUrl, runnerToken, job })
161
162 logger.info(
163 `Downloaded input files ${payload.input.audioFileUrl} and ${payload.input.previewFileUrl} ` +
164 `for job ${job.jobToken}. Running audio merge transcoding.`
165 )
166
167 const ffmpegVod = buildFFmpegVOD({
168 onJobProgress: progress => { ffmpegProgress = progress }
169 })
170
132 await ffmpegVod.transcode({ 171 await ffmpegVod.transcode({
133 type: 'merge-audio', 172 type: 'merge-audio',
134 173
@@ -154,8 +193,9 @@ export async function processAudioMergeTranscoding (options: ProcessOptions<Runn
154 payload: successBody 193 payload: successBody
155 }) 194 })
156 } finally { 195 } finally {
157 await remove(audioPath) 196 if (audioPath) await remove(audioPath)
158 await remove(inputPath) 197 if (inputPath) await remove(inputPath)
159 await remove(outputPath) 198 if (outputPath) await remove(outputPath)
199 if (updateProgressInterval) clearInterval(updateProgressInterval)
160 } 200 }
161} 201}