]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Use bullmq job dependency
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index 0cf5d53ce015d3ece080dad3c44ccd910b1ee84f..50d732bebcf5c05c9df0c1eac2b12d3c6585ab24 100644 (file)
@@ -1,4 +1,6 @@
 import {
+  FlowJob,
+  FlowProducer,
   Job,
   JobsOptions,
   Queue,
@@ -13,7 +15,7 @@ import {
 import { jobStates } from '@server/helpers/custom-validators/jobs'
 import { CONFIG } from '@server/initializers/config'
 import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
-import { timeoutPromise } from '@shared/core-utils'
+import { pick, timeoutPromise } from '@shared/core-utils'
 import {
   ActivitypubFollowPayload,
   ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
   ActorKeysPayload,
   DeleteResumableUploadMetaFilePayload,
   EmailPayload,
+  FederateVideoPayload,
   JobState,
   JobType,
   ManageVideoTorrentPayload,
   MoveObjectStoragePayload,
+  NotifyPayload,
   RefreshPayload,
   VideoFileImportPayload,
   VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
 import { refreshAPObject } from './handlers/activitypub-refresher'
 import { processActorKeys } from './handlers/actor-keys'
 import { processEmail } from './handlers/email'
+import { processFederateVideo } from './handlers/federate-video'
 import { processManageVideoTorrent } from './handlers/manage-video-torrent'
 import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
+import { processNotify } from './handlers/notify'
 import { processVideoFileImport } from './handlers/video-file-import'
 import { processVideoImport } from './handlers/video-import'
 import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
 import { processVideoTranscoding } from './handlers/video-transcoding'
 import { processVideosViewsStats } from './handlers/video-views-stats'
 
-type CreateJobArgument =
+export type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
   { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
   { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
   { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
-  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
+  { type: 'notify', payload: NotifyPayload } |
+  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
+  { type: 'federate-video', payload: FederateVideoPayload }
 
 export type CreateJobOptions = {
   delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
   'video-redundancy': processVideoRedundancy,
   'move-to-object-storage': processMoveToObjectStorage,
   'manage-video-torrent': processManageVideoTorrent,
-  'video-studio-edition': processVideoStudioEdition
+  'notify': processNotify,
+  'video-studio-edition': processVideoStudioEdition,
+  'federate-video': processFederateVideo
 }
 
 const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
   'video-live-ending',
   'move-to-object-storage',
   'manage-video-torrent',
-  'video-studio-edition'
+  'video-studio-edition',
+  'notify',
+  'federate-video'
 ]
 
 const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
   private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
   private queueEvents: { [id in JobType]?: QueueEvents } = {}
 
+  private flowProducer: FlowProducer
+
   private initialized = false
   private jobRedisPrefix: string
 
@@ -157,6 +171,11 @@ class JobQueue {
       this.buildQueueEvent(handlerName, produceOnly)
     }
 
+    this.flowProducer = new FlowProducer({
+      connection: this.getRedisConnection(),
+      prefix: this.jobRedisPrefix
+    })
+
     this.addRepeatableJobs()
   }
 
@@ -243,6 +262,8 @@ class JobQueue {
     }
   }
 
+  // ---------------------------------------------------------------------------
+
   async terminate () {
     const promises = Object.keys(this.workers)
       .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
     }
   }
 
-  createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
-    this.createJobWithPromise(obj, options)
-        .catch(err => logger.error('Cannot create job.', { err, obj }))
+  // ---------------------------------------------------------------------------
+
+  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
+    this.createJob(options)
+        .catch(err => logger.error('Cannot create job.', { err, options }))
   }
 
-  async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
-    const queue: Queue = this.queues[obj.type]
+  async createJob (options: CreateJobArgument & CreateJobOptions) {
+    const queue: Queue = this.queues[options.type]
     if (queue === undefined) {
-      logger.error('Unknown queue %s: cannot create job.', obj.type)
+      logger.error('Unknown queue %s: cannot create job.', options.type)
       return
     }
 
-    const jobArgs: JobsOptions = {
+    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
+
+    return queue.add('job', options.payload, jobOptions)
+  }
+
+  async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
+    let lastJob: FlowJob
+
+    for (const job of jobs) {
+      if (!job) continue
+
+      lastJob = {
+        name: 'job',
+        data: job.payload,
+        queueName: job.type,
+        opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+        children: lastJob
+          ? [ lastJob ]
+          : []
+      }
+    }
+
+    return this.flowProducer.add(lastJob)
+  }
+
+  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
+    return {
       backoff: { delay: 60 * 1000, type: 'exponential' },
-      attempts: JOB_ATTEMPTS[obj.type],
+      attempts: JOB_ATTEMPTS[type],
       priority: options.priority,
       delay: options.delay
     }
-
-    return queue.add('job', obj.payload, jobArgs)
   }
 
+  // ---------------------------------------------------------------------------
+
   async listForApi (options: {
     state?: JobState
     start: number
@@ -367,6 +416,8 @@ class JobQueue {
     return Promise.all(promises)
   }
 
+  // ---------------------------------------------------------------------------
+
   async removeOldJobs () {
     for (const key of Object.keys(this.queues)) {
       const queue: Queue = this.queues[key]