aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2020-12-14 12:00:35 +0100
committerChocobozzz <me@florianbigard.com>2020-12-14 12:00:35 +0100
commit402145b8630d1908c35f8c22846ddc4475f25d3e (patch)
treeff50cb09c5f56cc408ef20a7c959ef7a0642b76b /server/lib/job-queue
parentc04816108e8ec62eb29caf82806f3927dc5eb85a (diff)
downloadPeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.gz
PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.tar.zst
PeerTube-402145b8630d1908c35f8c22846ddc4475f25d3e.zip
Refactor jobs state
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/job-queue.ts27
1 files changed, 15 insertions, 12 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 49f06584d..5d0b797b0 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,4 +1,6 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
2import { 4import {
3 ActivitypubFollowPayload, 5 ActivitypubFollowPayload,
4 ActivitypubHttpBroadcastPayload, 6 ActivitypubHttpBroadcastPayload,
@@ -15,20 +17,19 @@ import {
15 VideoTranscodingPayload 17 VideoTranscodingPayload
16} from '../../../shared/models' 18} from '../../../shared/models'
17import { logger } from '../../helpers/logger' 19import { logger } from '../../helpers/logger'
18import { Redis } from '../redis'
19import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 20import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
21import { Redis } from '../redis'
22import { processActivityPubFollow } from './handlers/activitypub-follow'
20import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 23import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
21import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 24import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
22import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 25import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
23import { processEmail } from './handlers/email'
24import { processVideoTranscoding } from './handlers/video-transcoding'
25import { processActivityPubFollow } from './handlers/activitypub-follow'
26import { processVideoImport } from './handlers/video-import'
27import { processVideosViews } from './handlers/video-views'
28import { refreshAPObject } from './handlers/activitypub-refresher' 26import { refreshAPObject } from './handlers/activitypub-refresher'
27import { processEmail } from './handlers/email'
29import { processVideoFileImport } from './handlers/video-file-import' 28import { processVideoFileImport } from './handlers/video-file-import'
30import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 29import { processVideoImport } from './handlers/video-import'
31import { processVideoLiveEnding } from './handlers/video-live-ending' 30import { processVideoLiveEnding } from './handlers/video-live-ending'
31import { processVideoTranscoding } from './handlers/video-transcoding'
32import { processVideosViews } from './handlers/video-views'
32 33
33type CreateJobArgument = 34type CreateJobArgument =
34 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 35 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -154,13 +155,15 @@ class JobQueue {
154 } 155 }
155 156
156 async listForApi (options: { 157 async listForApi (options: {
157 state: JobState | JobState[] 158 state?: JobState
158 start: number 159 start: number
159 count: number 160 count: number
160 asc?: boolean 161 asc?: boolean
161 jobType: JobType 162 jobType: JobType
162 }): Promise<Bull.Job[]> { 163 }): Promise<Bull.Job[]> {
163 const { state = Array.isArray(options.state) ? options.state : [ options.state ], start, count, asc, jobType } = options 164 const { state, start, count, asc, jobType } = options
165
166 const states = state ? [ state ] : jobStates
164 let results: Bull.Job[] = [] 167 let results: Bull.Job[] = []
165 168
166 const filteredJobTypes = this.filterJobTypes(jobType) 169 const filteredJobTypes = this.filterJobTypes(jobType)
@@ -172,7 +175,7 @@ class JobQueue {
172 continue 175 continue
173 } 176 }
174 177
175 const jobs = await queue.getJobs(state as Bull.JobStatus[], 0, start + count, asc) 178 const jobs = await queue.getJobs(states, 0, start + count, asc)
176 results = results.concat(jobs) 179 results = results.concat(jobs)
177 } 180 }
178 181
@@ -188,8 +191,8 @@ class JobQueue {
188 return results.slice(start, start + count) 191 return results.slice(start, start + count)
189 } 192 }
190 193
191 async count (state: JobState | JobState[], jobType?: JobType): Promise<number> { 194 async count (state: JobState, jobType?: JobType): Promise<number> {
192 const states = Array.isArray(state) ? state : [ state ] 195 const states = state ? [ state ] : jobStates
193 let total = 0 196 let total = 0
194 197
195 const filteredJobTypes = this.filterJobTypes(jobType) 198 const filteredJobTypes = this.filterJobTypes(jobType)