}
async pause () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
await worker.pause()
+ await queue.pause()
}
}
- resume () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ async resume () {
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
worker.resume()
+ await queue.resume()
}
}
}): Promise<Job[]> {
const { state, start, count, asc, jobType } = options
- const states = state ? [ state ] : jobStates
- let results: Job[] = []
+ const states = this.buildStateFilter(state)
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let results: Job[] = []
for (const jobType of filteredJobTypes) {
const queue: Queue = this.queues[jobType]
async count (state: JobState, jobType?: JobType): Promise<number> {
const states = state ? [ state ] : jobStates
- let total = 0
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let total = 0
for (const type of filteredJobTypes) {
const queue = this.queues[type]
return total
}
+ private buildStateFilter (state?: JobState) {
+ if (!state) return jobStates
+
+ const states = [ state ]
+
+ // Include parent if filtering on waiting
+ if (state === 'waiting') states.push('waiting-children')
+
+ return states
+ }
+
+ private buildTypeFilter (jobType?: JobType) {
+ if (!jobType) return jobTypes
+
+ return jobTypes.filter(t => t === jobType)
+ }
+
async getStats () {
const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
}
}
- private filterJobTypes (jobType?: JobType) {
- if (!jobType) return jobTypes
-
- return jobTypes.filter(t => t === jobType)
- }
-
private getJobConcurrency (jobType: JobType) {
if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY