diff options
author | Chocobozzz <me@florianbigard.com> | 2023-02-16 14:10:11 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-02-16 14:10:11 +0100 |
commit | 182082f52d8cd94dc068ce39276af8243a00fb7c (patch) | |
tree | 25d6155e7515c23c2ec28fa2b83928ff80930dec /server/lib/job-queue/job-queue.ts | |
parent | e65ef81cf51746616182a822bd6933bf0d16717a (diff) | |
download | PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.gz PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.zst PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.zip |
Update bullmq
Requires redis >= 6.2
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 36 |
1 files changed, 4 insertions, 32 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 8597eb000..cc6be0bd8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -7,11 +7,10 @@ import { | |||
7 | QueueEvents, | 7 | QueueEvents, |
8 | QueueEventsOptions, | 8 | QueueEventsOptions, |
9 | QueueOptions, | 9 | QueueOptions, |
10 | QueueScheduler, | ||
11 | QueueSchedulerOptions, | ||
12 | Worker, | 10 | Worker, |
13 | WorkerOptions | 11 | WorkerOptions |
14 | } from 'bullmq' | 12 | } from 'bullmq' |
13 | import { parseDurationToMs } from '@server/helpers/core-utils' | ||
15 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 14 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
16 | import { CONFIG } from '@server/initializers/config' | 15 | import { CONFIG } from '@server/initializers/config' |
17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 16 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
@@ -41,14 +40,7 @@ import { | |||
41 | VideoTranscodingPayload | 40 | VideoTranscodingPayload |
42 | } from '../../../shared/models' | 41 | } from '../../../shared/models' |
43 | import { logger } from '../../helpers/logger' | 42 | import { logger } from '../../helpers/logger' |
44 | import { | 43 | import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
45 | JOB_ATTEMPTS, | ||
46 | JOB_CONCURRENCY, | ||
47 | JOB_REMOVAL_OPTIONS, | ||
48 | JOB_TTL, | ||
49 | REPEAT_JOBS, | ||
50 | WEBSERVER | ||
51 | } from '../../initializers/constants' | ||
52 | import { Hooks } from '../plugins/hooks' | 44 | import { Hooks } from '../plugins/hooks' |
53 | import { Redis } from '../redis' | 45 | import { Redis } from '../redis' |
54 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 46 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
@@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
71 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
72 | import { processVideoTranscoding } from './handlers/video-transcoding' | 64 | import { processVideoTranscoding } from './handlers/video-transcoding' |
73 | import { processVideosViewsStats } from './handlers/video-views-stats' | 65 | import { processVideosViewsStats } from './handlers/video-views-stats' |
74 | import { parseDurationToMs } from '@server/helpers/core-utils' | ||
75 | 66 | ||
76 | export type CreateJobArgument = | 67 | export type CreateJobArgument = |
77 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -166,7 +157,6 @@ class JobQueue { | |||
166 | 157 | ||
167 | private workers: { [id in JobType]?: Worker } = {} | 158 | private workers: { [id in JobType]?: Worker } = {} |
168 | private queues: { [id in JobType]?: Queue } = {} | 159 | private queues: { [id in JobType]?: Queue } = {} |
169 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | ||
170 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 160 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
171 | 161 | ||
172 | private flowProducer: FlowProducer | 162 | private flowProducer: FlowProducer |
@@ -187,7 +177,6 @@ class JobQueue { | |||
187 | for (const handlerName of Object.keys(handlers)) { | 177 | for (const handlerName of Object.keys(handlers)) { |
188 | this.buildWorker(handlerName) | 178 | this.buildWorker(handlerName) |
189 | this.buildQueue(handlerName) | 179 | this.buildQueue(handlerName) |
190 | this.buildQueueScheduler(handlerName) | ||
191 | this.buildQueueEvent(handlerName) | 180 | this.buildQueueEvent(handlerName) |
192 | } | 181 | } |
193 | 182 | ||
@@ -205,7 +194,8 @@ class JobQueue { | |||
205 | autorun: false, | 194 | autorun: false, |
206 | concurrency: this.getJobConcurrency(handlerName), | 195 | concurrency: this.getJobConcurrency(handlerName), |
207 | prefix: this.jobRedisPrefix, | 196 | prefix: this.jobRedisPrefix, |
208 | connection: Redis.getRedisClientOptions('Worker') | 197 | connection: Redis.getRedisClientOptions('Worker'), |
198 | maxStalledCount: 10 | ||
209 | } | 199 | } |
210 | 200 | ||
211 | const handler = function (job: Job) { | 201 | const handler = function (job: Job) { |
@@ -255,20 +245,6 @@ class JobQueue { | |||
255 | this.queues[handlerName] = queue | 245 | this.queues[handlerName] = queue |
256 | } | 246 | } |
257 | 247 | ||
258 | private buildQueueScheduler (handlerName: JobType) { | ||
259 | const queueSchedulerOptions: QueueSchedulerOptions = { | ||
260 | autorun: false, | ||
261 | connection: Redis.getRedisClientOptions('QueueScheduler'), | ||
262 | prefix: this.jobRedisPrefix, | ||
263 | maxStalledCount: 10 | ||
264 | } | ||
265 | |||
266 | const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) | ||
267 | queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) | ||
268 | |||
269 | this.queueSchedulers[handlerName] = queueScheduler | ||
270 | } | ||
271 | |||
272 | private buildQueueEvent (handlerName: JobType) { | 248 | private buildQueueEvent (handlerName: JobType) { |
273 | const queueEventsOptions: QueueEventsOptions = { | 249 | const queueEventsOptions: QueueEventsOptions = { |
274 | autorun: false, | 250 | autorun: false, |
@@ -289,13 +265,11 @@ class JobQueue { | |||
289 | .map(handlerName => { | 265 | .map(handlerName => { |
290 | const worker: Worker = this.workers[handlerName] | 266 | const worker: Worker = this.workers[handlerName] |
291 | const queue: Queue = this.queues[handlerName] | 267 | const queue: Queue = this.queues[handlerName] |
292 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
293 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | 268 | const queueEvent: QueueEvents = this.queueEvents[handlerName] |
294 | 269 | ||
295 | return Promise.all([ | 270 | return Promise.all([ |
296 | worker.close(false), | 271 | worker.close(false), |
297 | queue.close(), | 272 | queue.close(), |
298 | queueScheduler.close(), | ||
299 | queueEvent.close() | 273 | queueEvent.close() |
300 | ]) | 274 | ]) |
301 | }) | 275 | }) |
@@ -307,12 +281,10 @@ class JobQueue { | |||
307 | const promises = Object.keys(this.workers) | 281 | const promises = Object.keys(this.workers) |
308 | .map(handlerName => { | 282 | .map(handlerName => { |
309 | const worker: Worker = this.workers[handlerName] | 283 | const worker: Worker = this.workers[handlerName] |
310 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
311 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | 284 | const queueEvent: QueueEvents = this.queueEvents[handlerName] |
312 | 285 | ||
313 | return Promise.all([ | 286 | return Promise.all([ |
314 | worker.run(), | 287 | worker.run(), |
315 | queueScheduler.run(), | ||
316 | queueEvent.run() | 288 | queueEvent.run() |
317 | ]) | 289 | ]) |
318 | }) | 290 | }) |