diff options
author | Chocobozzz <me@florianbigard.com> | 2018-02-12 11:25:09 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-02-12 11:25:09 +0100 |
commit | 3df456380a3d6c7ff39b0004a5390011739ebc8e (patch) | |
tree | f7a0cf3febef6400d70dc34110dceee4ddbda218 /server/lib/job-queue | |
parent | e78720386f9a3187f30c0dc74a27ced837977ca9 (diff) | |
download | PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.gz PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.zst PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.zip |
Don't stuck on active jobs
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 29 |
1 files changed, 28 insertions, 1 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index d9d7a6e42..fb3cc8f66 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -32,7 +32,7 @@ class JobQueue { | |||
32 | 32 | ||
33 | private constructor () {} | 33 | private constructor () {} |
34 | 34 | ||
35 | init () { | 35 | async init () { |
36 | // Already initialized | 36 | // Already initialized |
37 | if (this.initialized === true) return | 37 | if (this.initialized === true) return |
38 | this.initialized = true | 38 | this.initialized = true |
@@ -54,6 +54,8 @@ class JobQueue { | |||
54 | }) | 54 | }) |
55 | this.jobQueue.watchStuckJobs(5000) | 55 | this.jobQueue.watchStuckJobs(5000) |
56 | 56 | ||
57 | await this.reactiveStuckJobs() | ||
58 | |||
57 | for (const handlerName of Object.keys(handlers)) { | 59 | for (const handlerName of Object.keys(handlers)) { |
58 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | 60 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { |
59 | try { | 61 | try { |
@@ -117,6 +119,31 @@ class JobQueue { | |||
117 | }) | 119 | }) |
118 | } | 120 | } |
119 | 121 | ||
122 | private reactiveStuckJobs () { | ||
123 | const promises: Promise<any>[] = [] | ||
124 | |||
125 | this.jobQueue.active((err, ids) => { | ||
126 | if (err) throw err | ||
127 | |||
128 | for (const id of ids) { | ||
129 | kue.Job.get(id, (err, job) => { | ||
130 | if (err) throw err | ||
131 | |||
132 | const p = new Promise((res, rej) => { | ||
133 | job.inactive(err => { | ||
134 | if (err) return rej(err) | ||
135 | return res() | ||
136 | }) | ||
137 | }) | ||
138 | |||
139 | promises.push(p) | ||
140 | }) | ||
141 | } | ||
142 | }) | ||
143 | |||
144 | return Promise.all(promises) | ||
145 | } | ||
146 | |||
120 | static get Instance () { | 147 | static get Instance () { |
121 | return this.instance || (this.instance = new this()) | 148 | return this.instance || (this.instance = new this()) |
122 | } | 149 | } |