]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blame - server/lib/job-queue/job-queue.ts
Fix user creation date on localized page
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
CommitLineData
5a921e7b 1import {
bd911b54
C
2 FlowJob,
3 FlowProducer,
5a921e7b
C
4 Job,
5 JobsOptions,
6 Queue,
7 QueueEvents,
8 QueueEventsOptions,
9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker,
13 WorkerOptions
14} from 'bullmq'
402145b8 15import { jobStates } from '@server/helpers/custom-validators/jobs'
9129b769 16import { CONFIG } from '@server/initializers/config'
402145b8 17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
bd911b54 18import { pick, timeoutPromise } from '@shared/core-utils'
8dc8a34e
C
19import {
20 ActivitypubFollowPayload,
21 ActivitypubHttpBroadcastPayload,
e1c55031
C
22 ActivitypubHttpFetcherPayload,
23 ActivitypubHttpUnicastPayload,
8795d6f2 24 ActorKeysPayload,
2a491182 25 AfterVideoChannelImportPayload,
276250f0 26 DeleteResumableUploadMetaFilePayload,
e1c55031 27 EmailPayload,
bd911b54 28 FederateVideoPayload,
8dc8a34e 29 JobState,
e1c55031 30 JobType,
f012319a 31 ManageVideoTorrentPayload,
0305db28 32 MoveObjectStoragePayload,
bd911b54 33 NotifyPayload,
e1c55031 34 RefreshPayload,
2a491182 35 VideoChannelImportPayload,
e1c55031
C
36 VideoFileImportPayload,
37 VideoImportPayload,
a5cf76af 38 VideoLiveEndingPayload,
e1c55031 39 VideoRedundancyPayload,
92e66e04 40 VideoStudioEditionPayload,
e1c55031 41 VideoTranscodingPayload
8dc8a34e 42} from '../../../shared/models'
94a5ff8a 43import { logger } from '../../helpers/logger'
74dc3bca 44import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
22df69fd 45import { Hooks } from '../plugins/hooks'
74d249bc 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
402145b8 47import { processActivityPubFollow } from './handlers/activitypub-follow'
405c83f9 48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
8dc8a34e
C
49import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
50import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
e1c55031 51import { refreshAPObject } from './handlers/activitypub-refresher'
8795d6f2 52import { processActorKeys } from './handlers/actor-keys'
ab08ab4e 53import { processAfterVideoChannelImport } from './handlers/after-video-channel-import'
402145b8 54import { processEmail } from './handlers/email'
bd911b54 55import { processFederateVideo } from './handlers/federate-video'
f012319a 56import { processManageVideoTorrent } from './handlers/manage-video-torrent'
32567717 57import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
bd911b54 58import { processNotify } from './handlers/notify'
2a491182 59import { processVideoChannelImport } from './handlers/video-channel-import'
e1c55031 60import { processVideoFileImport } from './handlers/video-file-import'
402145b8 61import { processVideoImport } from './handlers/video-import'
a5cf76af 62import { processVideoLiveEnding } from './handlers/video-live-ending'
92e66e04 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
402145b8 64import { processVideoTranscoding } from './handlers/video-transcoding'
51353d9a 65import { processVideosViewsStats } from './handlers/video-views-stats'
94a5ff8a 66
/**
 * Every job that can be submitted to the job queue: a `type` discriminant paired with
 * the payload that is stored as the job data.
 */
export type CreateJobArgument =
  { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  // Name under which the cleaner handler and queue are registered in this file
  { type: 'activitypub-cleaner', payload: {} } |
  // Kept for backward compatibility only: no handler/queue is registered under this name in this file
  { type: 'activitypub-http-cleaner', payload: {} } |
  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  { type: 'video-file-import', payload: VideoFileImportPayload } |
  { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  { type: 'email', payload: EmailPayload } |
  { type: 'video-import', payload: VideoImportPayload } |
  { type: 'activitypub-refresher', payload: RefreshPayload } |
  { type: 'videos-views-stats', payload: {} } |
  { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  { type: 'actor-keys', payload: ActorKeysPayload } |
  { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
  { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  { type: 'notify', payload: NotifyPayload } |
  { type: 'federate-video', payload: FederateVideoPayload }
94a5ff8a 92
/**
 * Optional scheduling settings a caller may attach to a job.
 * Both values are forwarded untouched into the BullMQ job options.
 */
export type CreateJobOptions = {
  // Forwarded as the `priority` job option
  priority?: number

  // Forwarded as the `delay` job option
  delay?: number
}
97
// Processor function for each job type.
// Key order is significant: init() walks Object.keys(handlers) to create the workers and queues.
const handlers: Record<JobType, (job: Job) => Promise<any>> = {
  // ActivityPub delivery/fetching
  'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  'activitypub-http-unicast': processActivityPubHttpUnicast,
  'activitypub-http-fetcher': processActivityPubHttpFetcher,
  'activitypub-cleaner': processActivityPubCleaner,
  'activitypub-follow': processActivityPubFollow,

  'video-file-import': processVideoFileImport,
  'video-transcoding': processVideoTranscoding,
  email: processEmail,
  'video-import': processVideoImport,
  'videos-views-stats': processVideosViewsStats,
  'activitypub-refresher': refreshAPObject,
  'video-live-ending': processVideoLiveEnding,
  'actor-keys': processActorKeys,
  'video-redundancy': processVideoRedundancy,
  'move-to-object-storage': processMoveToObjectStorage,
  'manage-video-torrent': processManageVideoTorrent,
  'video-studio-edition': processVideoStudioEdition,
  'video-channel-import': processVideoChannelImport,
  'after-video-channel-import': processAfterVideoChannelImport,
  notify: processNotify,
  'federate-video': processFederateVideo
}
122
32567717
C
// Optional callbacks, keyed by job type, that the worker 'failed' listener may invoke with the job and its error
const errorHandlers: Partial<Record<JobType, (job: Job, err: any) => Promise<any>>> = {
  'move-to-object-storage': onMoveToObjectStorageFailure
}
126
94831479
C
// Job types enumerated when listing, counting and computing stats (order is preserved in getStats() output)
const jobTypes: JobType[] = [
  'activitypub-follow', 'activitypub-http-broadcast', 'activitypub-http-broadcast-parallel',
  'activitypub-http-fetcher', 'activitypub-http-unicast', 'activitypub-cleaner',
  'email',
  'video-transcoding', 'video-file-import', 'video-import',
  'videos-views-stats',
  'activitypub-refresher',
  'video-redundancy',
  'actor-keys',
  'video-live-ending',
  'move-to-object-storage', 'manage-video-torrent', 'video-studio-edition',
  'video-channel-import', 'after-video-channel-import',
  'notify',
  'federate-video'
]
151
941d28cc
C
// Job types whose failures are logged at 'debug' level instead of 'error'
const silentFailure: Set<JobType> = new Set([ 'activitypub-http-unicast' ])
153
94a5ff8a
C
/**
 * Singleton wrapping the BullMQ objects used to run background jobs.
 *
 * For every key of `handlers`, init() creates a worker, a queue, a queue scheduler and a
 * queue events listener, all sharing the same Redis connection settings and key prefix.
 * A single flow producer is used to create parent/children job flows.
 */
class JobQueue {

  private static instance: JobQueue

  // One BullMQ object of each kind per job type
  private workers: { [id in JobType]?: Worker } = {}
  private queues: { [id in JobType]?: Queue } = {}
  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
  private queueEvents: { [id in JobType]?: QueueEvents } = {}

  private flowProducer: FlowProducer

  private initialized = false
  private jobRedisPrefix: string

  private constructor () {
  }

  /**
   * Builds every worker/queue/scheduler/events object, the flow producer and the repeatable jobs.
   * Subsequent calls are no-ops.
   *
   * @param produceOnly when true, workers, queue schedulers and queue events are created with `autorun: false`
   */
  init (produceOnly = false) {
    // Already initialized
    if (this.initialized === true) return
    this.initialized = true

    this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST

    for (const handlerName of (Object.keys(handlers) as JobType[])) {
      this.buildWorker(handlerName, produceOnly)
      this.buildQueue(handlerName)
      this.buildQueueScheduler(handlerName, produceOnly)
      this.buildQueueEvent(handlerName, produceOnly)
    }

    this.flowProducer = new FlowProducer({
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    })
    this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })

    this.addRepeatableJobs()
  }

  /**
   * Creates the worker of a job type: its processor runs the registered handler through the
   * plugin hooks, optionally bounded by the JOB_TTL timeout of this job type.
   */
  private buildWorker (handlerName: JobType, produceOnly: boolean) {
    const workerOptions: WorkerOptions = {
      autorun: !produceOnly,
      concurrency: this.getJobConcurrency(handlerName),
      prefix: this.jobRedisPrefix,
      connection: this.getRedisConnection()
    }

    const handler = function (job: Job) {
      const timeout = JOB_TTL[handlerName]
      const p = handlers[handlerName](job)

      // No TTL configured for this job type: let the handler run without a time limit
      if (!timeout) return p

      return timeoutPromise(p, timeout)
    }

    const processor = async (jobArg: Job<any>) => {
      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })

      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
    }

    const worker = new Worker(handlerName, processor, workerOptions)

    worker.on('failed', (job, err) => {
      const logLevel = silentFailure.has(handlerName)
        ? 'debug'
        : 'error'

      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })

      // Look up by queue/job type: every job of this file is added under the name 'job',
      // so job.name can never match a key of errorHandlers
      const errorHandler = errorHandlers[handlerName]

      if (errorHandler) {
        errorHandler(job, err)
          .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
      }
    })

    worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })

    this.workers[handlerName] = worker
  }

  /** Creates the queue of a job type and registers it in `this.queues`. */
  private buildQueue (handlerName: JobType) {
    const queueOptions: QueueOptions = {
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    const queue = new Queue(handlerName, queueOptions)
    queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })

    this.queues[handlerName] = queue
  }

  /** Creates the queue scheduler of a job type and registers it in `this.queueSchedulers`. */
  private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
    const queueSchedulerOptions: QueueSchedulerOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix,
      maxStalledCount: 10
    }

    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })

    this.queueSchedulers[handlerName] = queueScheduler
  }

  /** Creates the queue events listener of a job type and registers it in `this.queueEvents`. */
  private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
    const queueEventsOptions: QueueEventsOptions = {
      autorun: !produceOnly,
      connection: this.getRedisConnection(),
      prefix: this.jobRedisPrefix
    }

    const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
    queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })

    this.queueEvents[handlerName] = queueEvents
  }

  /** Redis connection settings, read from CONFIG.REDIS, shared by every BullMQ object of this class. */
  private getRedisConnection () {
    return {
      password: CONFIG.REDIS.AUTH,
      db: CONFIG.REDIS.DB,
      host: CONFIG.REDIS.HOSTNAME,
      port: CONFIG.REDIS.PORT,
      path: CONFIG.REDIS.SOCKET
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Closes the worker, queue, queue scheduler and queue events of every job type, then the flow producer.
   *
   * @returns the results of the per job type close() calls
   */
  async terminate () {
    const promises = Object.keys(this.workers)
      .map(handlerName => {
        const worker: Worker = this.workers[handlerName]
        const queue: Queue = this.queues[handlerName]
        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
        const queueEvent: QueueEvents = this.queueEvents[handlerName]

        return Promise.all([
          worker.close(false),
          queue.close(),
          queueScheduler.close(),
          queueEvent.close()
        ])
      })

    const results = await Promise.all(promises)

    // The flow producer only exists once init() ran; close it too so its connection is not leaked
    if (this.flowProducer) await this.flowProducer.close()

    return results
  }

  /** Pauses every worker, one after the other. */
  async pause () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      await worker.pause()
    }
  }

  /** Resumes every worker. */
  resume () {
    for (const handlerName of Object.keys(this.workers)) {
      const worker: Worker = this.workers[handlerName]

      worker.resume()
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Fire-and-forget variant of createJob(): failures are logged instead of being returned to the caller.
   */
  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
    const promise = this.createJob(options)

    // createJob() returns undefined (after logging) when no queue exists for this type
    if (!promise) return

    promise.catch(err => logger.error('Cannot create job.', { err, options }))
  }

  /**
   * Adds a job to the queue matching `options.type`.
   *
   * @returns the promise returned by the queue, or undefined when no queue exists for this type
   */
  createJob (options: CreateJobArgument & CreateJobOptions) {
    const queue: Queue = this.queues[options.type]
    if (queue === undefined) {
      logger.error('Unknown queue %s: cannot create job.', options.type)
      return
    }

    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))

    return queue.add('job', options.payload, jobOptions)
  }

  /**
   * Chains the given jobs in a flow: each job becomes the parent of the previous one, so the
   * last defined argument is the root of the flow. Undefined entries are skipped.
   * NOTE(review): at least one defined job is presumably expected by the flow producer; confirm against callers.
   */
  createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
    let lastJob: FlowJob

    for (const job of jobs) {
      if (!job) continue

      lastJob = {
        ...this.buildJobFlowOption(job),

        children: lastJob
          ? [ lastJob ]
          : []
      }
    }

    return this.flowProducer.add(lastJob)
  }

  /** Creates a flow made of `parent` and its direct `children`. */
  createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
    return this.flowProducer.add({
      ...this.buildJobFlowOption(parent),

      children: children.map(c => this.buildJobFlowOption(c))
    })
  }

  /** Converts a job description into the node shape given to the flow producer. */
  private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
    return {
      name: 'job',
      data: job.payload,
      queueName: job.type,
      opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
    }
  }

  /** Job options: exponential backoff starting at 60 seconds, JOB_ATTEMPTS of the type, caller priority/delay. */
  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
    return {
      backoff: { delay: 60 * 1000, type: 'exponential' },
      attempts: JOB_ATTEMPTS[type],
      priority: options.priority,
      delay: options.delay
    }
  }

  // ---------------------------------------------------------------------------

  /**
   * Lists jobs across the selected queues.
   *
   * Up to `start + count` jobs are fetched from each queue, merged, sorted by timestamp
   * (ascending, reversed when `asc === false`) and then sliced to the requested page.
   */
  async listForApi (options: {
    state?: JobState
    start: number
    count: number
    asc?: boolean
    jobType: JobType
  }): Promise<Job[]> {
    const { state, start, count, asc, jobType } = options

    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let results: Job[] = []

    for (const jobType of filteredJobTypes) {
      const queue: Queue = this.queues[jobType]

      if (queue === undefined) {
        logger.error('Unknown queue %s to list jobs.', jobType)
        continue
      }

      const jobs = await queue.getJobs(states, 0, start + count, asc)
      results = results.concat(jobs)
    }

    results.sort((j1: any, j2: any) => {
      if (j1.timestamp < j2.timestamp) return -1
      else if (j1.timestamp === j2.timestamp) return 0

      return 1
    })

    if (asc === false) results.reverse()

    return results.slice(start, start + count)
  }

  /**
   * Counts jobs across the selected queues, using the same state filter as listForApi()
   * so both methods agree on what a given state covers.
   */
  async count (state: JobState, jobType?: JobType): Promise<number> {
    const states = this.buildStateFilter(state)
    const filteredJobTypes = this.buildTypeFilter(jobType)

    let total = 0

    for (const type of filteredJobTypes) {
      const queue = this.queues[type]
      if (queue === undefined) {
        logger.error('Unknown queue %s to count jobs.', type)
        continue
      }

      const counts = await queue.getJobCounts()

      for (const s of states) {
        // A state missing from the counts object must not turn the total into NaN
        total += counts[s] || 0
      }
    }

    return total
  }

  /** States to query: every known state when no filter is given, otherwise the requested one. */
  private buildStateFilter (state?: JobState) {
    if (!state) return jobStates

    const states = [ state ]

    // Include parent if filtering on waiting
    if (state === 'waiting') states.push('waiting-children')

    return states
  }

  /** Job types to query: every known type when no filter is given, otherwise the requested one if known. */
  private buildTypeFilter (jobType?: JobType) {
    if (!jobType) return jobTypes

    return jobTypes.filter(t => t === jobType)
  }

  /** Returns the job counts of every job type, in `jobTypes` order. */
  async getStats () {
    const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))

    return Promise.all(promises)
  }

  // ---------------------------------------------------------------------------

  /** Cleans 'completed' jobs of every queue, using JOB_COMPLETED_LIFETIME as grace period and a limit of 100. */
  async removeOldJobs () {
    for (const key of Object.keys(this.queues)) {
      const queue: Queue = this.queues[key]
      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
    }
  }

  /** Registers the repeatable jobs; failures are only logged. */
  private addRepeatableJobs () {
    this.queues['videos-views-stats'].add('job', {}, {
      repeat: REPEAT_JOBS['videos-views-stats']
    }).catch(err => logger.error('Cannot add repeatable job.', { err }))

    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
      this.queues['activitypub-cleaner'].add('job', {}, {
        repeat: REPEAT_JOBS['activitypub-cleaner']
      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
    }
  }

  /** Worker concurrency: read from CONFIG for transcoding and video import, from JOB_CONCURRENCY otherwise. */
  private getJobConcurrency (jobType: JobType) {
    if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
    if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY

    return JOB_CONCURRENCY[jobType]
  }

  /** Lazily created singleton instance. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }
}
504
505// ---------------------------------------------------------------------------
506
507export {
1061c73f 508 jobTypes,
94a5ff8a
C
509 JobQueue
510}