aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-02-16 14:10:11 +0100
committerChocobozzz <me@florianbigard.com>2023-02-16 14:10:11 +0100
commit182082f52d8cd94dc068ce39276af8243a00fb7c (patch)
tree25d6155e7515c23c2ec28fa2b83928ff80930dec /server/lib/job-queue/job-queue.ts
parente65ef81cf51746616182a822bd6933bf0d16717a (diff)
downloadPeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.gz
PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.zst
PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.zip
Update bullmq
Requires redis >= 6.2
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts36
1 files changed, 4 insertions, 32 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 8597eb000..cc6be0bd8 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -7,11 +7,10 @@ import {
7 QueueEvents, 7 QueueEvents,
8 QueueEventsOptions, 8 QueueEventsOptions,
9 QueueOptions, 9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker, 10 Worker,
13 WorkerOptions 11 WorkerOptions
14} from 'bullmq' 12} from 'bullmq'
13import { parseDurationToMs } from '@server/helpers/core-utils'
15import { jobStates } from '@server/helpers/custom-validators/jobs' 14import { jobStates } from '@server/helpers/custom-validators/jobs'
16import { CONFIG } from '@server/initializers/config' 15import { CONFIG } from '@server/initializers/config'
17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 16import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
@@ -41,14 +40,7 @@ import {
41 VideoTranscodingPayload 40 VideoTranscodingPayload
42} from '../../../shared/models' 41} from '../../../shared/models'
43import { logger } from '../../helpers/logger' 42import { logger } from '../../helpers/logger'
44import { 43import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
45 JOB_ATTEMPTS,
46 JOB_CONCURRENCY,
47 JOB_REMOVAL_OPTIONS,
48 JOB_TTL,
49 REPEAT_JOBS,
50 WEBSERVER
51} from '../../initializers/constants'
52import { Hooks } from '../plugins/hooks' 44import { Hooks } from '../plugins/hooks'
53import { Redis } from '../redis' 45import { Redis } from '../redis'
54import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
@@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
71import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
72import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
73import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
74import { parseDurationToMs } from '@server/helpers/core-utils'
75 66
76export type CreateJobArgument = 67export type CreateJobArgument =
77 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -166,7 +157,6 @@ class JobQueue {
166 157
167 private workers: { [id in JobType]?: Worker } = {} 158 private workers: { [id in JobType]?: Worker } = {}
168 private queues: { [id in JobType]?: Queue } = {} 159 private queues: { [id in JobType]?: Queue } = {}
169 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
170 private queueEvents: { [id in JobType]?: QueueEvents } = {} 160 private queueEvents: { [id in JobType]?: QueueEvents } = {}
171 161
172 private flowProducer: FlowProducer 162 private flowProducer: FlowProducer
@@ -187,7 +177,6 @@ class JobQueue {
187 for (const handlerName of Object.keys(handlers)) { 177 for (const handlerName of Object.keys(handlers)) {
188 this.buildWorker(handlerName) 178 this.buildWorker(handlerName)
189 this.buildQueue(handlerName) 179 this.buildQueue(handlerName)
190 this.buildQueueScheduler(handlerName)
191 this.buildQueueEvent(handlerName) 180 this.buildQueueEvent(handlerName)
192 } 181 }
193 182
@@ -205,7 +194,8 @@ class JobQueue {
205 autorun: false, 194 autorun: false,
206 concurrency: this.getJobConcurrency(handlerName), 195 concurrency: this.getJobConcurrency(handlerName),
207 prefix: this.jobRedisPrefix, 196 prefix: this.jobRedisPrefix,
208 connection: Redis.getRedisClientOptions('Worker') 197 connection: Redis.getRedisClientOptions('Worker'),
198 maxStalledCount: 10
209 } 199 }
210 200
211 const handler = function (job: Job) { 201 const handler = function (job: Job) {
@@ -255,20 +245,6 @@ class JobQueue {
255 this.queues[handlerName] = queue 245 this.queues[handlerName] = queue
256 } 246 }
257 247
258 private buildQueueScheduler (handlerName: JobType) {
259 const queueSchedulerOptions: QueueSchedulerOptions = {
260 autorun: false,
261 connection: Redis.getRedisClientOptions('QueueScheduler'),
262 prefix: this.jobRedisPrefix,
263 maxStalledCount: 10
264 }
265
266 const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
267 queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
268
269 this.queueSchedulers[handlerName] = queueScheduler
270 }
271
272 private buildQueueEvent (handlerName: JobType) { 248 private buildQueueEvent (handlerName: JobType) {
273 const queueEventsOptions: QueueEventsOptions = { 249 const queueEventsOptions: QueueEventsOptions = {
274 autorun: false, 250 autorun: false,
@@ -289,13 +265,11 @@ class JobQueue {
289 .map(handlerName => { 265 .map(handlerName => {
290 const worker: Worker = this.workers[handlerName] 266 const worker: Worker = this.workers[handlerName]
291 const queue: Queue = this.queues[handlerName] 267 const queue: Queue = this.queues[handlerName]
292 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
293 const queueEvent: QueueEvents = this.queueEvents[handlerName] 268 const queueEvent: QueueEvents = this.queueEvents[handlerName]
294 269
295 return Promise.all([ 270 return Promise.all([
296 worker.close(false), 271 worker.close(false),
297 queue.close(), 272 queue.close(),
298 queueScheduler.close(),
299 queueEvent.close() 273 queueEvent.close()
300 ]) 274 ])
301 }) 275 })
@@ -307,12 +281,10 @@ class JobQueue {
307 const promises = Object.keys(this.workers) 281 const promises = Object.keys(this.workers)
308 .map(handlerName => { 282 .map(handlerName => {
309 const worker: Worker = this.workers[handlerName] 283 const worker: Worker = this.workers[handlerName]
310 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
311 const queueEvent: QueueEvents = this.queueEvents[handlerName] 284 const queueEvent: QueueEvents = this.queueEvents[handlerName]
312 285
313 return Promise.all([ 286 return Promise.all([
314 worker.run(), 287 worker.run(),
315 queueScheduler.run(),
316 queueEvent.run() 288 queueEvent.run()
317 ]) 289 ])
318 }) 290 })