aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-09-14 11:35:58 +0200
committerChocobozzz <me@florianbigard.com>2022-09-14 11:45:18 +0200
commit4404a7c467a2c6863728127eeff5ca4b59619940 (patch)
treefef12748e97c9474d1ca7f21e13ad435286549d8 /server/lib/job-queue/job-queue.ts
parentfc3784583ce383cec8619be5b1d311b340b645f9 (diff)
downloadPeerTube-4404a7c467a2c6863728127eeff5ca4b59619940.tar.gz
PeerTube-4404a7c467a2c6863728127eeff5ca4b59619940.tar.zst
PeerTube-4404a7c467a2c6863728127eeff5ca4b59619940.zip
Prevent job queue to be started before plugins
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts37
1 files changed, 27 insertions, 10 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e54d12acd..655be6568 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -168,7 +168,7 @@ class JobQueue {
168 private constructor () { 168 private constructor () {
169 } 169 }
170 170
171 init (produceOnly = false) { 171 init () {
172 // Already initialized 172 // Already initialized
173 if (this.initialized === true) return 173 if (this.initialized === true) return
174 this.initialized = true 174 this.initialized = true
@@ -176,10 +176,10 @@ class JobQueue {
176 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST 176 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
177 177
178 for (const handlerName of (Object.keys(handlers) as JobType[])) { 178 for (const handlerName of (Object.keys(handlers) as JobType[])) {
179 this.buildWorker(handlerName, produceOnly) 179 this.buildWorker(handlerName)
180 this.buildQueue(handlerName) 180 this.buildQueue(handlerName)
181 this.buildQueueScheduler(handlerName, produceOnly) 181 this.buildQueueScheduler(handlerName)
182 this.buildQueueEvent(handlerName, produceOnly) 182 this.buildQueueEvent(handlerName)
183 } 183 }
184 184
185 this.flowProducer = new FlowProducer({ 185 this.flowProducer = new FlowProducer({
@@ -191,9 +191,9 @@ class JobQueue {
191 this.addRepeatableJobs() 191 this.addRepeatableJobs()
192 } 192 }
193 193
194 private buildWorker (handlerName: JobType, produceOnly: boolean) { 194 private buildWorker (handlerName: JobType) {
195 const workerOptions: WorkerOptions = { 195 const workerOptions: WorkerOptions = {
196 autorun: !produceOnly, 196 autorun: false,
197 concurrency: this.getJobConcurrency(handlerName), 197 concurrency: this.getJobConcurrency(handlerName),
198 prefix: this.jobRedisPrefix, 198 prefix: this.jobRedisPrefix,
199 connection: this.getRedisConnection() 199 connection: this.getRedisConnection()
@@ -246,9 +246,9 @@ class JobQueue {
246 this.queues[handlerName] = queue 246 this.queues[handlerName] = queue
247 } 247 }
248 248
249 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { 249 private buildQueueScheduler (handlerName: JobType) {
250 const queueSchedulerOptions: QueueSchedulerOptions = { 250 const queueSchedulerOptions: QueueSchedulerOptions = {
251 autorun: !produceOnly, 251 autorun: false,
252 connection: this.getRedisConnection(), 252 connection: this.getRedisConnection(),
253 prefix: this.jobRedisPrefix, 253 prefix: this.jobRedisPrefix,
254 maxStalledCount: 10 254 maxStalledCount: 10
@@ -260,9 +260,9 @@ class JobQueue {
260 this.queueSchedulers[handlerName] = queueScheduler 260 this.queueSchedulers[handlerName] = queueScheduler
261 } 261 }
262 262
263 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { 263 private buildQueueEvent (handlerName: JobType) {
264 const queueEventsOptions: QueueEventsOptions = { 264 const queueEventsOptions: QueueEventsOptions = {
265 autorun: !produceOnly, 265 autorun: false,
266 connection: this.getRedisConnection(), 266 connection: this.getRedisConnection(),
267 prefix: this.jobRedisPrefix 267 prefix: this.jobRedisPrefix
268 } 268 }
@@ -304,6 +304,23 @@ class JobQueue {
304 return Promise.all(promises) 304 return Promise.all(promises)
305 } 305 }
306 306
307 start () {
308 const promises = Object.keys(this.workers)
309 .map(handlerName => {
310 const worker: Worker = this.workers[handlerName]
311 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
312 const queueEvent: QueueEvents = this.queueEvents[handlerName]
313
314 return Promise.all([
315 worker.run(),
316 queueScheduler.run(),
317 queueEvent.run()
318 ])
319 })
320
321 return Promise.all(promises)
322 }
323
307 async pause () { 324 async pause () {
308 for (const handlerName of Object.keys(this.workers)) { 325 for (const handlerName of Object.keys(this.workers)) {
309 const worker: Worker = this.workers[handlerName] 326 const worker: Worker = this.workers[handlerName]