X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=ba9cbe0d99d64b9c0e3f10a579acfec291510180;hb=4759fedc6112cdb0b68b8550677c05a3b1ed62bd;hp=b018d0e8a6291a6278a947fc53b659071d899977;hpb=c1e791bad0b079af67398f6407221e6dcbb573dd;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index b018d0e8a..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -2,13 +2,16 @@ import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' import { Redis } from '../redis' -import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' +import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' +import { processVideoImport, VideoImportPayload } from './handlers/video-import' +import { processVideosViews } from './handlers/video-views' +import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -17,7 +20,10 @@ type CreateJobArgument = { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | { type: 'video-file', payload: VideoFilePayload } | - { type: 'email', payload: EmailPayload } + { type: 'email', payload: EmailPayload } | + { type: 'video-import', payload: VideoImportPayload } | + { type: 'activitypub-refresher', payload: RefreshPayload } | + { type: 'videos-views', payload: {} } const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, @@ -26,14 +32,10 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, 'video-file': processVideoFile, - 'email': processEmail -} - -const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { - 'activitypub-http-broadcast': true, - 'activitypub-http-unicast': true, - 'activitypub-http-fetcher': true, - 'activitypub-follow': true + 'email': processEmail, + 'video-import': processVideoImport, + 'videos-views': processVideosViews, + 'activitypub-refresher': refreshAPObject } const jobTypes: JobType[] = [ @@ -43,7 +45,10 @@ const jobTypes: JobType[] = [ 'activitypub-http-unicast', 'email', 'video-file', - 'video-file-import' + 'video-file-import', + 'video-import', + 'videos-views', + 'activitypub-refresher' ] class JobQueue { @@ -64,7 +69,10 @@ class JobQueue { this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST const queueOptions = { prefix: this.jobRedisPrefix, - redis: Redis.getRedisClient() + redis: Redis.getRedisClient(), + settings: { + maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts + } } for (const handlerName of Object.keys(handlers)) { @@ -72,15 +80,27 @@ class JobQueue { const handler = handlers[handlerName] queue.process(JOB_CONCURRENCY[handlerName], handler) - .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) + .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) + + queue.on('failed', (job, err) => { + logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) + }) queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) - process.exit(-1) }) this.queues[handlerName] = queue } + + this.addRepeatableJobs() + } + + terminate () { + for (const queueName of Object.keys(this.queues)) { + const queue = this.queues[queueName] + queue.close() + } } createJob (obj: CreateJobArgument) { @@ -92,11 +112,8 @@ class JobQueue { const jobArgs: Bull.JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, - attempts: JOB_ATTEMPTS[obj.type] - } - - if (jobsWithRequestTimeout[obj.type] === true) { - jobArgs.timeout = JOB_REQUEST_TTL + attempts: JOB_ATTEMPTS[obj.type], + timeout: JOB_TTL[obj.type] } return queue.add(obj.payload, jobArgs) @@ -148,13 +165,19 @@ class JobQueue { return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } + private addRepeatableJobs () { + this.queues['videos-views'].add({}, { + repeat: REPEAT_JOBS['videos-views'] + }) + } + static get Instance () { return this.instance || (this.instance = new this()) }