diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-08 10:42:08 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:18:07 +0200 |
commit | 5a921e7b74910414626bfc9672b857e987e3ebed (patch) | |
tree | f627e2ccc11c55bcba9e630951e72c5f94864c12 /server/lib/job-queue/job-queue.ts | |
parent | 5e2afe4290103bf0d54ae7b3e62781f2a00487c9 (diff) | |
download | PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.gz PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.zst PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.zip |
Move to bullmq
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 188 |
1 files changed, 132 insertions, 56 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0ae325f4d..0cf5d53ce 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,7 +1,19 @@ | |||
1 | import Bull, { Job, JobOptions, Queue } from 'bull' | 1 | import { |
2 | Job, | ||
3 | JobsOptions, | ||
4 | Queue, | ||
5 | QueueEvents, | ||
6 | QueueEventsOptions, | ||
7 | QueueOptions, | ||
8 | QueueScheduler, | ||
9 | QueueSchedulerOptions, | ||
10 | Worker, | ||
11 | WorkerOptions | ||
12 | } from 'bullmq' | ||
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 13 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
3 | import { CONFIG } from '@server/initializers/config' | 14 | import { CONFIG } from '@server/initializers/config' |
4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | ||
5 | import { | 17 | import { |
6 | ActivitypubFollowPayload, | 18 | ActivitypubFollowPayload, |
7 | ActivitypubHttpBroadcastPayload, | 19 | ActivitypubHttpBroadcastPayload, |
@@ -120,7 +132,11 @@ class JobQueue { | |||
120 | 132 | ||
121 | private static instance: JobQueue | 133 | private static instance: JobQueue |
122 | 134 | ||
135 | private workers: { [id in JobType]?: Worker } = {} | ||
123 | private queues: { [id in JobType]?: Queue } = {} | 136 | private queues: { [id in JobType]?: Queue } = {} |
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | ||
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | ||
139 | |||
124 | private initialized = false | 140 | private initialized = false |
125 | private jobRedisPrefix: string | 141 | private jobRedisPrefix: string |
126 | 142 | ||
@@ -134,75 +150,131 @@ class JobQueue { | |||
134 | 150 | ||
135 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST | 151 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST |
136 | 152 | ||
137 | const queueOptions: Bull.QueueOptions = { | 153 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
154 | this.buildWorker(handlerName, produceOnly) | ||
155 | this.buildQueue(handlerName) | ||
156 | this.buildQueueScheduler(handlerName, produceOnly) | ||
157 | this.buildQueueEvent(handlerName, produceOnly) | ||
158 | } | ||
159 | |||
160 | this.addRepeatableJobs() | ||
161 | } | ||
162 | |||
163 | private buildWorker (handlerName: JobType, produceOnly: boolean) { | ||
164 | const workerOptions: WorkerOptions = { | ||
165 | autorun: !produceOnly, | ||
166 | concurrency: this.getJobConcurrency(handlerName), | ||
138 | prefix: this.jobRedisPrefix, | 167 | prefix: this.jobRedisPrefix, |
139 | redis: { | 168 | connection: this.getRedisConnection() |
140 | password: CONFIG.REDIS.AUTH, | ||
141 | db: CONFIG.REDIS.DB, | ||
142 | host: CONFIG.REDIS.HOSTNAME, | ||
143 | port: CONFIG.REDIS.PORT, | ||
144 | path: CONFIG.REDIS.SOCKET | ||
145 | }, | ||
146 | settings: { | ||
147 | maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts | ||
148 | } | ||
149 | } | 169 | } |
150 | 170 | ||
151 | for (const handlerName of (Object.keys(handlers) as JobType[])) { | 171 | const handler = function (job: Job) { |
152 | const queue = new Bull(handlerName, queueOptions) | 172 | const timeout = JOB_TTL[handlerName] |
173 | const p = handlers[handlerName](job) | ||
153 | 174 | ||
154 | if (produceOnly) { | 175 | if (!timeout) return p |
155 | queue.pause(true) | ||
156 | .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) | ||
157 | } | ||
158 | 176 | ||
159 | const handler = handlers[handlerName] | 177 | return timeoutPromise(p, timeout) |
178 | } | ||
160 | 179 | ||
161 | queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { | 180 | const processor = async (jobArg: Job<any>) => { |
162 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) | 181 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) |
163 | 182 | ||
164 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | 183 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') |
165 | }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 184 | } |
166 | 185 | ||
167 | queue.on('failed', (job, err) => { | 186 | const worker = new Worker(handlerName, processor, workerOptions) |
168 | const logLevel = silentFailure.has(handlerName) | ||
169 | ? 'debug' | ||
170 | : 'error' | ||
171 | 187 | ||
172 | logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) | 188 | worker.on('failed', (job, err) => { |
189 | const logLevel = silentFailure.has(handlerName) | ||
190 | ? 'debug' | ||
191 | : 'error' | ||
173 | 192 | ||
174 | if (errorHandlers[job.name]) { | 193 | logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) |
175 | errorHandlers[job.name](job, err) | ||
176 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) | ||
177 | } | ||
178 | }) | ||
179 | 194 | ||
180 | queue.on('error', err => { | 195 | if (errorHandlers[job.name]) { |
181 | logger.error('Error in job queue %s.', handlerName, { err }) | 196 | errorHandlers[job.name](job, err) |
182 | }) | 197 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) |
198 | } | ||
199 | }) | ||
183 | 200 | ||
184 | this.queues[handlerName] = queue | 201 | worker.on('error', err => { |
202 | logger.error('Error in job queue %s.', handlerName, { err }) | ||
203 | }) | ||
204 | |||
205 | this.workers[handlerName] = worker | ||
206 | } | ||
207 | |||
208 | private buildQueue (handlerName: JobType) { | ||
209 | const queueOptions: QueueOptions = { | ||
210 | connection: this.getRedisConnection(), | ||
211 | prefix: this.jobRedisPrefix | ||
185 | } | 212 | } |
186 | 213 | ||
187 | this.addRepeatableJobs() | 214 | this.queues[handlerName] = new Queue(handlerName, queueOptions) |
215 | } | ||
216 | |||
217 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { | ||
218 | const queueSchedulerOptions: QueueSchedulerOptions = { | ||
219 | autorun: !produceOnly, | ||
220 | connection: this.getRedisConnection(), | ||
221 | prefix: this.jobRedisPrefix, | ||
222 | maxStalledCount: 10 | ||
223 | } | ||
224 | this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) | ||
188 | } | 225 | } |
189 | 226 | ||
190 | terminate () { | 227 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { |
191 | for (const queueName of Object.keys(this.queues)) { | 228 | const queueEventsOptions: QueueEventsOptions = { |
192 | const queue = this.queues[queueName] | 229 | autorun: !produceOnly, |
193 | queue.close() | 230 | connection: this.getRedisConnection(), |
231 | prefix: this.jobRedisPrefix | ||
194 | } | 232 | } |
233 | this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) | ||
234 | } | ||
235 | |||
236 | private getRedisConnection () { | ||
237 | return { | ||
238 | password: CONFIG.REDIS.AUTH, | ||
239 | db: CONFIG.REDIS.DB, | ||
240 | host: CONFIG.REDIS.HOSTNAME, | ||
241 | port: CONFIG.REDIS.PORT, | ||
242 | path: CONFIG.REDIS.SOCKET | ||
243 | } | ||
244 | } | ||
245 | |||
246 | async terminate () { | ||
247 | const promises = Object.keys(this.workers) | ||
248 | .map(handlerName => { | ||
249 | const worker: Worker = this.workers[handlerName] | ||
250 | const queue: Queue = this.queues[handlerName] | ||
251 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
252 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
253 | |||
254 | return Promise.all([ | ||
255 | worker.close(false), | ||
256 | queue.close(), | ||
257 | queueScheduler.close(), | ||
258 | queueEvent.close() | ||
259 | ]) | ||
260 | }) | ||
261 | |||
262 | return Promise.all(promises) | ||
195 | } | 263 | } |
196 | 264 | ||
197 | async pause () { | 265 | async pause () { |
198 | for (const handler of Object.keys(this.queues)) { | 266 | for (const handler of Object.keys(this.workers)) { |
199 | await this.queues[handler].pause(true) | 267 | const worker: Worker = this.workers[handler] |
268 | |||
269 | await worker.pause() | ||
200 | } | 270 | } |
201 | } | 271 | } |
202 | 272 | ||
203 | async resume () { | 273 | resume () { |
204 | for (const handler of Object.keys(this.queues)) { | 274 | for (const handler of Object.keys(this.workers)) { |
205 | await this.queues[handler].resume(true) | 275 | const worker: Worker = this.workers[handler] |
276 | |||
277 | worker.resume() | ||
206 | } | 278 | } |
207 | } | 279 | } |
208 | 280 | ||
@@ -211,22 +283,21 @@ class JobQueue { | |||
211 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 283 | .catch(err => logger.error('Cannot create job.', { err, obj })) |
212 | } | 284 | } |
213 | 285 | ||
214 | createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { |
215 | const queue: Queue = this.queues[obj.type] | 287 | const queue: Queue = this.queues[obj.type] |
216 | if (queue === undefined) { | 288 | if (queue === undefined) { |
217 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 289 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
218 | return | 290 | return |
219 | } | 291 | } |
220 | 292 | ||
221 | const jobArgs: JobOptions = { | 293 | const jobArgs: JobsOptions = { |
222 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 294 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
223 | attempts: JOB_ATTEMPTS[obj.type], | 295 | attempts: JOB_ATTEMPTS[obj.type], |
224 | timeout: JOB_TTL[obj.type], | ||
225 | priority: options.priority, | 296 | priority: options.priority, |
226 | delay: options.delay | 297 | delay: options.delay |
227 | } | 298 | } |
228 | 299 | ||
229 | return queue.add(obj.payload, jobArgs) | 300 | return queue.add('job', obj.payload, jobArgs) |
230 | } | 301 | } |
231 | 302 | ||
232 | async listForApi (options: { | 303 | async listForApi (options: { |
@@ -244,7 +315,8 @@ class JobQueue { | |||
244 | const filteredJobTypes = this.filterJobTypes(jobType) | 315 | const filteredJobTypes = this.filterJobTypes(jobType) |
245 | 316 | ||
246 | for (const jobType of filteredJobTypes) { | 317 | for (const jobType of filteredJobTypes) { |
247 | const queue = this.queues[jobType] | 318 | const queue: Queue = this.queues[jobType] |
319 | |||
248 | if (queue === undefined) { | 320 | if (queue === undefined) { |
249 | logger.error('Unknown queue %s to list jobs.', jobType) | 321 | logger.error('Unknown queue %s to list jobs.', jobType) |
250 | continue | 322 | continue |
@@ -297,18 +369,22 @@ class JobQueue { | |||
297 | 369 | ||
298 | async removeOldJobs () { | 370 | async removeOldJobs () { |
299 | for (const key of Object.keys(this.queues)) { | 371 | for (const key of Object.keys(this.queues)) { |
300 | const queue = this.queues[key] | 372 | const queue: Queue = this.queues[key] |
301 | await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | 373 | await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') |
302 | } | 374 | } |
303 | } | 375 | } |
304 | 376 | ||
377 | waitJob (job: Job) { | ||
378 | return job.waitUntilFinished(this.queueEvents[job.queueName]) | ||
379 | } | ||
380 | |||
305 | private addRepeatableJobs () { | 381 | private addRepeatableJobs () { |
306 | this.queues['videos-views-stats'].add({}, { | 382 | this.queues['videos-views-stats'].add('job', {}, { |
307 | repeat: REPEAT_JOBS['videos-views-stats'] | 383 | repeat: REPEAT_JOBS['videos-views-stats'] |
308 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 384 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
309 | 385 | ||
310 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 386 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
311 | this.queues['activitypub-cleaner'].add({}, { | 387 | this.queues['activitypub-cleaner'].add('job', {}, { |
312 | repeat: REPEAT_JOBS['activitypub-cleaner'] | 388 | repeat: REPEAT_JOBS['activitypub-cleaner'] |
313 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 389 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
314 | } | 390 | } |