aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-09 11:35:07 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 11:39:31 +0200
commite2b2c726b1c1c31794d324c3afd7c24e1f953131 (patch)
tree344aa10d8659e5e701c7127d7390ff45244afb86
parentcfb5edbd9e666b93d9680a34df59096d23d51bd5 (diff)
downloadPeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.gz
PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.tar.zst
PeerTube-e2b2c726b1c1c31794d324c3afd7c24e1f953131.zip
Fix job queue tests
-rw-r--r--server/controllers/api/jobs.ts6
-rw-r--r--server/lib/job-queue/job-queue.ts47
-rw-r--r--server/tests/api/check-params/logs.ts2
-rw-r--r--server/tests/api/server/jobs.ts13
4 files changed, 45 insertions, 23 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index 6a53e3083..0a45c33ab 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -26,7 +26,7 @@ jobsRouter.post('/pause',
26jobsRouter.post('/resume', 26jobsRouter.post('/resume',
27 authenticate, 27 authenticate,
28 ensureUserHasRight(UserRight.MANAGE_JOBS), 28 ensureUserHasRight(UserRight.MANAGE_JOBS),
29 resumeJobQueue 29 asyncMiddleware(resumeJobQueue)
30) 30)
31 31
32jobsRouter.get('/:state?', 32jobsRouter.get('/:state?',
@@ -55,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) {
55 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 55 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
56} 56}
57 57
58function resumeJobQueue (req: express.Request, res: express.Response) { 58async function resumeJobQueue (req: express.Request, res: express.Response) {
59 JobQueue.Instance.resume() 59 await JobQueue.Instance.resume()
60 60
61 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 61 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
62} 62}
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 386d20103..14e3a00aa 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -284,18 +284,22 @@ class JobQueue {
284 } 284 }
285 285
286 async pause () { 286 async pause () {
287 for (const handler of Object.keys(this.workers)) { 287 for (const handlerName of Object.keys(this.workers)) {
288 const worker: Worker = this.workers[handler] 288 const worker: Worker = this.workers[handlerName]
289 const queue: Queue = this.queues[handlerName]
289 290
290 await worker.pause() 291 await worker.pause()
292 await queue.pause()
291 } 293 }
292 } 294 }
293 295
294 resume () { 296 async resume () {
295 for (const handler of Object.keys(this.workers)) { 297 for (const handlerName of Object.keys(this.workers)) {
296 const worker: Worker = this.workers[handler] 298 const worker: Worker = this.workers[handlerName]
299 const queue: Queue = this.queues[handlerName]
297 300
298 worker.resume() 301 worker.resume()
302 await queue.resume()
299 } 303 }
300 } 304 }
301 305
@@ -373,10 +377,10 @@ class JobQueue {
373 }): Promise<Job[]> { 377 }): Promise<Job[]> {
374 const { state, start, count, asc, jobType } = options 378 const { state, start, count, asc, jobType } = options
375 379
376 const states = state ? [ state ] : jobStates 380 const states = this.buildStateFilter(state)
377 let results: Job[] = [] 381 const filteredJobTypes = this.buildTypeFilter(jobType)
378 382
379 const filteredJobTypes = this.filterJobTypes(jobType) 383 let results: Job[] = []
380 384
381 for (const jobType of filteredJobTypes) { 385 for (const jobType of filteredJobTypes) {
382 const queue: Queue = this.queues[jobType] 386 const queue: Queue = this.queues[jobType]
@@ -404,9 +408,9 @@ class JobQueue {
404 408
405 async count (state: JobState, jobType?: JobType): Promise<number> { 409 async count (state: JobState, jobType?: JobType): Promise<number> {
406 const states = state ? [ state ] : jobStates 410 const states = state ? [ state ] : jobStates
407 let total = 0 411 const filteredJobTypes = this.buildTypeFilter(jobType)
408 412
409 const filteredJobTypes = this.filterJobTypes(jobType) 413 let total = 0
410 414
411 for (const type of filteredJobTypes) { 415 for (const type of filteredJobTypes) {
412 const queue = this.queues[type] 416 const queue = this.queues[type]
@@ -425,6 +429,23 @@ class JobQueue {
425 return total 429 return total
426 } 430 }
427 431
432 private buildStateFilter (state?: JobState) {
433 if (!state) return jobStates
434
435 const states = [ state ]
436
437 // Include parent if filtering on waiting
438 if (state === 'waiting') states.push('waiting-children')
439
440 return states
441 }
442
443 private buildTypeFilter (jobType?: JobType) {
444 if (!jobType) return jobTypes
445
446 return jobTypes.filter(t => t === jobType)
447 }
448
428 async getStats () { 449 async getStats () {
429 const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() })) 450 const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
430 451
@@ -452,12 +473,6 @@ class JobQueue {
452 } 473 }
453 } 474 }
454 475
455 private filterJobTypes (jobType?: JobType) {
456 if (!jobType) return jobTypes
457
458 return jobTypes.filter(t => t === jobType)
459 }
460
461 private getJobConcurrency (jobType: JobType) { 476 private getJobConcurrency (jobType: JobType) {
462 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY 477 if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
463 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY 478 if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
diff --git a/server/tests/api/check-params/logs.ts b/server/tests/api/check-params/logs.ts
index fa67408b7..f9a99796d 100644
--- a/server/tests/api/check-params/logs.ts
+++ b/server/tests/api/check-params/logs.ts
@@ -122,7 +122,7 @@ describe('Test logs API validators', function () {
122 }) 122 })
123 123
124 it('Should fail with an invalid stackTrace', async function () { 124 it('Should fail with an invalid stackTrace', async function () {
125 await server.logs.createLogClient({ payload: { ...base, stackTrace: 's'.repeat(10000) }, expectedStatus }) 125 await server.logs.createLogClient({ payload: { ...base, stackTrace: 's'.repeat(20000) }, expectedStatus })
126 }) 126 })
127 127
128 it('Should fail with an invalid userAgent', async function () { 128 it('Should fail with an invalid userAgent', async function () {
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 96ab2a576..843e148a3 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -60,7 +60,6 @@ describe('Test jobs', function () {
60 if (job.type === 'videos-views-stats') job = body.data[1] 60 if (job.type === 'videos-views-stats') job = body.data[1]
61 61
62 expect(job.state).to.equal('completed') 62 expect(job.state).to.equal('completed')
63 expect(job.type.startsWith('activitypub-')).to.be.true
64 expect(dateIsValid(job.createdAt as string)).to.be.true 63 expect(dateIsValid(job.createdAt as string)).to.be.true
65 expect(dateIsValid(job.processedOn as string)).to.be.true 64 expect(dateIsValid(job.processedOn as string)).to.be.true
66 expect(dateIsValid(job.finishedOn as string)).to.be.true 65 expect(dateIsValid(job.finishedOn as string)).to.be.true
@@ -103,8 +102,16 @@ describe('Test jobs', function () {
103 102
104 await wait(5000) 103 await wait(5000)
105 104
106 const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' }) 105 {
107 expect(body.data).to.have.lengthOf(4) 106 const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' })
107 // waiting includes waiting-children
108 expect(body.data).to.have.lengthOf(4)
109 }
110
111 {
112 const body = await servers[1].jobs.list({ state: 'waiting-children', jobType: 'video-transcoding' })
113 expect(body.data).to.have.lengthOf(1)
114 }
108 }) 115 })
109 116
110 it('Should resume the job queue', async function () { 117 it('Should resume the job queue', async function () {