aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-02-12 11:25:09 +0100
committerChocobozzz <me@florianbigard.com>2018-02-12 11:25:09 +0100
commit3df456380a3d6c7ff39b0004a5390011739ebc8e (patch)
treef7a0cf3febef6400d70dc34110dceee4ddbda218 /server/lib/job-queue
parente78720386f9a3187f30c0dc74a27ced837977ca9 (diff)
downloadPeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.gz
PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.tar.zst
PeerTube-3df456380a3d6c7ff39b0004a5390011739ebc8e.zip
Don't stuck on active jobs
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/job-queue.ts29
1 files changed, 28 insertions, 1 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index d9d7a6e42..fb3cc8f66 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -32,7 +32,7 @@ class JobQueue {
32 32
33 private constructor () {} 33 private constructor () {}
34 34
35 init () { 35 async init () {
36 // Already initialized 36 // Already initialized
37 if (this.initialized === true) return 37 if (this.initialized === true) return
38 this.initialized = true 38 this.initialized = true
@@ -54,6 +54,8 @@ class JobQueue {
54 }) 54 })
55 this.jobQueue.watchStuckJobs(5000) 55 this.jobQueue.watchStuckJobs(5000)
56 56
57 await this.reactiveStuckJobs()
58
57 for (const handlerName of Object.keys(handlers)) { 59 for (const handlerName of Object.keys(handlers)) {
58 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { 60 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
59 try { 61 try {
@@ -117,6 +119,31 @@ class JobQueue {
117 }) 119 })
118 } 120 }
119 121
122 private reactiveStuckJobs () {
123 const promises: Promise<any>[] = []
124
125 this.jobQueue.active((err, ids) => {
126 if (err) throw err
127
128 for (const id of ids) {
129 kue.Job.get(id, (err, job) => {
130 if (err) throw err
131
132 const p = new Promise((res, rej) => {
133 job.inactive(err => {
134 if (err) return rej(err)
135 return res()
136 })
137 })
138
139 promises.push(p)
140 })
141 }
142 })
143
144 return Promise.all(promises)
145 }
146
120 static get Instance () { 147 static get Instance () {
121 return this.instance || (this.instance = new this()) 148 return this.instance || (this.instance = new this())
122 } 149 }