]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Add create-import-video-file-job command
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index e8f6c36203f7c8d81fb3633fd520c24dc49eab56..69335acf0b2a2d7dbec41f537e732c0edf9f9905 100644 (file)
@@ -1,17 +1,21 @@
 import * as kue from 'kue'
-import { JobType, JobState } from '../../../shared/models'
+import { JobState, JobType } from '../../../shared/models'
 import { logger } from '../../helpers/logger'
-import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
+import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
+import { Redis } from '../redis'
 import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
 import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
 import { EmailPayload, processEmail } from './handlers/email'
-import { processVideoFile, VideoFilePayload } from './handlers/video-file'
+import { processVideoFile, processVideoImport, VideoFilePayload, VideoImportPayload } from './handlers/video-file'
+import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
 
 type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
   { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
+  { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
+  { type: 'video-file-import', payload: VideoImportPayload } |
   { type: 'video-file', payload: VideoFilePayload } |
   { type: 'email', payload: EmailPayload }
 
@@ -19,41 +23,56 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
+  'activitypub-follow': processActivityPubFollow,
+  'video-file-import': processVideoImport,
   'video-file': processVideoFile,
   'email': processEmail
 }
 
+const jobsWithTLL: JobType[] = [
+  'activitypub-http-broadcast',
+  'activitypub-http-unicast',
+  'activitypub-http-fetcher',
+  'activitypub-follow'
+]
+
 class JobQueue {
 
   private static instance: JobQueue
 
   private jobQueue: kue.Queue
   private initialized = false
+  private jobRedisPrefix: string
 
   private constructor () {}
 
-  init () {
+  async init () {
     // Already initialized
     if (this.initialized === true) return
     this.initialized = true
 
+    this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST
+
     this.jobQueue = kue.createQueue({
-      prefix: 'q-' + CONFIG.WEBSERVER.HOST,
+      prefix: this.jobRedisPrefix,
       redis: {
         host: CONFIG.REDIS.HOSTNAME,
         port: CONFIG.REDIS.PORT,
-        auth: CONFIG.REDIS.AUTH
+        auth: CONFIG.REDIS.AUTH,
+        db: CONFIG.REDIS.DB
       }
     })
 
-    this.jobQueue.setMaxListeners(15)
+    this.jobQueue.setMaxListeners(20)
 
     this.jobQueue.on('error', err => {
-      logger.error('Error in job queue.', err)
+      logger.error('Error in job queue.', { err })
       process.exit(-1)
     })
     this.jobQueue.watchStuckJobs(5000)
 
+    await this.reactiveStuckJobs()
+
     for (const handlerName of Object.keys(handlers)) {
       this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
         try {
@@ -68,29 +87,34 @@ class JobQueue {
 
   createJob (obj: CreateJobArgument, priority = 'normal') {
     return new Promise((res, rej) => {
-      this.jobQueue
+      let job = this.jobQueue
         .create(obj.type, obj.payload)
         .priority(priority)
         .attempts(JOB_ATTEMPTS[obj.type])
         .backoff({ delay: 60 * 1000, type: 'exponential' })
-        .save(err => {
-          if (err) return rej(err)
 
-          return res()
-        })
-    })
-  }
+      if (jobsWithTLL.indexOf(obj.type) !== -1) {
+        job = job.ttl(JOB_REQUEST_TTL)
+      }
 
-  listForApi (state: JobState, start: number, count: number, sort: string) {
-    return new Promise<kue.Job[]>((res, rej) => {
-      kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
+      return job.save(err => {
         if (err) return rej(err)
 
-        return res(jobs)
+        return res()
       })
     })
   }
 
+  async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> {
+    const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)
+
+    const jobPromises = jobStrings
+      .map(s => s.split('|'))
+      .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))
+
+    return Promise.all(jobPromises)
+  }
+
   count (state: JobState) {
     return new Promise<number>((res, rej) => {
       this.jobQueue[state + 'Count']((err, total) => {
@@ -105,7 +129,7 @@ class JobQueue {
     const now = new Date().getTime()
     kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
       if (err) {
-        logger.error('Cannot get jobs when removing old jobs.', err)
+        logger.error('Cannot get jobs when removing old jobs.', { err })
         return
       }
 
@@ -117,6 +141,41 @@ class JobQueue {
     })
   }
 
+  private reactiveStuckJobs () {
+    const promises: Promise<any>[] = []
+
+    this.jobQueue.active((err, ids) => {
+      if (err) throw err
+
+      for (const id of ids) {
+        kue.Job.get(id, (err, job) => {
+          if (err) throw err
+
+          const p = new Promise((res, rej) => {
+            job.inactive(err => {
+              if (err) return rej(err)
+              return res()
+            })
+          })
+
+          promises.push(p)
+        })
+      }
+    })
+
+    return Promise.all(promises)
+  }
+
+  private getJob (id: number) {
+    return new Promise<kue.Job>((res, rej) => {
+      kue.Job.get(id, (err, job) => {
+        if (err) return rej(err)
+
+        return res(job)
+      })
+    })
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }