diff options
author | Chocobozzz <me@florianbigard.com> | 2023-05-19 09:23:20 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-05-19 09:23:20 +0200 |
commit | fe7019b2323768f7e33890303f95c9a45688ac1d (patch) | |
tree | 8ccd4aa71d36626d1bffb1e2a8537241e2002404 | |
parent | ef2e6aabf755feeec96011e70ff2522a491c5cb3 (diff) | |
download | PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.gz PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.tar.zst PeerTube-fe7019b2323768f7e33890303f95c9a45688ac1d.zip |
Fix peertube runner concurrency
-rw-r--r-- | packages/peertube-runner/server/server.ts | 28 | ||||
-rw-r--r-- | server/controllers/api/runners/jobs.ts | 2 | ||||
-rw-r--r-- | server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts | 4 | ||||
-rw-r--r-- | server/tests/api/server/open-telemetry.ts | 6 | ||||
-rw-r--r-- | server/tests/peertube-runner/vod-transcoding.ts | 2 | ||||
-rw-r--r-- | shared/models/server/server-error-code.enum.ts | 1 |
6 files changed, 35 insertions, 8 deletions
diff --git a/packages/peertube-runner/server/server.ts b/packages/peertube-runner/server/server.ts index 8eff4bd2f..cb1533dc6 100644 --- a/packages/peertube-runner/server/server.ts +++ b/packages/peertube-runner/server/server.ts | |||
@@ -170,18 +170,29 @@ export class RunnerServer { | |||
170 | private async checkAvailableJobs () { | 170 | private async checkAvailableJobs () { |
171 | if (this.checkingAvailableJobs) return | 171 | if (this.checkingAvailableJobs) return |
172 | 172 | ||
173 | logger.info('Checking available jobs') | ||
174 | |||
175 | this.checkingAvailableJobs = true | 173 | this.checkingAvailableJobs = true |
176 | 174 | ||
175 | let hadAvailableJob = false | ||
176 | |||
177 | for (const server of this.servers) { | 177 | for (const server of this.servers) { |
178 | try { | 178 | try { |
179 | logger.info('Checking available jobs on ' + server.url) | ||
180 | |||
179 | const job = await this.requestJob(server) | 181 | const job = await this.requestJob(server) |
180 | if (!job) continue | 182 | if (!job) continue |
181 | 183 | ||
184 | hadAvailableJob = true | ||
185 | |||
182 | await this.tryToExecuteJobAsync(server, job) | 186 | await this.tryToExecuteJobAsync(server, job) |
183 | } catch (err) { | 187 | } catch (err) { |
184 | if ((err.res?.body as PeerTubeProblemDocument)?.code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | 188 | const code = (err.res?.body as PeerTubeProblemDocument)?.code |
189 | |||
190 | if (code === ServerErrorCode.RUNNER_JOB_NOT_IN_PROCESSING_STATE) { | ||
191 | logger.debug({ err }, 'Runner job is not in processing state anymore, retry later') | ||
192 | return | ||
193 | } | ||
194 | |||
195 | if (code === ServerErrorCode.UNKNOWN_RUNNER_TOKEN) { | ||
185 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) | 196 | logger.error({ err }, `Unregistering ${server.url} as the runner token ${server.runnerToken} is invalid`) |
186 | 197 | ||
187 | await this.unregisterRunner({ url: server.url }) | 198 | await this.unregisterRunner({ url: server.url }) |
@@ -193,6 +204,11 @@ export class RunnerServer { | |||
193 | } | 204 | } |
194 | 205 | ||
195 | this.checkingAvailableJobs = false | 206 | this.checkingAvailableJobs = false |
207 | |||
208 | if (hadAvailableJob && this.canProcessMoreJobs()) { | ||
209 | this.checkAvailableJobs() | ||
210 | .catch(err => logger.error({ err }, 'Cannot check more available jobs')) | ||
211 | } | ||
196 | } | 212 | } |
197 | 213 | ||
198 | private async requestJob (server: PeerTubeServer) { | 214 | private async requestJob (server: PeerTubeServer) { |
@@ -211,7 +227,7 @@ export class RunnerServer { | |||
211 | } | 227 | } |
212 | 228 | ||
213 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { | 229 | private async tryToExecuteJobAsync (server: PeerTubeServer, jobToAccept: { uuid: string }) { |
214 | if (this.processingJobs.length >= ConfigManager.Instance.getConfig().jobs.concurrency) return | 230 | if (!this.canProcessMoreJobs()) return |
215 | 231 | ||
216 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) | 232 | const { job } = await server.runnerJobs.accept({ runnerToken: server.runnerToken, jobUUID: jobToAccept.uuid }) |
217 | 233 | ||
@@ -242,6 +258,10 @@ export class RunnerServer { | |||
242 | return ConfigManager.Instance.setRegisteredInstances(data) | 258 | return ConfigManager.Instance.setRegisteredInstances(data) |
243 | } | 259 | } |
244 | 260 | ||
261 | private canProcessMoreJobs () { | ||
262 | return this.processingJobs.length < ConfigManager.Instance.getConfig().jobs.concurrency | ||
263 | } | ||
264 | |||
245 | // --------------------------------------------------------------------------- | 265 | // --------------------------------------------------------------------------- |
246 | 266 | ||
247 | private async cleanupTMP () { | 267 | private async cleanupTMP () { |
diff --git a/server/controllers/api/runners/jobs.ts b/server/controllers/api/runners/jobs.ts index bdeb0c6cd..140f062be 100644 --- a/server/controllers/api/runners/jobs.ts +++ b/server/controllers/api/runners/jobs.ts | |||
@@ -42,6 +42,7 @@ import { | |||
42 | RunnerJobType, | 42 | RunnerJobType, |
43 | RunnerJobUpdateBody, | 43 | RunnerJobUpdateBody, |
44 | RunnerJobUpdatePayload, | 44 | RunnerJobUpdatePayload, |
45 | ServerErrorCode, | ||
45 | UserRight, | 46 | UserRight, |
46 | VideoStudioTranscodingSuccess, | 47 | VideoStudioTranscodingSuccess, |
47 | VODAudioMergeTranscodingSuccess, | 48 | VODAudioMergeTranscodingSuccess, |
@@ -168,6 +169,7 @@ async function acceptRunnerJob (req: express.Request, res: express.Response) { | |||
168 | 169 | ||
169 | if (runnerJob.state !== RunnerJobState.PENDING) { | 170 | if (runnerJob.state !== RunnerJobState.PENDING) { |
170 | res.fail({ | 171 | res.fail({ |
172 | type: ServerErrorCode.RUNNER_JOB_NOT_IN_PENDING_STATE, | ||
171 | message: 'This job is not in pending state anymore', | 173 | message: 'This job is not in pending state anymore', |
172 | status: HttpStatusCode.CONFLICT_409 | 174 | status: HttpStatusCode.CONFLICT_409 |
173 | }) | 175 | }) |
diff --git a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts index 3c2cf51b7..5bad34860 100644 --- a/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts +++ b/server/lib/live/shared/transcoding-wrapper/ffmpeg-transcoding-wrapper.ts | |||
@@ -68,6 +68,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { | |||
68 | abort () { | 68 | abort () { |
69 | if (this.ended || this.errored || this.aborted) return | 69 | if (this.ended || this.errored || this.aborted) return |
70 | 70 | ||
71 | logger.debug('Killing ffmpeg after live abort of ' + this.videoUUID, this.lTags()) | ||
72 | |||
71 | this.ffmpegCommand.kill('SIGINT') | 73 | this.ffmpegCommand.kill('SIGINT') |
72 | 74 | ||
73 | this.aborted = true | 75 | this.aborted = true |
@@ -95,6 +97,8 @@ export class FFmpegTranscodingWrapper extends AbstractTranscodingWrapper { | |||
95 | private onFFmpegEnded () { | 97 | private onFFmpegEnded () { |
96 | if (this.ended || this.errored || this.aborted) return | 98 | if (this.ended || this.errored || this.aborted) return |
97 | 99 | ||
100 | logger.debug('Live ffmpeg transcoding ended for ' + this.videoUUID, this.lTags()) | ||
101 | |||
98 | this.ended = true | 102 | this.ended = true |
99 | this.emit('end') | 103 | this.emit('end') |
100 | } | 104 | } |
diff --git a/server/tests/api/server/open-telemetry.ts b/server/tests/api/server/open-telemetry.ts index 49f3b520b..fd85fc514 100644 --- a/server/tests/api/server/open-telemetry.ts +++ b/server/tests/api/server/open-telemetry.ts | |||
@@ -31,7 +31,7 @@ describe('Open Telemetry', function () { | |||
31 | it('Should enable open telemetry metrics', async function () { | 31 | it('Should enable open telemetry metrics', async function () { |
32 | this.timeout(120000) | 32 | this.timeout(120000) |
33 | 33 | ||
34 | server = await createSingleServer(1, { | 34 | await server.run({ |
35 | open_telemetry: { | 35 | open_telemetry: { |
36 | metrics: { | 36 | metrics: { |
37 | enabled: true | 37 | enabled: true |
@@ -73,7 +73,7 @@ describe('Open Telemetry', function () { | |||
73 | it('Should disable http request duration metrics', async function () { | 73 | it('Should disable http request duration metrics', async function () { |
74 | await server.kill() | 74 | await server.kill() |
75 | 75 | ||
76 | server = await createSingleServer(1, { | 76 | await server.run({ |
77 | open_telemetry: { | 77 | open_telemetry: { |
78 | metrics: { | 78 | metrics: { |
79 | enabled: true, | 79 | enabled: true, |
@@ -114,7 +114,7 @@ describe('Open Telemetry', function () { | |||
114 | }) | 114 | }) |
115 | 115 | ||
116 | it('Should enable open telemetry metrics', async function () { | 116 | it('Should enable open telemetry metrics', async function () { |
117 | server = await createSingleServer(1, { | 117 | await server.run({ |
118 | open_telemetry: { | 118 | open_telemetry: { |
119 | tracing: { | 119 | tracing: { |
120 | enabled: true, | 120 | enabled: true, |
diff --git a/server/tests/peertube-runner/vod-transcoding.ts b/server/tests/peertube-runner/vod-transcoding.ts index 3c0918102..d7e2df095 100644 --- a/server/tests/peertube-runner/vod-transcoding.ts +++ b/server/tests/peertube-runner/vod-transcoding.ts | |||
@@ -189,7 +189,7 @@ describe('Test VOD transcoding in peertube-runner program', function () { | |||
189 | }) | 189 | }) |
190 | 190 | ||
191 | it('Should transcode videos on manual run', async function () { | 191 | it('Should transcode videos on manual run', async function () { |
192 | this.timeout(120000) | 192 | this.timeout(240000) |
193 | 193 | ||
194 | await servers[0].config.disableTranscoding() | 194 | await servers[0].config.disableTranscoding() |
195 | 195 | ||
diff --git a/shared/models/server/server-error-code.enum.ts b/shared/models/server/server-error-code.enum.ts index 24d3c6d21..2b093380c 100644 --- a/shared/models/server/server-error-code.enum.ts +++ b/shared/models/server/server-error-code.enum.ts | |||
@@ -48,6 +48,7 @@ export const enum ServerErrorCode { | |||
48 | ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected', | 48 | ACCOUNT_APPROVAL_REJECTED = 'account_approval_rejected', |
49 | 49 | ||
50 | RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state', | 50 | RUNNER_JOB_NOT_IN_PROCESSING_STATE = 'runner_job_not_in_processing_state', |
51 | RUNNER_JOB_NOT_IN_PENDING_STATE = 'runner_job_not_in_pending_state', | ||
51 | UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token' | 52 | UNKNOWN_RUNNER_TOKEN = 'unknown_runner_token' |
52 | } | 53 | } |
53 | 54 | ||