]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Fix from header in contact form
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index 8a24604e1df047bf223fc47342fc633714be007a..ba9cbe0d99d64b9c0e3f10a579acfec291510180 100644 (file)
@@ -2,7 +2,7 @@ import * as Bull from 'bull'
 import { JobState, JobType } from '../../../shared/models'
 import { logger } from '../../helpers/logger'
 import { Redis } from '../redis'
-import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL } from '../../initializers'
+import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS } from '../../initializers'
 import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
 import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
@@ -10,6 +10,8 @@ import { EmailPayload, processEmail } from './handlers/email'
 import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
 import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
 import { processVideoImport, VideoImportPayload } from './handlers/video-import'
+import { processVideosViews } from './handlers/video-views'
+import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -19,7 +21,9 @@ type CreateJobArgument =
   { type: 'video-file-import', payload: VideoFileImportPayload } |
   { type: 'video-file', payload: VideoFilePayload } |
   { type: 'email', payload: EmailPayload } |
-  { type: 'video-import', payload: VideoImportPayload }
+  { type: 'video-import', payload: VideoImportPayload } |
+  { type: 'activitypub-refresher', payload: RefreshPayload } |
+  { type: 'videos-views', payload: {} }
 
 const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
@@ -29,14 +33,9 @@ const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
   'video-file-import': processVideoFileImport,
   'video-file': processVideoFile,
   'email': processEmail,
-  'video-import': processVideoImport
-}
-
-const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
-  'activitypub-http-broadcast': true,
-  'activitypub-http-unicast': true,
-  'activitypub-http-fetcher': true,
-  'activitypub-follow': true
+  'video-import': processVideoImport,
+  'videos-views': processVideosViews,
+  'activitypub-refresher': refreshAPObject
 }
 
 const jobTypes: JobType[] = [
@@ -47,7 +46,9 @@ const jobTypes: JobType[] = [
   'email',
   'video-file',
   'video-file-import',
-  'video-import'
+  'video-import',
+  'videos-views',
+  'activitypub-refresher'
 ]
 
 class JobQueue {
@@ -87,11 +88,12 @@ class JobQueue {
 
       queue.on('error', err => {
         logger.error('Error in job queue %s.', handlerName, { err })
-        process.exit(-1)
       })
 
       this.queues[handlerName] = queue
     }
+
+    this.addRepeatableJobs()
   }
 
   terminate () {
@@ -163,13 +165,19 @@ class JobQueue {
     return total
   }
 
-  removeOldJobs () {
+  async removeOldJobs () {
     for (const key of Object.keys(this.queues)) {
       const queue = this.queues[key]
-      queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+      await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
     }
   }
 
+  private addRepeatableJobs () {
+    this.queues['videos-views'].add({}, {
+      repeat: REPEAT_JOBS['videos-views']
+    })
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }