import {
+ FlowJob,
+ FlowProducer,
Job,
JobsOptions,
Queue,
import { jobStates } from '@server/helpers/custom-validators/jobs'
import { CONFIG } from '@server/initializers/config'
import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
-import { timeoutPromise } from '@shared/core-utils'
+import { pick, timeoutPromise } from '@shared/core-utils'
import {
ActivitypubFollowPayload,
ActivitypubHttpBroadcastPayload,
ActorKeysPayload,
DeleteResumableUploadMetaFilePayload,
EmailPayload,
+ FederateVideoPayload,
JobState,
JobType,
ManageVideoTorrentPayload,
MoveObjectStoragePayload,
+ NotifyPayload,
RefreshPayload,
VideoFileImportPayload,
VideoImportPayload,
import { refreshAPObject } from './handlers/activitypub-refresher'
import { processActorKeys } from './handlers/actor-keys'
import { processEmail } from './handlers/email'
+import { processFederateVideo } from './handlers/federate-video'
import { processManageVideoTorrent } from './handlers/manage-video-torrent'
import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
+import { processNotify } from './handlers/notify'
import { processVideoFileImport } from './handlers/video-file-import'
import { processVideoImport } from './handlers/video-import'
import { processVideoLiveEnding } from './handlers/video-live-ending'
import { processVideoTranscoding } from './handlers/video-transcoding'
import { processVideosViewsStats } from './handlers/video-views-stats'
-type CreateJobArgument =
+export type CreateJobArgument =
{ type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
{ type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
{ type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
{ type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
{ type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
- { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
+ { type: 'notify', payload: NotifyPayload } |
+ { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
+ { type: 'federate-video', payload: FederateVideoPayload }
export type CreateJobOptions = {
delay?: number
'video-redundancy': processVideoRedundancy,
'move-to-object-storage': processMoveToObjectStorage,
'manage-video-torrent': processManageVideoTorrent,
- 'video-studio-edition': processVideoStudioEdition
+ 'notify': processNotify,
+ 'video-studio-edition': processVideoStudioEdition,
+ 'federate-video': processFederateVideo
}
const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
'video-live-ending',
'move-to-object-storage',
'manage-video-torrent',
- 'video-studio-edition'
+ 'video-studio-edition',
+ 'notify',
+ 'federate-video'
]
const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
private queueEvents: { [id in JobType]?: QueueEvents } = {}
+ private flowProducer: FlowProducer
+
private initialized = false
private jobRedisPrefix: string
this.buildQueueEvent(handlerName, produceOnly)
}
+ this.flowProducer = new FlowProducer({
+ connection: this.getRedisConnection(),
+ prefix: this.jobRedisPrefix
+ })
+
this.addRepeatableJobs()
}
}
}
+ // ---------------------------------------------------------------------------
+
async terminate () {
const promises = Object.keys(this.workers)
.map(handlerName => {
}
}
- createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
- this.createJobWithPromise(obj, options)
- .catch(err => logger.error('Cannot create job.', { err, obj }))
+ // ---------------------------------------------------------------------------
+
+ createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
+ this.createJob(options)
+ .catch(err => logger.error('Cannot create job.', { err, options }))
}
- async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
- const queue: Queue = this.queues[obj.type]
+ async createJob (options: CreateJobArgument & CreateJobOptions) {
+ const queue: Queue = this.queues[options.type]
if (queue === undefined) {
- logger.error('Unknown queue %s: cannot create job.', obj.type)
+ logger.error('Unknown queue %s: cannot create job.', options.type)
return
}
- const jobArgs: JobsOptions = {
+ const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
+
+ return queue.add('job', options.payload, jobOptions)
+ }
+
+ async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
+ let lastJob: FlowJob
+
+ for (const job of jobs) {
+ if (!job) continue
+
+ lastJob = {
+ name: 'job',
+ data: job.payload,
+ queueName: job.type,
+ opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+ children: lastJob
+ ? [ lastJob ]
+ : []
+ }
+ }
+
+ return this.flowProducer.add(lastJob)
+ }
+
+ private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
+ return {
backoff: { delay: 60 * 1000, type: 'exponential' },
- attempts: JOB_ATTEMPTS[obj.type],
+ attempts: JOB_ATTEMPTS[type],
priority: options.priority,
delay: options.delay
}
-
- return queue.add('job', obj.payload, jobArgs)
}
+ // ---------------------------------------------------------------------------
+
async listForApi (options: {
state?: JobState
start: number
return Promise.all(promises)
}
+ // ---------------------------------------------------------------------------
+
async removeOldJobs () {
for (const key of Object.keys(this.queues)) {
const queue: Queue = this.queues[key]