jobsRouter.post('/resume',
authenticate,
ensureUserHasRight(UserRight.MANAGE_JOBS),
- resumeJobQueue
+ asyncMiddleware(resumeJobQueue)
)
jobsRouter.get('/:state?',
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
-function resumeJobQueue (req: express.Request, res: express.Response) {
- JobQueue.Instance.resume()
+async function resumeJobQueue (req: express.Request, res: express.Response) {
+ await JobQueue.Instance.resume()
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
}
async pause () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
await worker.pause()
+ await queue.pause()
}
}
- resume () {
- for (const handler of Object.keys(this.workers)) {
- const worker: Worker = this.workers[handler]
+ async resume () {
+ for (const handlerName of Object.keys(this.workers)) {
+ const worker: Worker = this.workers[handlerName]
+ const queue: Queue = this.queues[handlerName]
worker.resume()
+ await queue.resume()
}
}
}): Promise<Job[]> {
const { state, start, count, asc, jobType } = options
- const states = state ? [ state ] : jobStates
- let results: Job[] = []
+ const states = this.buildStateFilter(state)
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let results: Job[] = []
for (const jobType of filteredJobTypes) {
const queue: Queue = this.queues[jobType]
async count (state: JobState, jobType?: JobType): Promise<number> {
const states = state ? [ state ] : jobStates
- let total = 0
+ const filteredJobTypes = this.buildTypeFilter(jobType)
- const filteredJobTypes = this.filterJobTypes(jobType)
+ let total = 0
for (const type of filteredJobTypes) {
const queue = this.queues[type]
return total
}
+ private buildStateFilter (state?: JobState) {
+ if (!state) return jobStates
+
+ const states = [ state ]
+
+ // Include parent if filtering on waiting
+ if (state === 'waiting') states.push('waiting-children')
+
+ return states
+ }
+
+ private buildTypeFilter (jobType?: JobType) {
+ if (!jobType) return jobTypes
+
+ return jobTypes.filter(t => t === jobType)
+ }
+
async getStats () {
const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
}
}
- private filterJobTypes (jobType?: JobType) {
- if (!jobType) return jobTypes
-
- return jobTypes.filter(t => t === jobType)
- }
-
private getJobConcurrency (jobType: JobType) {
if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
if (job.type === 'videos-views-stats') job = body.data[1]
expect(job.state).to.equal('completed')
- expect(job.type.startsWith('activitypub-')).to.be.true
expect(dateIsValid(job.createdAt as string)).to.be.true
expect(dateIsValid(job.processedOn as string)).to.be.true
expect(dateIsValid(job.finishedOn as string)).to.be.true
await wait(5000)
- const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' })
- expect(body.data).to.have.lengthOf(4)
+ {
+ const body = await servers[1].jobs.list({ state: 'waiting', jobType: 'video-transcoding' })
+ // waiting includes waiting-children
+ expect(body.data).to.have.lengthOf(4)
+ }
+
+ {
+ const body = await servers[1].jobs.list({ state: 'waiting-children', jobType: 'video-transcoding' })
+ expect(body.data).to.have.lengthOf(1)
+ }
})
it('Should resume the job queue', async function () {