X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fjob-queue.ts;h=fb3cc8f66e9f463bb41db283bcf0c1eb71b13ba1;hb=3df456380a3d6c7ff39b0004a5390011739ebc8e;hp=521d609cbdfbbf0448912b6bbc8ce8e072a2eb75;hpb=fafc13fff164778161bb6bb0d3019e2b7a238243;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 521d609cb..fb3cc8f66 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -32,7 +32,7 @@ class JobQueue { private constructor () {} - init () { + async init () { // Already initialized if (this.initialized === true) return this.initialized = true @@ -54,6 +54,8 @@ class JobQueue { }) this.jobQueue.watchStuckJobs(5000) + await this.reactiveStuckJobs() + for (const handlerName of Object.keys(handlers)) { this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { try { @@ -83,7 +85,7 @@ class JobQueue { listForApi (state: JobState, start: number, count: number, sort: string) { return new Promise((res, rej) => { - kue.Job.rangeByState(state, start, start + count, sort, (err, jobs) => { + kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => { if (err) return rej(err) return res(jobs) @@ -117,6 +119,31 @@ class JobQueue { }) } + private reactiveStuckJobs () { + const promises: Promise[] = [] + + this.jobQueue.active((err, ids) => { + if (err) throw err + + for (const id of ids) { + kue.Job.get(id, (err, job) => { + if (err) throw err + + const p = new Promise((res, rej) => { + job.inactive(err => { + if (err) return rej(err) + return res() + }) + }) + + promises.push(p) + }) + } + }) + + return Promise.all(promises) + } + static get Instance () { return this.instance || (this.instance = new this()) }