diff options
author | Chocobozzz <me@florianbigard.com> | 2020-12-14 12:00:35 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2020-12-14 12:00:35 +0100 |
commit | 402145b8630d1908c35f8c22846ddc4475f25d3e (patch) | |
tree | ff50cb09c5f56cc408ef20a7c959ef7a0642b76b | |
parent | c04816108e8ec62eb29caf82806f3927dc5eb85a (diff) | |
download | PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.gz PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.zst PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.zip |
Refactor jobs state
-rw-r--r-- | server/controllers/api/jobs.ts | 40 | ||||
-rw-r--r-- | server/helpers/custom-validators/jobs.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 27 | ||||
-rw-r--r-- | server/middlewares/validators/jobs.ts | 20 | ||||
-rw-r--r-- | server/tests/api/server/jobs.ts | 13 | ||||
-rw-r--r-- | shared/extra-utils/server/jobs.ts | 20 | ||||
-rw-r--r-- | shared/models/server/job.model.ts | 2 |
7 files changed, 65 insertions, 59 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index 1131a44d6..e14ea2575 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -1,7 +1,8 @@ | |||
1 | import * as express from 'express' | 1 | import * as express from 'express' |
2 | import { ResultList } from '../../../shared' | 2 | import { ResultList } from '../../../shared' |
3 | import { Job, JobType, JobState } from '../../../shared/models' | 3 | import { Job, JobState, JobType } from '../../../shared/models' |
4 | import { UserRight } from '../../../shared/models/users' | 4 | import { UserRight } from '../../../shared/models/users' |
5 | import { isArray } from '../../helpers/custom-validators/misc' | ||
5 | import { JobQueue } from '../../lib/job-queue' | 6 | import { JobQueue } from '../../lib/job-queue' |
6 | import { | 7 | import { |
7 | asyncMiddleware, | 8 | asyncMiddleware, |
@@ -12,13 +13,11 @@ import { | |||
12 | setDefaultSort | 13 | setDefaultSort |
13 | } from '../../middlewares' | 14 | } from '../../middlewares' |
14 | import { paginationValidator } from '../../middlewares/validators' | 15 | import { paginationValidator } from '../../middlewares/validators' |
15 | import { listJobsStateValidator, listJobsValidator } from '../../middlewares/validators/jobs' | 16 | import { listJobsValidator } from '../../middlewares/validators/jobs' |
16 | import { isArray } from '../../helpers/custom-validators/misc' | ||
17 | import { jobStates } from '@server/helpers/custom-validators/jobs' | ||
18 | 17 | ||
19 | const jobsRouter = express.Router() | 18 | const jobsRouter = express.Router() |
20 | 19 | ||
21 | jobsRouter.get('/', | 20 | jobsRouter.get('/:state?', |
22 | authenticate, | 21 | authenticate, |
23 | ensureUserHasRight(UserRight.MANAGE_JOBS), | 22 | ensureUserHasRight(UserRight.MANAGE_JOBS), |
24 | paginationValidator, | 23 | paginationValidator, |
@@ -29,18 +28,6 @@ jobsRouter.get('/', | |||
29 | asyncMiddleware(listJobs) | 28 | asyncMiddleware(listJobs) |
30 | ) | 29 | ) |
31 | 30 | ||
32 | jobsRouter.get('/:state', | ||
33 | authenticate, | ||
34 | ensureUserHasRight(UserRight.MANAGE_JOBS), | ||
35 | paginationValidator, | ||
36 | jobsSortValidator, | ||
37 | setDefaultSort, | ||
38 | setDefaultPagination, | ||
39 | listJobsValidator, | ||
40 | listJobsStateValidator, | ||
41 | asyncMiddleware(listJobs) | ||
42 | ) | ||
43 | |||
44 | // --------------------------------------------------------------------------- | 31 | // --------------------------------------------------------------------------- |
45 | 32 | ||
46 | export { | 33 | export { |
@@ -50,7 +37,7 @@ export { | |||
50 | // --------------------------------------------------------------------------- | 37 | // --------------------------------------------------------------------------- |
51 | 38 | ||
52 | async function listJobs (req: express.Request, res: express.Response) { | 39 | async function listJobs (req: express.Request, res: express.Response) { |
53 | const state = req.params.state as JobState || jobStates | 40 | const state = req.params.state as JobState |
54 | const asc = req.query.sort === 'createdAt' | 41 | const asc = req.query.sort === 'createdAt' |
55 | const jobType = req.query.jobType | 42 | const jobType = req.query.jobType |
56 | 43 | ||
@@ -65,17 +52,22 @@ async function listJobs (req: express.Request, res: express.Response) { | |||
65 | 52 | ||
66 | const result: ResultList<Job> = { | 53 | const result: ResultList<Job> = { |
67 | total, | 54 | total, |
68 | data: Array.isArray(state) | 55 | data: state |
69 | ? await Promise.all( | 56 | ? jobs.map(j => formatJob(j, state)) |
70 | jobs.map(async j => formatJob(j, await j.getState() as JobState)) | 57 | : await Promise.all(jobs.map(j => formatJobWithUnknownState(j))) |
71 | ) | ||
72 | : jobs.map(j => formatJob(j, state)) | ||
73 | } | 58 | } |
59 | |||
74 | return res.json(result) | 60 | return res.json(result) |
75 | } | 61 | } |
76 | 62 | ||
63 | async function formatJobWithUnknownState (job: any) { | ||
64 | return formatJob(job, await job.getState()) | ||
65 | } | ||
66 | |||
77 | function formatJob (job: any, state: JobState): Job { | 67 | function formatJob (job: any, state: JobState): Job { |
78 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null | 68 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 |
69 | ? job.stacktrace[0] | ||
70 | : null | ||
79 | 71 | ||
80 | return { | 72 | return { |
81 | id: job.id, | 73 | id: job.id, |
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts index 72dc73ee4..f6777ecd5 100644 --- a/server/helpers/custom-validators/jobs.ts +++ b/server/helpers/custom-validators/jobs.ts | |||
@@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models' | |||
2 | import { exists } from './misc' | 2 | import { exists } from './misc' |
3 | import { jobTypes } from '@server/lib/job-queue/job-queue' | 3 | import { jobTypes } from '@server/lib/job-queue/job-queue' |
4 | 4 | ||
5 | const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] | 5 | const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ] |
6 | 6 | ||
7 | function isValidJobState (value: JobState) { | 7 | function isValidJobState (value: JobState) { |
8 | return exists(value) && jobStates.includes(value) | 8 | return exists(value) && jobStates.includes(value) |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 49f06584d..5d0b797b0 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | ||
3 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | ||
2 | import { | 4 | import { |
3 | ActivitypubFollowPayload, | 5 | ActivitypubFollowPayload, |
4 | ActivitypubHttpBroadcastPayload, | 6 | ActivitypubHttpBroadcastPayload, |
@@ -15,20 +17,19 @@ import { | |||
15 | VideoTranscodingPayload | 17 | VideoTranscodingPayload |
16 | } from '../../../shared/models' | 18 | } from '../../../shared/models' |
17 | import { logger } from '../../helpers/logger' | 19 | import { logger } from '../../helpers/logger' |
18 | import { Redis } from '../redis' | ||
19 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 20 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
21 | import { Redis } from '../redis' | ||
22 | import { processActivityPubFollow } from './handlers/activitypub-follow' | ||
20 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 23 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
21 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 24 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
22 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 25 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
23 | import { processEmail } from './handlers/email' | ||
24 | import { processVideoTranscoding } from './handlers/video-transcoding' | ||
25 | import { processActivityPubFollow } from './handlers/activitypub-follow' | ||
26 | import { processVideoImport } from './handlers/video-import' | ||
27 | import { processVideosViews } from './handlers/video-views' | ||
28 | import { refreshAPObject } from './handlers/activitypub-refresher' | 26 | import { refreshAPObject } from './handlers/activitypub-refresher' |
27 | import { processEmail } from './handlers/email' | ||
29 | import { processVideoFileImport } from './handlers/video-file-import' | 28 | import { processVideoFileImport } from './handlers/video-file-import' |
30 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 29 | import { processVideoImport } from './handlers/video-import' |
31 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 30 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
31 | import { processVideoTranscoding } from './handlers/video-transcoding' | ||
32 | import { processVideosViews } from './handlers/video-views' | ||
32 | 33 | ||
33 | type CreateJobArgument = | 34 | type CreateJobArgument = |
34 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 35 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -154,13 +155,15 @@ class JobQueue { | |||
154 | } | 155 | } |
155 | 156 | ||
156 | async listForApi (options: { | 157 | async listForApi (options: { |
157 | state: JobState | JobState[] | 158 | state?: JobState |
158 | start: number | 159 | start: number |
159 | count: number | 160 | count: number |
160 | asc?: boolean | 161 | asc?: boolean |
161 | jobType: JobType | 162 | jobType: JobType |
162 | }): Promise<Bull.Job[]> { | 163 | }): Promise<Bull.Job[]> { |
163 | const { state = Array.isArray(options.state) ? options.state : [ options.state ], start, count, asc, jobType } = options | 164 | const { state, start, count, asc, jobType } = options |
165 | |||
166 | const states = state ? [ state ] : jobStates | ||
164 | let results: Bull.Job[] = [] | 167 | let results: Bull.Job[] = [] |
165 | 168 | ||
166 | const filteredJobTypes = this.filterJobTypes(jobType) | 169 | const filteredJobTypes = this.filterJobTypes(jobType) |
@@ -172,7 +175,7 @@ class JobQueue { | |||
172 | continue | 175 | continue |
173 | } | 176 | } |
174 | 177 | ||
175 | const jobs = await queue.getJobs(state as Bull.JobStatus[], 0, start + count, asc) | 178 | const jobs = await queue.getJobs(states, 0, start + count, asc) |
176 | results = results.concat(jobs) | 179 | results = results.concat(jobs) |
177 | } | 180 | } |
178 | 181 | ||
@@ -188,8 +191,8 @@ class JobQueue { | |||
188 | return results.slice(start, start + count) | 191 | return results.slice(start, start + count) |
189 | } | 192 | } |
190 | 193 | ||
191 | async count (state: JobState | JobState[], jobType?: JobType): Promise<number> { | 194 | async count (state: JobState, jobType?: JobType): Promise<number> { |
192 | const states = Array.isArray(state) ? state : [ state ] | 195 | const states = state ? [ state ] : jobStates |
193 | let total = 0 | 196 | let total = 0 |
194 | 197 | ||
195 | const filteredJobTypes = this.filterJobTypes(jobType) | 198 | const filteredJobTypes = this.filterJobTypes(jobType) |
diff --git a/server/middlewares/validators/jobs.ts b/server/middlewares/validators/jobs.ts index 0fc183c1a..99ef25e0a 100644 --- a/server/middlewares/validators/jobs.ts +++ b/server/middlewares/validators/jobs.ts | |||
@@ -5,6 +5,10 @@ import { logger } from '../../helpers/logger' | |||
5 | import { areValidationErrors } from './utils' | 5 | import { areValidationErrors } from './utils' |
6 | 6 | ||
7 | const listJobsValidator = [ | 7 | const listJobsValidator = [ |
8 | param('state') | ||
9 | .optional() | ||
10 | .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), | ||
11 | |||
8 | query('jobType') | 12 | query('jobType') |
9 | .optional() | 13 | .optional() |
10 | .custom(isValidJobType).withMessage('Should have a valid job state'), | 14 | .custom(isValidJobType).withMessage('Should have a valid job state'), |
@@ -18,22 +22,8 @@ const listJobsValidator = [ | |||
18 | } | 22 | } |
19 | ] | 23 | ] |
20 | 24 | ||
21 | const listJobsStateValidator = [ | ||
22 | param('state') | ||
23 | .custom(isValidJobState).not().isEmpty().withMessage('Should have a valid job state'), | ||
24 | |||
25 | (req: express.Request, res: express.Response, next: express.NextFunction) => { | ||
26 | logger.debug('Checking listJobsValidator parameters.', { parameters: req.params }) | ||
27 | |||
28 | if (areValidationErrors(req, res)) return | ||
29 | |||
30 | return next() | ||
31 | } | ||
32 | ] | ||
33 | |||
34 | // --------------------------------------------------------------------------- | 25 | // --------------------------------------------------------------------------- |
35 | 26 | ||
36 | export { | 27 | export { |
37 | listJobsValidator, | 28 | listJobsValidator |
38 | listJobsStateValidator | ||
39 | } | 29 | } |
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 19c8836b5..d0e222997 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts | |||
@@ -83,6 +83,19 @@ describe('Test jobs', function () { | |||
83 | } | 83 | } |
84 | }) | 84 | }) |
85 | 85 | ||
86 | it('Should list all jobs', async function () { | ||
87 | const res = await getJobsList(servers[1].url, servers[1].accessToken) | ||
88 | |||
89 | const jobs = res.body.data as Job[] | ||
90 | |||
91 | expect(res.body.total).to.be.above(2) | ||
92 | expect(jobs).to.have.length.above(2) | ||
93 | |||
94 | // We know there are a least 1 delayed job (video views) and 1 completed job (broadcast) | ||
95 | expect(jobs.find(j => j.state === 'delayed')).to.not.be.undefined | ||
96 | expect(jobs.find(j => j.state === 'completed')).to.not.be.undefined | ||
97 | }) | ||
98 | |||
86 | after(async function () { | 99 | after(async function () { |
87 | await cleanupTests(servers) | 100 | await cleanupTests(servers) |
88 | }) | 101 | }) |
diff --git a/shared/extra-utils/server/jobs.ts b/shared/extra-utils/server/jobs.ts index cac00e9ab..a53749589 100644 --- a/shared/extra-utils/server/jobs.ts +++ b/shared/extra-utils/server/jobs.ts | |||
@@ -1,12 +1,20 @@ | |||
1 | import * as request from 'supertest' | 1 | import * as request from 'supertest' |
2 | import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes' | ||
3 | import { makeGetRequest } from '../../../shared/extra-utils' | ||
2 | import { Job, JobState, JobType } from '../../models' | 4 | import { Job, JobState, JobType } from '../../models' |
3 | import { wait } from '../miscs/miscs' | 5 | import { wait } from '../miscs/miscs' |
4 | import { ServerInfo } from './servers' | 6 | import { ServerInfo } from './servers' |
5 | import { makeGetRequest } from '../../../shared/extra-utils' | ||
6 | import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes' | ||
7 | 7 | ||
8 | function getJobsList (url: string, accessToken: string, state: JobState) { | 8 | function buildJobsUrl (state?: JobState) { |
9 | const path = '/api/v1/jobs/' + state | 9 | let path = '/api/v1/jobs' |
10 | |||
11 | if (state) path += '/' + state | ||
12 | |||
13 | return path | ||
14 | } | ||
15 | |||
16 | function getJobsList (url: string, accessToken: string, state?: JobState) { | ||
17 | const path = buildJobsUrl(state) | ||
10 | 18 | ||
11 | return request(url) | 19 | return request(url) |
12 | .get(path) | 20 | .get(path) |
@@ -19,14 +27,14 @@ function getJobsList (url: string, accessToken: string, state: JobState) { | |||
19 | function getJobsListPaginationAndSort (options: { | 27 | function getJobsListPaginationAndSort (options: { |
20 | url: string | 28 | url: string |
21 | accessToken: string | 29 | accessToken: string |
22 | state: JobState | ||
23 | start: number | 30 | start: number |
24 | count: number | 31 | count: number |
25 | sort: string | 32 | sort: string |
33 | state?: JobState | ||
26 | jobType?: JobType | 34 | jobType?: JobType |
27 | }) { | 35 | }) { |
28 | const { url, accessToken, state, start, count, sort, jobType } = options | 36 | const { url, accessToken, state, start, count, sort, jobType } = options |
29 | const path = '/api/v1/jobs/' + state | 37 | const path = buildJobsUrl(state) |
30 | 38 | ||
31 | const query = { | 39 | const query = { |
32 | start, | 40 | start, |
diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 346b25607..b0ed860a7 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts | |||
@@ -2,7 +2,7 @@ import { ContextType } from '../activitypub/context' | |||
2 | import { VideoResolution } from '../videos/video-resolution.enum' | 2 | import { VideoResolution } from '../videos/video-resolution.enum' |
3 | import { SendEmailOptions } from './emailer.model' | 3 | import { SendEmailOptions } from './emailer.model' |
4 | 4 | ||
5 | export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 5 | export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' |
6 | 6 | ||
7 | export type JobType = | 7 | export type JobType = |
8 | | 'activitypub-http-unicast' | 8 | | 'activitypub-http-unicast' |