]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Reduce AP context size on specific activities
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index ec601e9eadd3cf761580b4a9dad94c036b25f1eb..14acace7da80a35f0d10338a6f73aa4c9c961e47 100644 (file)
@@ -13,6 +13,7 @@ import { processVideoImport, VideoImportPayload } from './handlers/video-import'
 import { processVideosViews } from './handlers/video-views'
 import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
 import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import'
+import { processVideoRedundancy, VideoRedundancyPayload } from '@server/lib/job-queue/handlers/video-redundancy'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -24,20 +25,21 @@ type CreateJobArgument =
   { type: 'email', payload: EmailPayload } |
   { type: 'video-import', payload: VideoImportPayload } |
   { type: 'activitypub-refresher', payload: RefreshPayload } |
-  { type: 'videos-views', payload: {} }
+  { type: 'videos-views', payload: {} } |
+  { type: 'video-redundancy', payload: VideoRedundancyPayload }
 
-const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise<any>} = {
+const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
   'activitypub-follow': processActivityPubFollow,
   'video-file-import': processVideoFileImport,
   'video-transcoding': processVideoTranscoding,
-  'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3)
   'email': processEmail,
   'video-import': processVideoImport,
   'videos-views': processVideosViews,
-  'activitypub-refresher': refreshAPObject
+  'activitypub-refresher': refreshAPObject,
+  'video-redundancy': processVideoRedundancy
 }
 
 const jobTypes: JobType[] = [
@@ -50,20 +52,22 @@ const jobTypes: JobType[] = [
   'video-file-import',
   'video-import',
   'videos-views',
-  'activitypub-refresher'
+  'activitypub-refresher',
+  'video-redundancy'
 ]
 
 class JobQueue {
 
   private static instance: JobQueue
 
-  private queues: { [ id in JobType ]?: Bull.Queue } = {}
+  private queues: { [id in JobType]?: Bull.Queue } = {}
   private initialized = false
   private jobRedisPrefix: string
 
-  private constructor () {}
+  private constructor () {
+  }
 
-  async init () {
+  init () {
     // Already initialized
     if (this.initialized === true) return
     this.initialized = true
@@ -105,11 +109,16 @@ class JobQueue {
     }
   }
 
-  createJob (obj: CreateJobArgument) {
+  createJob (obj: CreateJobArgument): void {
+    this.createJobWithPromise(obj)
+         .catch(err => logger.error('Cannot create job.', { err, obj }))
+  }
+
+  createJobWithPromise (obj: CreateJobArgument) {
     const queue = this.queues[obj.type]
     if (queue === undefined) {
       logger.error('Unknown queue %s: cannot create job.', obj.type)
-      throw Error('Unknown queue, cannot create job')
+      return
     }
 
     const jobArgs: Bull.JobOptions = {
@@ -122,10 +131,10 @@ class JobQueue {
   }
 
   async listForApi (options: {
-    state: JobState,
-    start: number,
-    count: number,
-    asc?: boolean,
+    state: JobState
+    start: number
+    count: number
+    asc?: boolean
     jobType: JobType
   }): Promise<Bull.Job[]> {
     const { state, start, count, asc, jobType } = options
@@ -133,16 +142,14 @@ class JobQueue {
 
     const filteredJobTypes = this.filterJobTypes(jobType)
 
-    // TODO: optimize
     for (const jobType of filteredJobTypes) {
-      const queue = this.queues[ jobType ]
+      const queue = this.queues[jobType]
       if (queue === undefined) {
         logger.error('Unknown queue %s to list jobs.', jobType)
         continue
       }
 
-      // FIXME: Bull queue typings does not have getJobs method
-      const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
+      const jobs = await queue.getJobs([ state ], 0, start + count, asc)
       results = results.concat(jobs)
     }
 
@@ -164,7 +171,7 @@ class JobQueue {
     const filteredJobTypes = this.filterJobTypes(jobType)
 
     for (const type of filteredJobTypes) {
-      const queue = this.queues[ type ]
+      const queue = this.queues[type]
       if (queue === undefined) {
         logger.error('Unknown queue %s to count jobs.', type)
         continue
@@ -172,7 +179,7 @@ class JobQueue {
 
       const counts = await queue.getJobCounts()
 
-      total += counts[ state ]
+      total += counts[state]
     }
 
     return total
@@ -188,7 +195,7 @@ class JobQueue {
   private addRepeatableJobs () {
     this.queues['videos-views'].add({}, {
       repeat: REPEAT_JOBS['videos-views']
-    })
+    }).catch(err => logger.error('Cannot add repeatable job.', { err }))
   }
 
   private filterJobTypes (jobType?: JobType) {