diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 15:48:17 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | bd911b54b555b11df7e9849cf92d358bccfecf6e (patch) | |
tree | 23e94b4acbe6819fedc1cb5e067b700cbdd880c3 /server/lib/job-queue/job-queue.ts | |
parent | 5a921e7b74910414626bfc9672b857e987e3ebed (diff) | |
download | PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.gz PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.tar.zst PeerTube-bd911b54b555b11df7e9849cf92d358bccfecf6e.zip |
Use bullmq job dependency
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 81 |
1 files changed, 66 insertions, 15 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,4 +1,6 @@ | |||
1 | import { | 1 | import { |
2 | FlowJob, | ||
3 | FlowProducer, | ||
2 | Job, | 4 | Job, |
3 | JobsOptions, | 5 | JobsOptions, |
4 | Queue, | 6 | Queue, |
@@ -13,7 +15,7 @@ import { | |||
13 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 15 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
14 | import { CONFIG } from '@server/initializers/config' | 16 | import { CONFIG } from '@server/initializers/config' |
15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | 18 | import { pick, timeoutPromise } from '@shared/core-utils' |
17 | import { | 19 | import { |
18 | ActivitypubFollowPayload, | 20 | ActivitypubFollowPayload, |
19 | ActivitypubHttpBroadcastPayload, | 21 | ActivitypubHttpBroadcastPayload, |
@@ -22,10 +24,12 @@ import { | |||
22 | ActorKeysPayload, | 24 | ActorKeysPayload, |
23 | DeleteResumableUploadMetaFilePayload, | 25 | DeleteResumableUploadMetaFilePayload, |
24 | EmailPayload, | 26 | EmailPayload, |
27 | FederateVideoPayload, | ||
25 | JobState, | 28 | JobState, |
26 | JobType, | 29 | JobType, |
27 | ManageVideoTorrentPayload, | 30 | ManageVideoTorrentPayload, |
28 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | ||
29 | RefreshPayload, | 33 | RefreshPayload, |
30 | VideoFileImportPayload, | 34 | VideoFileImportPayload, |
31 | VideoImportPayload, | 35 | VideoImportPayload, |
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica | |||
45 | import { refreshAPObject } from './handlers/activitypub-refresher' | 49 | import { refreshAPObject } from './handlers/activitypub-refresher' |
46 | import { processActorKeys } from './handlers/actor-keys' | 50 | import { processActorKeys } from './handlers/actor-keys' |
47 | import { processEmail } from './handlers/email' | 51 | import { processEmail } from './handlers/email' |
52 | import { processFederateVideo } from './handlers/federate-video' | ||
48 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 53 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
49 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 54 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
55 | import { processNotify } from './handlers/notify' | ||
50 | import { processVideoFileImport } from './handlers/video-file-import' | 56 | import { processVideoFileImport } from './handlers/video-file-import' |
51 | import { processVideoImport } from './handlers/video-import' | 57 | import { processVideoImport } from './handlers/video-import' |
52 | import { processVideoLiveEnding } from './handlers/video-live-ending' | 58 | import { processVideoLiveEnding } from './handlers/video-live-ending' |
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' | |||
54 | import { processVideoTranscoding } from './handlers/video-transcoding' | 60 | import { processVideoTranscoding } from './handlers/video-transcoding' |
55 | import { processVideosViewsStats } from './handlers/video-views-stats' | 61 | import { processVideosViewsStats } from './handlers/video-views-stats' |
56 | 62 | ||
57 | type CreateJobArgument = | 63 | export type CreateJobArgument = |
58 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 64 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
59 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 65 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
60 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 66 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
@@ -73,7 +79,9 @@ type CreateJobArgument = | |||
73 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | | 79 | { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | |
74 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | | 80 | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | |
75 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | | 81 | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | |
76 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | 82 | { type: 'notify', payload: NotifyPayload } | |
83 | { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | | ||
84 | { type: 'federate-video', payload: FederateVideoPayload } | ||
77 | 85 | ||
78 | export type CreateJobOptions = { | 86 | export type CreateJobOptions = { |
79 | delay?: number | 87 | delay?: number |
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | |||
98 | 'video-redundancy': processVideoRedundancy, | 106 | 'video-redundancy': processVideoRedundancy, |
99 | 'move-to-object-storage': processMoveToObjectStorage, | 107 | 'move-to-object-storage': processMoveToObjectStorage, |
100 | 'manage-video-torrent': processManageVideoTorrent, | 108 | 'manage-video-torrent': processManageVideoTorrent, |
101 | 'video-studio-edition': processVideoStudioEdition | 109 | 'notify': processNotify, |
110 | 'video-studio-edition': processVideoStudioEdition, | ||
111 | 'federate-video': processFederateVideo | ||
102 | } | 112 | } |
103 | 113 | ||
104 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 114 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ | |||
123 | 'video-live-ending', | 133 | 'video-live-ending', |
124 | 'move-to-object-storage', | 134 | 'move-to-object-storage', |
125 | 'manage-video-torrent', | 135 | 'manage-video-torrent', |
126 | 'video-studio-edition' | 136 | 'video-studio-edition', |
137 | 'notify', | ||
138 | 'federate-video' | ||
127 | ] | 139 | ] |
128 | 140 | ||
129 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 141 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |
@@ -137,6 +149,8 @@ class JobQueue { | |||
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | 149 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} |
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 150 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
139 | 151 | ||
152 | private flowProducer: FlowProducer | ||
153 | |||
140 | private initialized = false | 154 | private initialized = false |
141 | private jobRedisPrefix: string | 155 | private jobRedisPrefix: string |
142 | 156 | ||
@@ -157,6 +171,11 @@ class JobQueue { | |||
157 | this.buildQueueEvent(handlerName, produceOnly) | 171 | this.buildQueueEvent(handlerName, produceOnly) |
158 | } | 172 | } |
159 | 173 | ||
174 | this.flowProducer = new FlowProducer({ | ||
175 | connection: this.getRedisConnection(), | ||
176 | prefix: this.jobRedisPrefix | ||
177 | }) | ||
178 | |||
160 | this.addRepeatableJobs() | 179 | this.addRepeatableJobs() |
161 | } | 180 | } |
162 | 181 | ||
@@ -243,6 +262,8 @@ class JobQueue { | |||
243 | } | 262 | } |
244 | } | 263 | } |
245 | 264 | ||
265 | // --------------------------------------------------------------------------- | ||
266 | |||
246 | async terminate () { | 267 | async terminate () { |
247 | const promises = Object.keys(this.workers) | 268 | const promises = Object.keys(this.workers) |
248 | .map(handlerName => { | 269 | .map(handlerName => { |
@@ -278,28 +299,56 @@ class JobQueue { | |||
278 | } | 299 | } |
279 | } | 300 | } |
280 | 301 | ||
281 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { | 302 | // --------------------------------------------------------------------------- |
282 | this.createJobWithPromise(obj, options) | 303 | |
283 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 304 | createJobAsync (options: CreateJobArgument & CreateJobOptions): void { |
305 | this.createJob(options) | ||
306 | .catch(err => logger.error('Cannot create job.', { err, options })) | ||
284 | } | 307 | } |
285 | 308 | ||
286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 309 | async createJob (options: CreateJobArgument & CreateJobOptions) { |
287 | const queue: Queue = this.queues[obj.type] | 310 | const queue: Queue = this.queues[options.type] |
288 | if (queue === undefined) { | 311 | if (queue === undefined) { |
289 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 312 | logger.error('Unknown queue %s: cannot create job.', options.type) |
290 | return | 313 | return |
291 | } | 314 | } |
292 | 315 | ||
293 | const jobArgs: JobsOptions = { | 316 | const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) |
317 | |||
318 | return queue.add('job', options.payload, jobOptions) | ||
319 | } | ||
320 | |||
321 | async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { | ||
322 | let lastJob: FlowJob | ||
323 | |||
324 | for (const job of jobs) { | ||
325 | if (!job) continue | ||
326 | |||
327 | lastJob = { | ||
328 | name: 'job', | ||
329 | data: job.payload, | ||
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | ||
333 | ? [ lastJob ] | ||
334 | : [] | ||
335 | } | ||
336 | } | ||
337 | |||
338 | return this.flowProducer.add(lastJob) | ||
339 | } | ||
340 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | ||
342 | return { | ||
294 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 343 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
295 | attempts: JOB_ATTEMPTS[obj.type], | 344 | attempts: JOB_ATTEMPTS[type], |
296 | priority: options.priority, | 345 | priority: options.priority, |
297 | delay: options.delay | 346 | delay: options.delay |
298 | } | 347 | } |
299 | |||
300 | return queue.add('job', obj.payload, jobArgs) | ||
301 | } | 348 | } |
302 | 349 | ||
350 | // --------------------------------------------------------------------------- | ||
351 | |||
303 | async listForApi (options: { | 352 | async listForApi (options: { |
304 | state?: JobState | 353 | state?: JobState |
305 | start: number | 354 | start: number |
@@ -367,6 +416,8 @@ class JobQueue { | |||
367 | return Promise.all(promises) | 416 | return Promise.all(promises) |
368 | } | 417 | } |
369 | 418 | ||
419 | // --------------------------------------------------------------------------- | ||
420 | |||
370 | async removeOldJobs () { | 421 | async removeOldJobs () { |
371 | for (const key of Object.keys(this.queues)) { | 422 | for (const key of Object.keys(this.queues)) { |
372 | const queue: Queue = this.queues[key] | 423 | const queue: Queue = this.queues[key] |