X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=14acace7da80a35f0d10338a6f73aa4c9c961e47;hb=134cf2bce96a8c5aefd55154e884964975d8cf23;hp=2e14867f247134ce6782b31e0e1268abaac53aa6;hpb=fbad87b0472f574409f7aa3ae7f8b54927d0cdd6;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 2e14867f2..14acace7d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -2,14 +2,18 @@ import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' import { Redis } from '../redis' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' +import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' -import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' +import { processVideoTranscoding, VideoTranscodingPayload } from './handlers/video-transcoding' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { processVideoImport, VideoImportPayload } from './handlers/video-import' +import { processVideosViews } from './handlers/video-views' +import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' +import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' +import { processVideoRedundancy, VideoRedundancyPayload } from '@server/lib/job-queue/handlers/video-redundancy' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -17,26 +21,25 @@ type CreateJobArgument = { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | - { type: 'video-file', payload: VideoFilePayload } | + { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | - { type: 'video-import', payload: VideoImportPayload } + { type: 'video-import', payload: VideoImportPayload } | + { type: 'activitypub-refresher', payload: RefreshPayload } | + { type: 'videos-views', payload: {} } | + { type: 'video-redundancy', payload: VideoRedundancyPayload } -const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { +const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, - 'video-file': processVideoFile, + 'video-transcoding': processVideoTranscoding, 'email': processEmail, - 'video-import': processVideoImport -} - -const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { - 'activitypub-http-broadcast': true, - 'activitypub-http-unicast': true, - 'activitypub-http-fetcher': true, - 'activitypub-follow': true + 'video-import': processVideoImport, + 'videos-views': processVideosViews, + 'activitypub-refresher': refreshAPObject, + 'video-redundancy': processVideoRedundancy } const jobTypes: JobType[] = [ @@ -45,30 +48,34 @@ const jobTypes: JobType[] = [ 'activitypub-http-fetcher', 'activitypub-http-unicast', 'email', - 'video-file', + 'video-transcoding', 'video-file-import', - 'video-import' + 'video-import', + 'videos-views', + 'activitypub-refresher', + 'video-redundancy' ] class JobQueue { private static instance: JobQueue - private queues: { [ id in JobType ]?: Bull.Queue } = {} + private queues: { [id in JobType]?: Bull.Queue } = {} private initialized = false private jobRedisPrefix: string - private constructor () {} + private constructor () { + } - async init () { + init () { // Already initialized if (this.initialized === true) return this.initialized = true - this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST + this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST const queueOptions = { prefix: this.jobRedisPrefix, - redis: Redis.getRedisClient(), + redis: Redis.getRedisClientOptions(), settings: { maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts } @@ -79,15 +86,20 @@ class JobQueue { const handler = handlers[handlerName] queue.process(JOB_CONCURRENCY[handlerName], handler) - .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) + .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + + queue.on('failed', (job, err) => { + logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + }) queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) - process.exit(-1) }) this.queues[handlerName] = queue } + + this.addRepeatableJobs() } terminate () { @@ -97,38 +109,47 @@ class JobQueue { } } - createJob (obj: CreateJobArgument) { + createJob (obj: CreateJobArgument): void { + this.createJobWithPromise(obj) + .catch(err => logger.error('Cannot create job.', { err, obj })) + } + + createJobWithPromise (obj: CreateJobArgument) { const queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) - throw Error('Unknown queue, cannot create job') + return } const jobArgs: Bull.JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, - attempts: JOB_ATTEMPTS[obj.type] - } - - if (jobsWithRequestTimeout[obj.type] === true) { - jobArgs.timeout = JOB_REQUEST_TTL + attempts: JOB_ATTEMPTS[obj.type], + timeout: JOB_TTL[obj.type] } return queue.add(obj.payload, jobArgs) } - async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise { + async listForApi (options: { + state: JobState + start: number + count: number + asc?: boolean + jobType: JobType + }): Promise { + const { state, start, count, asc, jobType } = options let results: Bull.Job[] = [] - // TODO: optimize - for (const jobType of jobTypes) { - const queue = this.queues[ jobType ] + const filteredJobTypes = this.filterJobTypes(jobType) + + for (const jobType of filteredJobTypes) { + const queue = this.queues[jobType] if (queue === undefined) { logger.error('Unknown queue %s to list jobs.', jobType) continue } - // FIXME: Bull queue typings does not have getJobs method - const jobs = await (queue as any).getJobs(state, 0, start + count, asc) + const jobs = await queue.getJobs([ state ], 0, start + count, asc) results = results.concat(jobs) } @@ -144,11 +165,13 @@ class JobQueue { return results.slice(start, start + count) } - async count (state: JobState): Promise { + async count (state: JobState, jobType?: JobType): Promise { let total = 0 - for (const type of jobTypes) { - const queue = this.queues[ type ] + const filteredJobTypes = this.filterJobTypes(jobType) + + for (const type of filteredJobTypes) { + const queue = this.queues[type] if (queue === undefined) { logger.error('Unknown queue %s to count jobs.', type) continue @@ -156,19 +179,31 @@ class JobQueue { const counts = await queue.getJobCounts() - total += counts[ state ] + total += counts[state] } return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } + private addRepeatableJobs () { + this.queues['videos-views'].add({}, { + repeat: REPEAT_JOBS['videos-views'] + }).catch(err => logger.error('Cannot add repeatable job.', { err })) + } + + private filterJobTypes (jobType?: JobType) { + if (!jobType) return jobTypes + + return jobTypes.filter(t => t === jobType) + } + static get Instance () { return this.instance || (this.instance = new this()) } @@ -177,5 +212,6 @@ class JobQueue { // --------------------------------------------------------------------------- export { + jobTypes, JobQueue }