aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-12-14 12:00:35 +0100
committerChocobozzz <me@florianbigard.com>2020-12-14 12:00:35 +0100
commit402145b8630d1908c35f8c22846ddc4475f25d3e (patch)
treeff50cb09c5f56cc408ef20a7c959ef7a0642b76b
parentc04816108e8ec62eb29caf82806f3927dc5eb85a (diff)
downloadPeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.gz
PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.zst
PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.zip
Refactor jobs state
-rw-r--r--server/controllers/api/jobs.ts40
-rw-r--r--server/helpers/custom-validators/jobs.ts2
-rw-r--r--server/lib/job-queue/job-queue.ts27
-rw-r--r--server/middlewares/validators/jobs.ts20
-rw-r--r--server/tests/api/server/jobs.ts13
-rw-r--r--shared/extra-utils/server/jobs.ts20
-rw-r--r--shared/models/server/job.model.ts2
7 files changed, 65 insertions, 59 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index 1131a44d6..e14ea2575 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -1,7 +1,8 @@
1import * as express from 'express' 1import * as express from 'express'
2import { ResultList } from '../../../shared' 2import { ResultList } from '../../../shared'
3import { Job, JobType, JobState } from '../../../shared/models' 3import { Job, JobState, JobType } from '../../../shared/models'
4import { UserRight } from '../../../shared/models/users' 4import { UserRight } from '../../../shared/models/users'
5import { isArray } from '../../helpers/custom-validators/misc'
5import { JobQueue } from '../../lib/job-queue' 6import { JobQueue } from '../../lib/job-queue'
6import { 7import {
7 asyncMiddleware, 8 asyncMiddleware,
@@ -12,13 +13,11 @@ import {
12 setDefaultSort 13 setDefaultSort
13} from '../../middlewares' 14} from '../../middlewares'
14import { paginationValidator } from '../../middlewares/validators' 15import { paginationValidator } from '../../middlewares/validators'
15import { listJobsStateValidator, listJobsValidator } from '../../middlewares/validators/jobs' 16import { listJobsValidator } from '../../middlewares/validators/jobs'
16import { isArray } from '../../helpers/custom-validators/misc'
17import { jobStates } from '@server/helpers/custom-validators/jobs'
18 17
19const jobsRouter = express.Router() 18const jobsRouter = express.Router()
20 19
21jobsRouter.get('/', 20jobsRouter.get('/:state?',
22 authenticate, 21 authenticate,
23 ensureUserHasRight(UserRight.MANAGE_JOBS), 22 ensureUserHasRight(UserRight.MANAGE_JOBS),
24 paginationValidator, 23 paginationValidator,
@@ -29,18 +28,6 @@ jobsRouter.get('/',
29 asyncMiddleware(listJobs) 28 asyncMiddleware(listJobs)
30) 29)
31 30
32jobsRouter.get('/:state',
33 authenticate,
34 ensureUserHasRight(UserRight.MANAGE_JOBS),
35 paginationValidator,
36 jobsSortValidator,
37 setDefaultSort,
38 setDefaultPagination,
39 listJobsValidator,
40 listJobsStateValidator,
41 asyncMiddleware(listJobs)
42)
43
44// --------------------------------------------------------------------------- 31// ---------------------------------------------------------------------------
45 32
46export { 33export {
@@ -50,7 +37,7 @@ export {
50// --------------------------------------------------------------------------- 37// ---------------------------------------------------------------------------
51 38
52async function listJobs (req: express.Request, res: express.Response) { 39async function listJobs (req: express.Request, res: express.Response) {
53 const state = req.params.state as JobState || jobStates 40 const state = req.params.state as JobState
54 const asc = req.query.sort === 'createdAt' 41 const asc = req.query.sort === 'createdAt'
55 const jobType = req.query.jobType 42 const jobType = req.query.jobType
56 43
@@ -65,17 +52,22 @@ async function listJobs (req: express.Request, res: express.Response) {
65 52
66 const result: ResultList<Job> = { 53 const result: ResultList<Job> = {
67 total, 54 total,
68 data: Array.isArray(state) 55 data: state
69 ? await Promise.all( 56 ? jobs.map(j => formatJob(j, state))
70 jobs.map(async j => formatJob(j, await j.getState() as JobState)) 57 : await Promise.all(jobs.map(j => formatJobWithUnknownState(j)))
71 )
72 : jobs.map(j => formatJob(j, state))
73 } 58 }
59
74 return res.json(result) 60 return res.json(result)
75} 61}
76 62
63async function formatJobWithUnknownState (job: any) {
64 return formatJob(job, await job.getState())
65}
66
77function formatJob (job: any, state: JobState): Job { 67function formatJob (job: any, state: JobState): Job {
78 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null 68 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
69 ? job.stacktrace[0]
70 : null
79 71
80 return { 72 return {
81 id: job.id, 73 id: job.id,
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts
index 72dc73ee4..f6777ecd5 100644
--- a/server/helpers/custom-validators/jobs.ts
+++ b/server/helpers/custom-validators/jobs.ts
@@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models'
2import { exists } from './misc' 2import { exists } from './misc'
3import { jobTypes } from '@server/lib/job-queue/job-queue' 3import { jobTypes } from '@server/lib/job-queue/job-queue'
4 4
5const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] 5const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ]
6 6
7function isValidJobState (value: JobState) { 7function isValidJobState (value: JobState) {
8 return exists(value) && jobStates.includes(value) 8 return exists(value) && jobStates.includes(value)
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 49f06584d..5d0b797b0 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
2import { 4import {
3 ActivitypubFollowPayload, 5 ActivitypubFollowPayload,
4 ActivitypubHttpBroadcastPayload, 6 ActivitypubHttpBroadcastPayload,
@@ -15,20 +17,19 @@ import {
15 VideoTranscodingPayload 17 VideoTranscodingPayload
16} from '../../../shared/models' 18} from '../../../shared/models'
17import { logger } from '../../helpers/logger' 19import { logger } from '../../helpers/logger'
18import { Redis } from '../redis'
19import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 20import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
21import { Redis } from '../redis'
22import { processActivityPubFollow } from './handlers/activitypub-follow'
20import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 23import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
21import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 24import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
22import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 25import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
23import { processEmail } from './handlers/email'
24import { processVideoTranscoding } from './handlers/video-transcoding'
25import { processActivityPubFollow } from './handlers/activitypub-follow'
26import { processVideoImport } from './handlers/video-import'
27import { processVideosViews } from './handlers/video-views'
28import { refreshAPObject } from './handlers/activitypub-refresher' 26import { refreshAPObject } from './handlers/activitypub-refresher'
27import { processEmail } from './handlers/email'
29import { processVideoFileImport } from './handlers/video-file-import' 28import { processVideoFileImport } from './handlers/video-file-import'
30import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 29import { processVideoImport } from './handlers/video-import'
31import { processVideoLiveEnding } from './handlers/video-live-ending' 30import { processVideoLiveEnding } from './handlers/video-live-ending'
31import { processVideoTranscoding } from './handlers/video-transcoding'
32import { processVideosViews } from './handlers/video-views'
32 33
33type CreateJobArgument = 34type CreateJobArgument =
34 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 35 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -154,13 +155,15 @@ class JobQueue {
154 } 155 }
155 156
156 async listForApi (options: { 157 async listForApi (options: {
157 state: JobState | JobState[] 158 state?: JobState
158 start: number 159 start: number
159 count: number 160 count: number
160 asc?: boolean 161 asc?: boolean
161 jobType: JobType 162 jobType: JobType
162 }): Promise<Bull.Job[]> { 163 }): Promise<Bull.Job[]> {
163 const { state = Array.isArray(options.state) ? options.state : [ options.state ], start, count, asc, jobType } = options 164 const { state, start, count, asc, jobType } = options
165
166 const states = state ? [ state ] : jobStates
164 let results: Bull.Job[] = [] 167 let results: Bull.Job[] = []
165 168
166 const filteredJobTypes = this.filterJobTypes(jobType) 169 const filteredJobTypes = this.filterJobTypes(jobType)
@@ -172,7 +175,7 @@ class JobQueue {
172 continue 175 continue
173 } 176 }
174 177
175 const jobs = await queue.getJobs(state as Bull.JobStatus[], 0, start + count, asc) 178 const jobs = await queue.getJobs(states, 0, start + count, asc)
176 results = results.concat(jobs) 179 results = results.concat(jobs)
177 } 180 }
178 181
@@ -188,8 +191,8 @@ class JobQueue {
188 return results.slice(start, start + count) 191 return results.slice(start, start + count)
189 } 192 }
190 193
191 async count (state: JobState | JobState[], jobType?: JobType): Promise<number> { 194 async count (state: JobState, jobType?: JobType): Promise<number> {
192 const states = Array.isArray(state) ? state : [ state ] 195 const states = state ? [ state ] : jobStates
193 let total = 0 196 let total = 0
194 197
195 const filteredJobTypes = this.filterJobTypes(jobType) 198 const filteredJobTypes = this.filterJobTypes(jobType)
diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts
index 0fc183c1a..99ef25e0a 100644
--- a/server/middlewares/validators/jobs.ts
+++ b/server/middlewares/validators/jobs.ts
@@ -5,6 +5,10 @@ import { logger } from '../../helpers/logger'
5import { areValidationErrors } from './utils' 5import { areValidationErrors } from './utils'
6 6
7const listJobsValidator = [ 7const listJobsValidator = [
8 param('state')
9 .optional()
10 .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
11
8 query('jobType') 12 query('jobType')
9 .optional() 13 .optional()
10 .custom(isValidJobType).withMessage('Should have a valid job state'), 14 .custom(isValidJobType).withMessage('Should have a valid job state'),
@@ -18,22 +22,8 @@ const listJobsValidator = [
18 } 22 }
19] 23]
20 24
21const listJobsStateValidator = [
22 param('state')
23 .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'),
24
25 (req: express.Request, res: express.Response, next: express.NextFunction) => {
26 logger.debug('Checking listJobsValidator parameters.', { parameters: req.params })
27
28 if (areValidationErrors(req, res)) return
29
30 return next()
31 }
32]
33
34// --------------------------------------------------------------------------- 25// ---------------------------------------------------------------------------
35 26
36export { 27export {
37 listJobsValidator, 28 listJobsValidator
38 listJobsStateValidator
39} 29}
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts
index 19c8836b5..d0e222997 100644
--- a/server/tests/api/server/jobs.ts
+++ b/server/tests/api/server/jobs.ts
@@ -83,6 +83,19 @@ describe('Test jobs', function () {
83 } 83 }
84 }) 84 })
85 85
86 it('Should list all jobs', async function () {
87 const res = await getJobsList(servers[1].url, servers[1].accessToken)
88
89 const jobs = res.body.data as Job[]
90
91 expect(res.body.total).to.be.above(2)
92 expect(jobs).to.have.length.above(2)
93
94 // We know there are a least 1 delayed job (video views) and 1 completed job (broadcast)
95 expect(jobs.find(j => j.state === 'delayed')).to.not.be.undefined
96 expect(jobs.find(j => j.state === 'completed')).to.not.be.undefined
97 })
98
86 after(async function () { 99 after(async function () {
87 await cleanupTests(servers) 100 await cleanupTests(servers)
88 }) 101 })
diff --git a/shared/extra-utils/server/jobs.ts b/shared/extra-utils/server/jobs.ts
index cac00e9ab..a53749589 100644
--- a/shared/extra-utils/server/jobs.ts
+++ b/shared/extra-utils/server/jobs.ts
@@ -1,12 +1,20 @@
1import * as request from 'supertest' 1import * as request from 'supertest'
2import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
3import { makeGetRequest } from '../../../shared/extra-utils'
2import { Job, JobState, JobType } from '../../models' 4import { Job, JobState, JobType } from '../../models'
3import { wait } from '../miscs/miscs' 5import { wait } from '../miscs/miscs'
4import { ServerInfo } from './servers' 6import { ServerInfo } from './servers'
5import { makeGetRequest } from '../../../shared/extra-utils'
6import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
7 7
8function getJobsList (url: string, accessToken: string, state: JobState) { 8function buildJobsUrl (state?: JobState) {
9 const path = '/api/v1/jobs/' + state 9 let path = '/api/v1/jobs'
10
11 if (state) path += '/' + state
12
13 return path
14}
15
16function getJobsList (url: string, accessToken: string, state?: JobState) {
17 const path = buildJobsUrl(state)
10 18
11 return request(url) 19 return request(url)
12 .get(path) 20 .get(path)
@@ -19,14 +27,14 @@ function getJobsList (url: string, accessToken: string, state: JobState) {
19function getJobsListPaginationAndSort (options: { 27function getJobsListPaginationAndSort (options: {
20 url: string 28 url: string
21 accessToken: string 29 accessToken: string
22 state: JobState
23 start: number 30 start: number
24 count: number 31 count: number
25 sort: string 32 sort: string
33 state?: JobState
26 jobType?: JobType 34 jobType?: JobType
27}) { 35}) {
28 const { url, accessToken, state, start, count, sort, jobType } = options 36 const { url, accessToken, state, start, count, sort, jobType } = options
29 const path = '/api/v1/jobs/' + state 37 const path = buildJobsUrl(state)
30 38
31 const query = { 39 const query = {
32 start, 40 start,
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts
index 346b25607..b0ed860a7 100644
--- a/shared/models/server/job.model.ts
+++ b/shared/models/server/job.model.ts
@@ -2,7 +2,7 @@ import { ContextType } from '../activitypub/context'
2import { VideoResolution } from '../videos/video-resolution.enum' 2import { VideoResolution } from '../videos/video-resolution.enum'
3import { SendEmailOptions } from './emailer.model' 3import { SendEmailOptions } from './emailer.model'
4 4
5export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' 5export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused'
6 6
7export type JobType = 7export type JobType =
8 | 'activitypub-http-unicast' 8 | 'activitypub-http-unicast'