X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=5d0b797b0ca157aab24e72cb8d0750ce5d5d2f2d;hb=3b01f4c0ac764ecb70efaadfd939ca868c28769c;hp=a1c623b2575d0997d4987f003f21278c73c05df6;hpb=cf59a2a0c367683ba35758419499bf6087c192ec;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index a1c623b25..5d0b797b0 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,19 +1,35 @@ import * as Bull from 'bull' -import { JobState, JobType } from '../../../shared/models' +import { jobStates } from '@server/helpers/custom-validators/jobs' +import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' +import { + ActivitypubFollowPayload, + ActivitypubHttpBroadcastPayload, + ActivitypubHttpFetcherPayload, + ActivitypubHttpUnicastPayload, + EmailPayload, + JobState, + JobType, + RefreshPayload, + VideoFileImportPayload, + VideoImportPayload, + VideoLiveEndingPayload, + VideoRedundancyPayload, + VideoTranscodingPayload +} from '../../../shared/models' import { logger } from '../../helpers/logger' -import { Redis } from '../redis' import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' -import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' -import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' -import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' -import { EmailPayload, processEmail } from './handlers/email' -import { processVideoTranscoding, VideoTranscodingPayload } from './handlers/video-transcoding' -import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' -import { processVideoImport, VideoImportPayload } from './handlers/video-import' +import { Redis } from '../redis' +import { processActivityPubFollow } from './handlers/activitypub-follow' +import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' +import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' +import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' +import { refreshAPObject } from './handlers/activitypub-refresher' +import { processEmail } from './handlers/email' +import { processVideoFileImport } from './handlers/video-file-import' +import { processVideoImport } from './handlers/video-import' +import { processVideoLiveEnding } from './handlers/video-live-ending' +import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViews } from './handlers/video-views' -import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' -import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' -import { processVideoRedundancy, VideoRedundancyPayload } from '@server/lib/job-queue/handlers/video-redundancy' type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -26,9 +42,14 @@ type CreateJobArgument = { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views', payload: {} } | + { type: 'video-live-ending', payload: VideoLiveEndingPayload } | { type: 'video-redundancy', payload: VideoRedundancyPayload } -const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { +type CreateJobOptions = { + delay?: number +} + +const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, @@ -39,6 +60,7 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'video-import': processVideoImport, 'videos-views': processVideosViews, 'activitypub-refresher': refreshAPObject, + 'video-live-ending': processVideoLiveEnding, 'video-redundancy': processVideoRedundancy } @@ -53,20 +75,22 @@ const jobTypes: JobType[] = [ 'video-import', 'videos-views', 'activitypub-refresher', - 'video-redundancy' + 'video-redundancy', + 'video-live-ending' ] class JobQueue { private static instance: JobQueue - private queues: { [ id in JobType ]?: Bull.Queue } = {} + private queues: { [id in JobType]?: Bull.Queue } = {} private initialized = false private jobRedisPrefix: string - private constructor () {} + private constructor () { + } - async init () { + init () { // Already initialized if (this.initialized === true) return this.initialized = true @@ -108,43 +132,50 @@ class JobQueue { } } - createJob (obj: CreateJobArgument) { + createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { + this.createJobWithPromise(obj, options) + .catch(err => logger.error('Cannot create job.', { err, obj })) + } + + createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { const queue = this.queues[obj.type] if (queue === undefined) { logger.error('Unknown queue %s: cannot create job.', obj.type) - throw Error('Unknown queue, cannot create job') + return } const jobArgs: Bull.JobOptions = { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], - timeout: JOB_TTL[obj.type] + timeout: JOB_TTL[obj.type], + delay: options.delay } return queue.add(obj.payload, jobArgs) } async listForApi (options: { - state: JobState, - start: number, - count: number, - asc?: boolean, + state?: JobState + start: number + count: number + asc?: boolean jobType: JobType }): Promise { const { state, start, count, asc, jobType } = options + + const states = state ? [ state ] : jobStates let results: Bull.Job[] = [] const filteredJobTypes = this.filterJobTypes(jobType) - // TODO: optimize for (const jobType of filteredJobTypes) { - const queue = this.queues[ jobType ] + const queue = this.queues[jobType] if (queue === undefined) { logger.error('Unknown queue %s to list jobs.', jobType) continue } - const jobs = await queue.getJobs([ state ], 0, start + count, asc) + const jobs = await queue.getJobs(states, 0, start + count, asc) results = results.concat(jobs) } @@ -161,12 +192,13 @@ class JobQueue { } async count (state: JobState, jobType?: JobType): Promise { + const states = state ? [ state ] : jobStates let total = 0 const filteredJobTypes = this.filterJobTypes(jobType) for (const type of filteredJobTypes) { - const queue = this.queues[ type ] + const queue = this.queues[type] if (queue === undefined) { logger.error('Unknown queue %s to count jobs.', type) continue @@ -174,7 +206,9 @@ class JobQueue { const counts = await queue.getJobCounts() - total += counts[ state ] + for (const s of states) { + total += counts[s] + } } return total @@ -190,7 +224,7 @@ class JobQueue { private addRepeatableJobs () { this.queues['videos-views'].add({}, { repeat: REPEAT_JOBS['videos-views'] - }) + }).catch(err => logger.error('Cannot add repeatable job.', { err })) } private filterJobTypes (jobType?: JobType) {