diff options
author | Chocobozzz <me@florianbigard.com> | 2020-12-14 12:00:35 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2020-12-14 12:00:35 +0100 |
commit | 402145b8630d1908c35f8c22846ddc4475f25d3e (patch) | |
tree | ff50cb09c5f56cc408ef20a7c959ef7a0642b76b /server/lib/job-queue | |
parent | c04816108e8ec62eb29caf82806f3927dc5eb85a (diff) | |
download | PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.gz PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.zst PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.zip |
Refactor jobs state
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 27 |
1 files changed, 15 insertions, 12 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 49f06584d..5d0b797b0 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | ||
3 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | ||
2 | import { | 4 | import { |
3 | ActivitypubFollowPayload, | 5 | ActivitypubFollowPayload, |
4 | ActivitypubHttpBroadcastPayload, | 6 | ActivitypubHttpBroadcastPayload, |
@@ -15,20 +17,19 @@ import { | |||
15 | VideoTranscodingPayload | 17 | VideoTranscodingPayload |
16 | } from '../../../shared/models' | 18 | } from '../../../shared/models' |
17 | import { logger } from '../../helpers/logger' | 19 | import { logger } from '../../helpers/logger' |
18 | import { Redis } from '../redis' | ||
19 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 20 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
21 | import { Redis } from '../redis' | ||
22 | import { processActivityPubFollow } from './handlers/activitypub-follow' | ||
20 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 23 | import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
21 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 24 | import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
22 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 25 | import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
23 | import { processEmail } from './handlers/email' | ||
24 | import { processVideoTranscoding } from './handlers/video-transcoding' | ||
25 | import { processActivityPubFollow } from './handlers/activitypub-follow' | ||
26 | import { processVideoImport } from './handlers/video-import' | ||
27 | import { processVideosViews } from './handlers/video-views' | ||
28 | import { refreshAPObject } from './handlers/activitypub-refresher' | 26 | import { refreshAPObject } from './handlers/activitypub-refresher' |
27 | import { processEmail } from './handlers/email' | ||
29 | import { processVideoFileImport } from './handlers/video-file-import' | 28 | import { processVideoFileImport } from './handlers/video-file-import' |
30 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 29 | import { processVideoImport } from './handlers/video-import' |
31 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 30 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
31 | import { processVideoTranscoding } from './handlers/video-transcoding' | ||
32 | import { processVideosViews } from './handlers/video-views' | ||
32 | 33 | ||
33 | type CreateJobArgument = | 34 | type CreateJobArgument = |
34 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 35 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -154,13 +155,15 @@ class JobQueue { | |||
154 | } | 155 | } |
155 | 156 | ||
156 | async listForApi (options: { | 157 | async listForApi (options: { |
157 | state: JobState | JobState[] | 158 | state?: JobState |
158 | start: number | 159 | start: number |
159 | count: number | 160 | count: number |
160 | asc?: boolean | 161 | asc?: boolean |
161 | jobType: JobType | 162 | jobType: JobType |
162 | }): Promise<Bull.Job[]> { | 163 | }): Promise<Bull.Job[]> { |
163 | const { state = Array.isArray(options.state) ? options.state : [ options.state ], start, count, asc, jobType } = options | 164 | const { state, start, count, asc, jobType } = options |
165 | |||
166 | const states = state ? [ state ] : jobStates | ||
164 | let results: Bull.Job[] = [] | 167 | let results: Bull.Job[] = [] |
165 | 168 | ||
166 | const filteredJobTypes = this.filterJobTypes(jobType) | 169 | const filteredJobTypes = this.filterJobTypes(jobType) |
@@ -172,7 +175,7 @@ class JobQueue { | |||
172 | continue | 175 | continue |
173 | } | 176 | } |
174 | 177 | ||
175 | const jobs = await queue.getJobs(state as Bull.JobStatus[], 0, start + count, asc) | 178 | const jobs = await queue.getJobs(states, 0, start + count, asc) |
176 | results = results.concat(jobs) | 179 | results = results.concat(jobs) |
177 | } | 180 | } |
178 | 181 | ||
@@ -188,8 +191,8 @@ class JobQueue { | |||
188 | return results.slice(start, start + count) | 191 | return results.slice(start, start + count) |
189 | } | 192 | } |
190 | 193 | ||
191 | async count (state: JobState | JobState[], jobType?: JobType): Promise<number> { | 194 | async count (state: JobState, jobType?: JobType): Promise<number> { |
192 | const states = Array.isArray(state) ? state : [ state ] | 195 | const states = state ? [ state ] : jobStates |
193 | let total = 0 | 196 | let total = 0 |
194 | 197 | ||
195 | const filteredJobTypes = this.filterJobTypes(jobType) | 198 | const filteredJobTypes = this.filterJobTypes(jobType) |