]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/job-queue.ts
Fix job queue tests
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / job-queue.ts
index 386d20103716711e22c90152fee273b96f7c9022..14e3a00aaa31e564d18065691871bec1348ef1c7 100644 (file)
@@ -284,18 +284,22 @@ class JobQueue {
   }
 
   async pause () {
-    for (const handler of Object.keys(this.workers)) {
-      const worker: Worker = this.workers[handler]
+    for (const handlerName of Object.keys(this.workers)) {
+      const worker: Worker = this.workers[handlerName]
+      const queue: Queue = this.queues[handlerName]
 
       await worker.pause()
+      await queue.pause()
     }
   }
 
-  resume () {
-    for (const handler of Object.keys(this.workers)) {
-      const worker: Worker = this.workers[handler]
+  async resume () {
+    for (const handlerName of Object.keys(this.workers)) {
+      const worker: Worker = this.workers[handlerName]
+      const queue: Queue = this.queues[handlerName]
 
       worker.resume()
+      await queue.resume()
     }
   }
 
@@ -373,10 +377,10 @@ class JobQueue {
   }): Promise<Job[]> {
     const { state, start, count, asc, jobType } = options
 
-    const states = state ? [ state ] : jobStates
-    let results: Job[] = []
+    const states = this.buildStateFilter(state)
+    const filteredJobTypes = this.buildTypeFilter(jobType)
 
-    const filteredJobTypes = this.filterJobTypes(jobType)
+    let results: Job[] = []
 
     for (const jobType of filteredJobTypes) {
       const queue: Queue = this.queues[jobType]
@@ -404,9 +408,9 @@ class JobQueue {
 
   async count (state: JobState, jobType?: JobType): Promise<number> {
     const states = state ? [ state ] : jobStates
-    let total = 0
+    const filteredJobTypes = this.buildTypeFilter(jobType)
 
-    const filteredJobTypes = this.filterJobTypes(jobType)
+    let total = 0
 
     for (const type of filteredJobTypes) {
       const queue = this.queues[type]
@@ -425,6 +429,23 @@ class JobQueue {
     return total
   }
 
+  private buildStateFilter (state?: JobState) {
+    if (!state) return jobStates
+
+    const states = [ state ]
+
+    // Include parent if filtering on waiting
+    if (state === 'waiting') states.push('waiting-children')
+
+    return states
+  }
+
+  private buildTypeFilter (jobType?: JobType) {
+    if (!jobType) return jobTypes
+
+    return jobTypes.filter(t => t === jobType)
+  }
+
   async getStats () {
     const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
 
@@ -452,12 +473,6 @@ class JobQueue {
     }
   }
 
-  private filterJobTypes (jobType?: JobType) {
-    if (!jobType) return jobTypes
-
-    return jobTypes.filter(t => t === jobType)
-  }
-
   private getJobConcurrency (jobType: JobType) {
     if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
     if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY