aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-05-19 09:23:20 +0200
committerChocobozzz <me@florianbigard.com>2023-05-19 09:23:20 +0200
commitfe7019b2323768f7e33890303f95c9a45688ac1d (patch)
tree8ccd4aa71d36626d1bffb1e2a8537241e2002404
parentef2e6aabf755feeec96011e70ff2522a491c5cb3 (diff)
downloadPeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.gz
PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.zst
PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.zip
Fix peertube runner concurrency
-rw-r--r--packages/peertube-runner/server/server.ts28
-rw-r--r--server/controllers/api/runners/jobs.ts2
-rw-r--r--server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts4
-rw-r--r--server/tests/api/server/open-telemetry.ts6
-rw-r--r--server/tests/peertube-runner/vod-transcoding.ts2
-rw-r--r--shared/models/server/server-error-code.enum.ts1
6 files changed, 35 insertions, 8 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts
index 8eff4bd2f..cb1533dc6 100644
--- a/packages/peertube-runner/server/server.ts
+++ b/packages/peertube-runner/server/server.ts
@@ -170,18 +170,29 @@ export class RunnerServer {
170 private async checkAvailableJobs () { 170 private async checkAvailableJobs () {
171 if (this.checkingAvailableJobs) return 171 if (this.checkingAvailableJobs) return
172 172
173 logger.info('Checking available jobs')
174
175 this.checkingAvailableJobs = true 173 this.checkingAvailableJobs = true
176 174
175 let hadAvailableJob = false
176
177 for (const server of this.servers) { 177 for (const server of this.servers) {
178 try { 178 try {
179 logger.info('Checking available jobs on ' + server.url)
180
179 const job = await this.requestJob(server) 181 const job = await this.requestJob(server)
180 if (!job) continue 182 if (!job) continue
181 183
184 hadAvailableJob = true
185
182 await this.tryToExecuteJobAsync(server, job) 186 await this.tryToExecuteJobAsync(server, job)
183 } catch (err) { 187 } catch (err) {
184 if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { 188 const code = (err.res?.body as PeerTubeProblemDocument)?.code
189
190 if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) {
191 logger.debug({ err }, 'Runner job is not in processing state anymore, retry later')
192 return
193 }
194
195 if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) {
185 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) 196 logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`)
186 197
187 await this.unregisterRunner({ url: server.url }) 198 await this.unregisterRunner({ url: server.url })
@@ -193,6 +204,11 @@ export class RunnerServer {
193 } 204 }
194 205
195 this.checkingAvailableJobs = false 206 this.checkingAvailableJobs = false
207
208 if (hadAvailableJob && this.canProcessMoreJobs()) {
209 this.checkAvailableJobs()
210 .catch(err => logger.error({ err }, 'Cannot check more available jobs'))
211 }
196 } 212 }
197 213
198 private async requestJob (server: PeerTubeServer) { 214 private async requestJob (server: PeerTubeServer) {
@@ -211,7 +227,7 @@ export class RunnerServer {
211 } 227 }
212 228
213 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { 229 private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) {
214 if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return 230 if (!this.canProcessMoreJobs()) return
215 231
216 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) 232 const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid })
217 233
@@ -242,6 +258,10 @@ export class RunnerServer {
242 return ConfigManager.Instance.setRegisteredInstances(data) 258 return ConfigManager.Instance.setRegisteredInstances(data)
243 } 259 }
244 260
261 private canProcessMoreJobs () {
262 return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency
263 }
264
245 // --------------------------------------------------------------------------- 265 // ---------------------------------------------------------------------------
246 266
247 private async cleanupTMP () { 267 private async cleanupTMP () {
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts
index bdeb0c6cd..140f062be 100644
--- a/server/controllers/api/runners/jobs.ts
+++ b/server/controllers/api/runners/jobs.ts
@@ -42,6 +42,7 @@ import {
42 RunnerJobType, 42 RunnerJobType,
43 RunnerJobUpdateBody, 43 RunnerJobUpdateBody,
44 RunnerJobUpdatePayload, 44 RunnerJobUpdatePayload,
45 ServerErrorCode,
45 UserRight, 46 UserRight,
46 VideoStudioTranscodingSuccess, 47 VideoStudioTranscodingSuccess,
47 VODAudioMergeTranscodingSuccess, 48 VODAudioMergeTranscodingSuccess,
@@ -168,6 +169,7 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) {
168 169
169 if (runnerJob.state !== RunnerJobState.PENDING) { 170 if (runnerJob.state !== RunnerJobState.PENDING) {
170 res.fail({ 171 res.fail({
172 type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE,
171 message: 'This job is not in pending state anymore', 173 message: 'This job is not in pending state anymore',
172 status: HttpStatusCode.CONFLICT_409 174 status: HttpStatusCode.CONFLICT_409
173 }) 175 })
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
index 3c2cf51b7..5bad34860 100644
--- a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
+++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts
@@ -68,6 +68,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
68 abort () { 68 abort () {
69 if (this.ended || this.errored || this.aborted) return 69 if (this.ended || this.errored || this.aborted) return
70 70
71 logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags())
72
71 this.ffmpegCommand.kill('SIGINT') 73 this.ffmpegCommand.kill('SIGINT')
72 74
73 this.aborted = true 75 this.aborted = true
@@ -95,6 +97,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper {
95 private onFFmpegEnded () { 97 private onFFmpegEnded () {
96 if (this.ended || this.errored || this.aborted) return 98 if (this.ended || this.errored || this.aborted) return
97 99
100 logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags())
101
98 this.ended = true 102 this.ended = true
99 this.emit('end') 103 this.emit('end')
100 } 104 }
diff --git a/server/tests/api/server/open-telemetry.ts b/server/tests/api/server/open-telemetry.ts
index 49f3b520b..fd85fc514 100644
--- a/server/tests/api/server/open-telemetry.ts
+++ b/server/tests/api/server/open-telemetry.ts
@@ -31,7 +31,7 @@ describe('Open Telemetry', function () {
31 it('Should enable open telemetry metrics', async function () { 31 it('Should enable open telemetry metrics', async function () {
32 this.timeout(120000) 32 this.timeout(120000)
33 33
34 server = await createSingleServer(1, { 34 await server.run({
35 open_telemetry: { 35 open_telemetry: {
36 metrics: { 36 metrics: {
37 enabled: true 37 enabled: true
@@ -73,7 +73,7 @@ describe('Open Telemetry', function () {
73 it('Should disable http request duration metrics', async function () { 73 it('Should disable http request duration metrics', async function () {
74 await server.kill() 74 await server.kill()
75 75
76 server = await createSingleServer(1, { 76 await server.run({
77 open_telemetry: { 77 open_telemetry: {
78 metrics: { 78 metrics: {
79 enabled: true, 79 enabled: true,
@@ -114,7 +114,7 @@ describe('Open Telemetry', function () {
114 }) 114 })
115 115
116 it('Should enable open telemetry metrics', async function () { 116 it('Should enable open telemetry metrics', async function () {
117 server = await createSingleServer(1, { 117 await server.run({
118 open_telemetry: { 118 open_telemetry: {
119 tracing: { 119 tracing: {
120 enabled: true, 120 enabled: true,
diff --git a/server/tests/peertube-runner/vod-transcoding.ts b/server/tests/peertube-runner/vod-transcoding.ts
index 3c0918102..d7e2df095 100644
--- a/server/tests/peertube-runner/vod-transcoding.ts
+++ b/server/tests/peertube-runner/vod-transcoding.ts
@@ -189,7 +189,7 @@ describe('Test VOD transcoding in peertube-runner program', function () {
189 }) 189 })
190 190
191 it('Should transcode videos on manual run', async function () { 191 it('Should transcode videos on manual run', async function () {
192 this.timeout(120000) 192 this.timeout(240000)
193 193
194 await servers[0].config.disableTranscoding() 194 await servers[0].config.disableTranscoding()
195 195
diff --git a/shared/models/server/server-error-code.enum.ts b/shared/models/server/server-error-code.enum.ts
index 24d3c6d21..2b093380c 100644
--- a/shared/models/server/server-error-code.enum.ts
+++ b/shared/models/server/server-error-code.enum.ts
@@ -48,6 +48,7 @@ export const enum ServerErrorCode {
48 ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected', 48 ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected',
49 49
50 RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state', 50 RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state',
51 RUNNER_JOB_NOT_IN_PENDING_STATE = 'runner_job_not_in_pending_state',
51 UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token' 52 UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token'
52} 53}
53 54