aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-02-08 10:51:10 +0100
committerChocobozzz <chocobozzz@cpy.re>2021-02-08 15:38:45 +0100
commit9129b7694d577322327ee79e9b9aa64deee92765 (patch)
treeeb23b7a952048c3725f29109d38c36368976dec0 /server/lib/job-queue/job-queue.ts
parent81b46cbc3417c46263c210c61b51a84a457abaaa (diff)
downloadPeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.gz
PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.tar.zst
PeerTube-9129b7694d577322327ee79e9b9aa64deee92765.zip
Allow to specify transcoding and import jobs concurrency
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts12
1 files changed, 10 insertions, 2 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 38b1d6f1f..72fed6072 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,5 +1,6 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 2import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config'
3import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
4import { 5import {
5 ActivitypubFollowPayload, 6 ActivitypubFollowPayload,
@@ -105,11 +106,11 @@ class JobQueue {
105 } 106 }
106 } 107 }
107 108
108 for (const handlerName of Object.keys(handlers)) { 109 for (const handlerName of (Object.keys(handlers) as JobType[])) {
109 const queue = new Bull(handlerName, queueOptions) 110 const queue = new Bull(handlerName, queueOptions)
110 const handler = handlers[handlerName] 111 const handler = handlers[handlerName]
111 112
112 queue.process(JOB_CONCURRENCY[handlerName], handler) 113 queue.process(this.getJobConcurrency(handlerName), handler)
113 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 114 .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
114 115
115 queue.on('failed', (job, err) => { 116 queue.on('failed', (job, err) => {
@@ -235,6 +236,13 @@ class JobQueue {
235 return jobTypes.filter(t => t === jobType) 236 return jobTypes.filter(t => t === jobType)
236 } 237 }
237 238
239 private getJobConcurrency (jobType: JobType) {
240 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
241 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
242
243 return JOB_CONCURRENCY[jobType]
244 }
245
238 static get Instance () { 246 static get Instance () {
239 return this.instance || (this.instance = new this()) 247 return this.instance || (this.instance = new this())
240 } 248 }