aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-09 11:35:07 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 11:39:31 +0200
commite2b2c726b1c1c31794d324c3afd7c24e1f953131 (patch)
tree344aa10d8659e5e701c7127d7390ff45244afb86 /server/lib/job-queue/job-queue.ts
parentcfb5edbd9e666b93d9680a34df59096d23d51bd5 (diff)
downloadPeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.gz
PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.zst
PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.zip
Fix job queue tests
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts47
1 files changed, 31 insertions, 16 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 386d20103..14e3a00aa 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -284,18 +284,22 @@ class JobQueue {
284 } 284 }
285 285
286 async pause () { 286 async pause () {
287 for (const handler of Object.keys(this.workers)) { 287 for (const handlerName of Object.keys(this.workers)) {
288 const worker: Worker = this.workers[handler] 288 const worker: Worker = this.workers[handlerName]
289 const queue: Queue = this.queues[handlerName]
289 290
290 await worker.pause() 291 await worker.pause()
292 await queue.pause()
291 } 293 }
292 } 294 }
293 295
294 resume () { 296 async resume () {
295 for (const handler of Object.keys(this.workers)) { 297 for (const handlerName of Object.keys(this.workers)) {
296 const worker: Worker = this.workers[handler] 298 const worker: Worker = this.workers[handlerName]
299 const queue: Queue = this.queues[handlerName]
297 300
298 worker.resume() 301 worker.resume()
302 await queue.resume()
299 } 303 }
300 } 304 }
301 305
@@ -373,10 +377,10 @@ class JobQueue {
373 }): Promise<Job[]> { 377 }): Promise<Job[]> {
374 const { state, start, count, asc, jobType } = options 378 const { state, start, count, asc, jobType } = options
375 379
376 const states = state ? [ state ] : jobStates 380 const states = this.buildStateFilter(state)
377 let results: Job[] = [] 381 const filteredJobTypes = this.buildTypeFilter(jobType)
378 382
379 const filteredJobTypes = this.filterJobTypes(jobType) 383 let results: Job[] = []
380 384
381 for (const jobType of filteredJobTypes) { 385 for (const jobType of filteredJobTypes) {
382 const queue: Queue = this.queues[jobType] 386 const queue: Queue = this.queues[jobType]
@@ -404,9 +408,9 @@ class JobQueue {
404 408
405 async count (state: JobState, jobType?: JobType): Promise<number> { 409 async count (state: JobState, jobType?: JobType): Promise<number> {
406 const states = state ? [ state ] : jobStates 410 const states = state ? [ state ] : jobStates
407 let total = 0 411 const filteredJobTypes = this.buildTypeFilter(jobType)
408 412
409 const filteredJobTypes = this.filterJobTypes(jobType) 413 let total = 0
410 414
411 for (const type of filteredJobTypes) { 415 for (const type of filteredJobTypes) {
412 const queue = this.queues[type] 416 const queue = this.queues[type]
@@ -425,6 +429,23 @@ class JobQueue {
425 return total 429 return total
426 } 430 }
427 431
432 private buildStateFilter (state?: JobState) {
433 if (!state) return jobStates
434
435 const states = [ state ]
436
437 // Include parent if filtering on waiting
438 if (state === 'waiting') states.push('waiting-children')
439
440 return states
441 }
442
443 private buildTypeFilter (jobType?: JobType) {
444 if (!jobType) return jobTypes
445
446 return jobTypes.filter(t => t === jobType)
447 }
448
428 async getStats () { 449 async getStats () {
429 const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) 450 const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
430 451
@@ -452,12 +473,6 @@ class JobQueue {
452 } 473 }
453 } 474 }
454 475
455 private filterJobTypes (jobType?: JobType) {
456 if (!jobType) return jobTypes
457
458 return jobTypes.filter(t => t === jobType)
459 }
460
461 private getJobConcurrency (jobType: JobType) { 476 private getJobConcurrency (jobType: JobType) {
462 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY 477 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
463 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY 478 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY