private constructor () {}
- init () {
+ async init () {
// Already initialized
if (this.initialized === true) return
this.initialized = true
})
this.jobQueue.watchStuckJobs(5000)
+ await this.reactiveStuckJobs()
+
for (const handlerName of Object.keys(handlers)) {
this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
try {
})
}
+ private reactiveStuckJobs () {
+ const promises: Promise<any>[] = []
+
+ this.jobQueue.active((err, ids) => {
+ if (err) throw err
+
+ for (const id of ids) {
+ kue.Job.get(id, (err, job) => {
+ if (err) throw err
+
+ const p = new Promise((res, rej) => {
+ job.inactive(err => {
+ if (err) return rej(err)
+ return res()
+ })
+ })
+
+ promises.push(p)
+ })
+ }
+ })
+
+ return Promise.all(promises)
+ }
+
static get Instance () {
return this.instance || (this.instance = new this())
}
}
async function isTherePendingRequests (servers: ServerInfo[]) {
- const states: JobState[] = [ 'inactive', 'active' ]
+ const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
const tasks: Promise<any>[] = []
let pendingRequests = false