diff options
author | Chocobozzz <me@florianbigard.com> | 2018-02-12 11:25:09 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-02-12 11:25:09 +0100 |
commit | 3df456380a3d6c7ff39b0004a5390011739ebc8e (patch) | |
tree | f7a0cf3febef6400d70dc34110dceee4ddbda218 | |
parent | e78720386f9a3187f30c0dc74a27ced837977ca9 (diff) | |
download | PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.gz PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.zst PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.zip |
Don't stuck on active jobs
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 29 | ||||
-rw-r--r-- | server/tests/real-world/real-world.ts | 2 |
2 files changed, 29 insertions, 2 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index d9d7a6e42..fb3cc8f66 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -32,7 +32,7 @@ class JobQueue { | |||
32 | 32 | ||
33 | private constructor () {} | 33 | private constructor () {} |
34 | 34 | ||
35 | init () { | 35 | async init () { |
36 | // Already initialized | 36 | // Already initialized |
37 | if (this.initialized === true) return | 37 | if (this.initialized === true) return |
38 | this.initialized = true | 38 | this.initialized = true |
@@ -54,6 +54,8 @@ class JobQueue { | |||
54 | }) | 54 | }) |
55 | this.jobQueue.watchStuckJobs(5000) | 55 | this.jobQueue.watchStuckJobs(5000) |
56 | 56 | ||
57 | await this.reactiveStuckJobs() | ||
58 | |||
57 | for (const handlerName of Object.keys(handlers)) { | 59 | for (const handlerName of Object.keys(handlers)) { |
58 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | 60 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { |
59 | try { | 61 | try { |
@@ -117,6 +119,31 @@ class JobQueue { | |||
117 | }) | 119 | }) |
118 | } | 120 | } |
119 | 121 | ||
122 | private reactiveStuckJobs () { | ||
123 | const promises: Promise<any>[] = [] | ||
124 | |||
125 | this.jobQueue.active((err, ids) => { | ||
126 | if (err) throw err | ||
127 | |||
128 | for (const id of ids) { | ||
129 | kue.Job.get(id, (err, job) => { | ||
130 | if (err) throw err | ||
131 | |||
132 | const p = new Promise((res, rej) => { | ||
133 | job.inactive(err => { | ||
134 | if (err) return rej(err) | ||
135 | return res() | ||
136 | }) | ||
137 | }) | ||
138 | |||
139 | promises.push(p) | ||
140 | }) | ||
141 | } | ||
142 | }) | ||
143 | |||
144 | return Promise.all(promises) | ||
145 | } | ||
146 | |||
120 | static get Instance () { | 147 | static get Instance () { |
121 | return this.instance || (this.instance = new this()) | 148 | return this.instance || (this.instance = new this()) |
122 | } | 149 | } |
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index 7f67525ed..f10ca856d 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts | |||
@@ -347,7 +347,7 @@ function goodbye () { | |||
347 | } | 347 | } |
348 | 348 | ||
349 | async function isTherePendingRequests (servers: ServerInfo[]) { | 349 | async function isTherePendingRequests (servers: ServerInfo[]) { |
350 | const states: JobState[] = [ 'inactive', 'active' ] | 350 | const states: JobState[] = [ 'inactive', 'active', 'delayed' ] |
351 | const tasks: Promise<any>[] = [] | 351 | const tasks: Promise<any>[] = [] |
352 | let pendingRequests = false | 352 | let pendingRequests = false |
353 | 353 | ||