diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-09 11:35:07 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 11:39:31 +0200 |
commit | e2b2c726b1c1c31794d324c3afd7c24e1f953131 (patch) | |
tree | 344aa10d8659e5e701c7127d7390ff45244afb86 /server/lib | |
parent | cfb5edbd9e666b93d9680a34df59096d23d51bd5 (diff) | |
download | PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.gz PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.zst PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.zip |
Fix job queue tests
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 47 |
1 files changed, 31 insertions, 16 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 386d20103..14e3a00aa 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -284,18 +284,22 @@ class JobQueue { | |||
284 | } | 284 | } |
285 | 285 | ||
286 | async pause () { | 286 | async pause () { |
287 | for (const handler of Object.keys(this.workers)) { | 287 | for (const handlerName of Object.keys(this.workers)) { |
288 | const worker: Worker = this.workers[handler] | 288 | const worker: Worker = this.workers[handlerName] |
289 | const queue: Queue = this.queues[handlerName] | ||
289 | 290 | ||
290 | await worker.pause() | 291 | await worker.pause() |
292 | await queue.pause() | ||
291 | } | 293 | } |
292 | } | 294 | } |
293 | 295 | ||
294 | resume () { | 296 | async resume () { |
295 | for (const handler of Object.keys(this.workers)) { | 297 | for (const handlerName of Object.keys(this.workers)) { |
296 | const worker: Worker = this.workers[handler] | 298 | const worker: Worker = this.workers[handlerName] |
299 | const queue: Queue = this.queues[handlerName] | ||
297 | 300 | ||
298 | worker.resume() | 301 | worker.resume() |
302 | await queue.resume() | ||
299 | } | 303 | } |
300 | } | 304 | } |
301 | 305 | ||
@@ -373,10 +377,10 @@ class JobQueue { | |||
373 | }): Promise<Job[]> { | 377 | }): Promise<Job[]> { |
374 | const { state, start, count, asc, jobType } = options | 378 | const { state, start, count, asc, jobType } = options |
375 | 379 | ||
376 | const states = state ? [ state ] : jobStates | 380 | const states = this.buildStateFilter(state) |
377 | let results: Job[] = [] | 381 | const filteredJobTypes = this.buildTypeFilter(jobType) |
378 | 382 | ||
379 | const filteredJobTypes = this.filterJobTypes(jobType) | 383 | let results: Job[] = [] |
380 | 384 | ||
381 | for (const jobType of filteredJobTypes) { | 385 | for (const jobType of filteredJobTypes) { |
382 | const queue: Queue = this.queues[jobType] | 386 | const queue: Queue = this.queues[jobType] |
@@ -404,9 +408,9 @@ class JobQueue { | |||
404 | 408 | ||
405 | async count (state: JobState, jobType?: JobType): Promise<number> { | 409 | async count (state: JobState, jobType?: JobType): Promise<number> { |
406 | const states = state ? [ state ] : jobStates | 410 | const states = state ? [ state ] : jobStates |
407 | let total = 0 | 411 | const filteredJobTypes = this.buildTypeFilter(jobType) |
408 | 412 | ||
409 | const filteredJobTypes = this.filterJobTypes(jobType) | 413 | let total = 0 |
410 | 414 | ||
411 | for (const type of filteredJobTypes) { | 415 | for (const type of filteredJobTypes) { |
412 | const queue = this.queues[type] | 416 | const queue = this.queues[type] |
@@ -425,6 +429,23 @@ class JobQueue { | |||
425 | return total | 429 | return total |
426 | } | 430 | } |
427 | 431 | ||
432 | private buildStateFilter (state?: JobState) { | ||
433 | if (!state) return jobStates | ||
434 | |||
435 | const states = [ state ] | ||
436 | |||
437 | // Include parent if filtering on waiting | ||
438 | if (state === 'waiting') states.push('waiting-children') | ||
439 | |||
440 | return states | ||
441 | } | ||
442 | |||
443 | private buildTypeFilter (jobType?: JobType) { | ||
444 | if (!jobType) return jobTypes | ||
445 | |||
446 | return jobTypes.filter(t => t === jobType) | ||
447 | } | ||
448 | |||
428 | async getStats () { | 449 | async getStats () { |
429 | const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) | 450 | const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) |
430 | 451 | ||
@@ -452,12 +473,6 @@ class JobQueue { | |||
452 | } | 473 | } |
453 | } | 474 | } |
454 | 475 | ||
455 | private filterJobTypes (jobType?: JobType) { | ||
456 | if (!jobType) return jobTypes | ||
457 | |||
458 | return jobTypes.filter(t => t === jobType) | ||
459 | } | ||
460 | |||
461 | private getJobConcurrency (jobType: JobType) { | 476 | private getJobConcurrency (jobType: JobType) { |
462 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY | 477 | if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY |
463 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY | 478 | if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY |