aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts188
1 files changed, 132 insertions, 56 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0ae325f4d..0cf5d53ce 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,7 +1,19 @@
1import Bull, { Job, JobOptions, Queue } from 'bull' 1import {
2 Job,
3 JobsOptions,
4 Queue,
5 QueueEvents,
6 QueueEventsOptions,
7 QueueOptions,
8 QueueScheduler,
9 QueueSchedulerOptions,
10 Worker,
11 WorkerOptions
12} from 'bullmq'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 13import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config' 14import { CONFIG } from '@server/initializers/config'
4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils'
5import { 17import {
6 ActivitypubFollowPayload, 18 ActivitypubFollowPayload,
7 ActivitypubHttpBroadcastPayload, 19 ActivitypubHttpBroadcastPayload,
@@ -120,7 +132,11 @@ class JobQueue {
120 132
121 private static instance: JobQueue 133 private static instance: JobQueue
122 134
135 private workers: { [id in JobType]?: Worker } = {}
123 private queues: { [id in JobType]?: Queue } = {} 136 private queues: { [id in JobType]?: Queue } = {}
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139
124 private initialized = false 140 private initialized = false
125 private jobRedisPrefix: string 141 private jobRedisPrefix: string
126 142
@@ -134,75 +150,131 @@ class JobQueue {
134 150
135 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST 151 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
136 152
137 const queueOptions: Bull.QueueOptions = { 153 for (const handlerName of (Object.keys(handlers) as JobType[])) {
154 this.buildWorker(handlerName, produceOnly)
155 this.buildQueue(handlerName)
156 this.buildQueueScheduler(handlerName, produceOnly)
157 this.buildQueueEvent(handlerName, produceOnly)
158 }
159
160 this.addRepeatableJobs()
161 }
162
163 private buildWorker (handlerName: JobType, produceOnly: boolean) {
164 const workerOptions: WorkerOptions = {
165 autorun: !produceOnly,
166 concurrency: this.getJobConcurrency(handlerName),
138 prefix: this.jobRedisPrefix, 167 prefix: this.jobRedisPrefix,
139 redis: { 168 connection: this.getRedisConnection()
140 password: CONFIG.REDIS.AUTH,
141 db: CONFIG.REDIS.DB,
142 host: CONFIG.REDIS.HOSTNAME,
143 port: CONFIG.REDIS.PORT,
144 path: CONFIG.REDIS.SOCKET
145 },
146 settings: {
147 maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
148 }
149 } 169 }
150 170
151 for (const handlerName of (Object.keys(handlers) as JobType[])) { 171 const handler = function (job: Job) {
152 const queue = new Bull(handlerName, queueOptions) 172 const timeout = JOB_TTL[handlerName]
173 const p = handlers[handlerName](job)
153 174
154 if (produceOnly) { 175 if (!timeout) return p
155 queue.pause(true)
156 .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
157 }
158 176
159 const handler = handlers[handlerName] 177 return timeoutPromise(p, timeout)
178 }
160 179
161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { 180 const processor = async (jobArg: Job<any>) => {
162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) 181 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163 182
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') 183 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 184 }
166 185
167 queue.on('failed', (job, err) => { 186 const worker = new Worker(handlerName, processor, workerOptions)
168 const logLevel = silentFailure.has(handlerName)
169 ? 'debug'
170 : 'error'
171 187
172 logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) 188 worker.on('failed', (job, err) => {
189 const logLevel = silentFailure.has(handlerName)
190 ? 'debug'
191 : 'error'
173 192
174 if (errorHandlers[job.name]) { 193 logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
175 errorHandlers[job.name](job, err)
176 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
177 }
178 })
179 194
180 queue.on('error', err => { 195 if (errorHandlers[job.name]) {
181 logger.error('Error in job queue %s.', handlerName, { err }) 196 errorHandlers[job.name](job, err)
182 }) 197 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
198 }
199 })
183 200
184 this.queues[handlerName] = queue 201 worker.on('error', err => {
202 logger.error('Error in job queue %s.', handlerName, { err })
203 })
204
205 this.workers[handlerName] = worker
206 }
207
208 private buildQueue (handlerName: JobType) {
209 const queueOptions: QueueOptions = {
210 connection: this.getRedisConnection(),
211 prefix: this.jobRedisPrefix
185 } 212 }
186 213
187 this.addRepeatableJobs() 214 this.queues[handlerName] = new Queue(handlerName, queueOptions)
215 }
216
217 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
218 const queueSchedulerOptions: QueueSchedulerOptions = {
219 autorun: !produceOnly,
220 connection: this.getRedisConnection(),
221 prefix: this.jobRedisPrefix,
222 maxStalledCount: 10
223 }
224 this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
188 } 225 }
189 226
190 terminate () { 227 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
191 for (const queueName of Object.keys(this.queues)) { 228 const queueEventsOptions: QueueEventsOptions = {
192 const queue = this.queues[queueName] 229 autorun: !produceOnly,
193 queue.close() 230 connection: this.getRedisConnection(),
231 prefix: this.jobRedisPrefix
194 } 232 }
233 this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
234 }
235
236 private getRedisConnection () {
237 return {
238 password: CONFIG.REDIS.AUTH,
239 db: CONFIG.REDIS.DB,
240 host: CONFIG.REDIS.HOSTNAME,
241 port: CONFIG.REDIS.PORT,
242 path: CONFIG.REDIS.SOCKET
243 }
244 }
245
246 async terminate () {
247 const promises = Object.keys(this.workers)
248 .map(handlerName => {
249 const worker: Worker = this.workers[handlerName]
250 const queue: Queue = this.queues[handlerName]
251 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
252 const queueEvent: QueueEvents = this.queueEvents[handlerName]
253
254 return Promise.all([
255 worker.close(false),
256 queue.close(),
257 queueScheduler.close(),
258 queueEvent.close()
259 ])
260 })
261
262 return Promise.all(promises)
195 } 263 }
196 264
197 async pause () { 265 async pause () {
198 for (const handler of Object.keys(this.queues)) { 266 for (const handler of Object.keys(this.workers)) {
199 await this.queues[handler].pause(true) 267 const worker: Worker = this.workers[handler]
268
269 await worker.pause()
200 } 270 }
201 } 271 }
202 272
203 async resume () { 273 resume () {
204 for (const handler of Object.keys(this.queues)) { 274 for (const handler of Object.keys(this.workers)) {
205 await this.queues[handler].resume(true) 275 const worker: Worker = this.workers[handler]
276
277 worker.resume()
206 } 278 }
207 } 279 }
208 280
@@ -211,22 +283,21 @@ class JobQueue {
211 .catch(err => logger.error('Cannot create job.', { err, obj })) 283 .catch(err => logger.error('Cannot create job.', { err, obj }))
212 } 284 }
213 285
214 createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
215 const queue: Queue = this.queues[obj.type] 287 const queue: Queue = this.queues[obj.type]
216 if (queue === undefined) { 288 if (queue === undefined) {
217 logger.error('Unknown queue %s: cannot create job.', obj.type) 289 logger.error('Unknown queue %s: cannot create job.', obj.type)
218 return 290 return
219 } 291 }
220 292
221 const jobArgs: JobOptions = { 293 const jobArgs: JobsOptions = {
222 backoff: { delay: 60 * 1000, type: 'exponential' }, 294 backoff: { delay: 60 * 1000, type: 'exponential' },
223 attempts: JOB_ATTEMPTS[obj.type], 295 attempts: JOB_ATTEMPTS[obj.type],
224 timeout: JOB_TTL[obj.type],
225 priority: options.priority, 296 priority: options.priority,
226 delay: options.delay 297 delay: options.delay
227 } 298 }
228 299
229 return queue.add(obj.payload, jobArgs) 300 return queue.add('job', obj.payload, jobArgs)
230 } 301 }
231 302
232 async listForApi (options: { 303 async listForApi (options: {
@@ -244,7 +315,8 @@ class JobQueue {
244 const filteredJobTypes = this.filterJobTypes(jobType) 315 const filteredJobTypes = this.filterJobTypes(jobType)
245 316
246 for (const jobType of filteredJobTypes) { 317 for (const jobType of filteredJobTypes) {
247 const queue = this.queues[jobType] 318 const queue: Queue = this.queues[jobType]
319
248 if (queue === undefined) { 320 if (queue === undefined) {
249 logger.error('Unknown queue %s to list jobs.', jobType) 321 logger.error('Unknown queue %s to list jobs.', jobType)
250 continue 322 continue
@@ -297,18 +369,22 @@ class JobQueue {
297 369
298 async removeOldJobs () { 370 async removeOldJobs () {
299 for (const key of Object.keys(this.queues)) { 371 for (const key of Object.keys(this.queues)) {
300 const queue = this.queues[key] 372 const queue: Queue = this.queues[key]
301 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 373 await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
302 } 374 }
303 } 375 }
304 376
377 waitJob (job: Job) {
378 return job.waitUntilFinished(this.queueEvents[job.queueName])
379 }
380
305 private addRepeatableJobs () { 381 private addRepeatableJobs () {
306 this.queues['videos-views-stats'].add({}, { 382 this.queues['videos-views-stats'].add('job', {}, {
307 repeat: REPEAT_JOBS['videos-views-stats'] 383 repeat: REPEAT_JOBS['videos-views-stats']
308 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 384 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
309 385
310 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 386 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
311 this.queues['activitypub-cleaner'].add({}, { 387 this.queues['activitypub-cleaner'].add('job', {}, {
312 repeat: REPEAT_JOBS['activitypub-cleaner'] 388 repeat: REPEAT_JOBS['activitypub-cleaner']
313 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 389 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
314 } 390 }