]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Don't stuck on active jobs
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index 521d609cbdfbbf0448912b6bbc8ce8e072a2eb75..fb3cc8f66e9f463bb41db283bcf0c1eb71b13ba1 100644 (file)
@@ -32,7 +32,7 @@ class JobQueue {
 
   private constructor () {}
 
-  init () {
+  async init () {
     // Already initialized
     if (this.initialized === true) return
     this.initialized = true
@@ -54,6 +54,8 @@ class JobQueue {
     })
     this.jobQueue.watchStuckJobs(5000)
 
+    await this.reactiveStuckJobs()
+
     for (const handlerName of Object.keys(handlers)) {
       this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
         try {
@@ -83,7 +85,7 @@ class JobQueue {
 
   listForApi (state: JobState, start: number, count: number, sort: string) {
     return new Promise<kue.Job[]>((res, rej) => {
-      kue.Job.rangeByState(state, start, start + count, sort, (err, jobs) => {
+      kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => {
         if (err) return rej(err)
 
         return res(jobs)
@@ -117,6 +119,31 @@ class JobQueue {
     })
   }
 
+  private reactiveStuckJobs () {
+    const promises: Promise<any>[] = []
+
+    this.jobQueue.active((err, ids) => {
+      if (err) throw err
+
+      for (const id of ids) {
+        kue.Job.get(id, (err, job) => {
+          if (err) throw err
+
+          const p = new Promise((res, rej) => {
+            job.inactive(err => {
+              if (err) return rej(err)
+              return res()
+            })
+          })
+
+          promises.push(p)
+        })
+      }
+    })
+
+    return Promise.all(promises)
+  }
+
   static get Instance () {
     return this.instance || (this.instance = new this())
   }