diff options
author | Chocobozzz <me@florianbigard.com> | 2021-02-08 10:51:10 +0100 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2021-02-08 15:38:45 +0100 |
commit | 9129b7694d577322327ee79e9b9aa64deee92765 (patch) | |
tree | eb23b7a952048c3725f29109d38c36368976dec0 /server/lib/job-queue/job-queue.ts | |
parent | 81b46cbc3417c46263c210c61b51a84a457abaaa (diff) | |
download | PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.gz PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.zst PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.zip |
Allow to specify transcoding and import jobs concurrency
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 12 |
1 files changed, 10 insertions, 2 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 38b1d6f1f..72fed6072 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,5 +1,6 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 2 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
3 | import { CONFIG } from '@server/initializers/config' | ||
3 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
4 | import { | 5 | import { |
5 | ActivitypubFollowPayload, | 6 | ActivitypubFollowPayload, |
@@ -105,11 +106,11 @@ class JobQueue { | |||
105 | } | 106 | } |
106 | } | 107 | } |
107 | 108 | ||
108 | for (const handlerName of Object.keys(handlers)) { | 109 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
109 | const queue = new Bull(handlerName, queueOptions) | 110 | const queue = new Bull(handlerName, queueOptions) |
110 | const handler = handlers[handlerName] | 111 | const handler = handlers[handlerName] |
111 | 112 | ||
112 | queue.process(JOB_CONCURRENCY[handlerName], handler) | 113 | queue.process(this.getJobConcurrency(handlerName), handler) |
113 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 114 | .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) |
114 | 115 | ||
115 | queue.on('failed', (job, err) => { | 116 | queue.on('failed', (job, err) => { |
@@ -235,6 +236,13 @@ class JobQueue { | |||
235 | return jobTypes.filter(t => t === jobType) | 236 | return jobTypes.filter(t => t === jobType) |
236 | } | 237 | } |
237 | 238 | ||
239 | private getJobConcurrency (jobType: JobType) { | ||
240 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY | ||
241 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY | ||
242 | |||
243 | return JOB_CONCURRENCY[jobType] | ||
244 | } | ||
245 | |||
238 | static get Instance () { | 246 | static get Instance () { |
239 | return this.instance || (this.instance = new this()) | 247 | return this.instance || (this.instance = new this()) |
240 | } | 248 | } |