aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--scripts/create-import-video-file-job.ts2
-rw-r--r--scripts/create-move-video-storage-job.ts2
-rwxr-xr-xscripts/create-transcoding-job.ts2
-rw-r--r--scripts/migrations/peertube-4.0.ts2
-rw-r--r--scripts/migrations/peertube-4.2.ts2
-rw-r--r--server.ts6
-rw-r--r--server/lib/job-queue/job-queue.ts37
7 files changed, 38 insertions, 15 deletions
diff --git a/scripts/create-import-video-file-job.ts b/scripts/create-import-video-file-job.ts
index cf974f240..9cb387d2e 100644
--- a/scripts/create-import-video-file-job.ts
+++ b/scripts/create-import-video-file-job.ts
@@ -44,7 +44,7 @@ async function run () {
44 filePath: resolve(options.import) 44 filePath: resolve(options.import)
45 } 45 }
46 46
47 JobQueue.Instance.init(true) 47 JobQueue.Instance.init()
48 await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput }) 48 await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput })
49 console.log('Import job for video %s created.', video.uuid) 49 console.log('Import job for video %s created.', video.uuid)
50} 50}
diff --git a/scripts/create-move-video-storage-job.ts b/scripts/create-move-video-storage-job.ts
index 0f0d4ee35..13ba3c0b7 100644
--- a/scripts/create-move-video-storage-job.ts
+++ b/scripts/create-move-video-storage-job.ts
@@ -37,7 +37,7 @@ run()
37async function run () { 37async function run () {
38 await initDatabaseModels(true) 38 await initDatabaseModels(true)
39 39
40 JobQueue.Instance.init(true) 40 JobQueue.Instance.init()
41 41
42 let ids: number[] = [] 42 let ids: number[] = []
43 43
diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts
index aa97b0ba7..ffdf55ae4 100755
--- a/scripts/create-transcoding-job.ts
+++ b/scripts/create-transcoding-job.ts
@@ -97,7 +97,7 @@ async function run () {
97 } 97 }
98 } 98 }
99 99
100 JobQueue.Instance.init(true) 100 JobQueue.Instance.init()
101 101
102 video.state = VideoState.TO_TRANSCODE 102 video.state = VideoState.TO_TRANSCODE
103 await video.save() 103 await video.save()
diff --git a/scripts/migrations/peertube-4.0.ts b/scripts/migrations/peertube-4.0.ts
index 9e5ca60d4..b0891c2e6 100644
--- a/scripts/migrations/peertube-4.0.ts
+++ b/scripts/migrations/peertube-4.0.ts
@@ -21,7 +21,7 @@ async function run () {
21 21
22 await initDatabaseModels(true) 22 await initDatabaseModels(true)
23 23
24 JobQueue.Instance.init(true) 24 JobQueue.Instance.init()
25 25
26 const ids = await VideoModel.listLocalIds() 26 const ids = await VideoModel.listLocalIds()
27 27
diff --git a/scripts/migrations/peertube-4.2.ts b/scripts/migrations/peertube-4.2.ts
index 6a9007265..513c629ef 100644
--- a/scripts/migrations/peertube-4.2.ts
+++ b/scripts/migrations/peertube-4.2.ts
@@ -27,7 +27,7 @@ async function run () {
27 console.log('Generate avatar miniatures from existing avatars.') 27 console.log('Generate avatar miniatures from existing avatars.')
28 28
29 await initDatabaseModels(true) 29 await initDatabaseModels(true)
30 JobQueue.Instance.init(true) 30 JobQueue.Instance.init()
31 31
32 const accounts: AccountModel[] = await AccountModel.findAll({ 32 const accounts: AccountModel[] = await AccountModel.findAll({
33 include: [ 33 include: [
diff --git a/server.ts b/server.ts
index 63a08f471..887814d4e 100644
--- a/server.ts
+++ b/server.ts
@@ -351,6 +351,12 @@ async function startApplication () {
351 ApplicationModel.updateNodeVersions() 351 ApplicationModel.updateNodeVersions()
352 .catch(err => logger.error('Cannot update node versions.', { err })) 352 .catch(err => logger.error('Cannot update node versions.', { err }))
353 353
354 JobQueue.Instance.start()
355 .catch(err => {
356 logger.error('Cannot start job queue.', { err })
357 process.exit(-1)
358 })
359
354 logger.info('HTTP server listening on %s:%d', hostname, port) 360 logger.info('HTTP server listening on %s:%d', hostname, port)
355 logger.info('Web server: %s', WEBSERVER.URL) 361 logger.info('Web server: %s', WEBSERVER.URL)
356 362
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e54d12acd..655be6568 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -168,7 +168,7 @@ class JobQueue {
168 private constructor () { 168 private constructor () {
169 } 169 }
170 170
171 init (produceOnly = false) { 171 init () {
172 // Already initialized 172 // Already initialized
173 if (this.initialized === true) return 173 if (this.initialized === true) return
174 this.initialized = true 174 this.initialized = true
@@ -176,10 +176,10 @@ class JobQueue {
176 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST 176 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
177 177
178 for (const handlerName of (Object.keys(handlers) as JobType[])) { 178 for (const handlerName of (Object.keys(handlers) as JobType[])) {
179 this.buildWorker(handlerName, produceOnly) 179 this.buildWorker(handlerName)
180 this.buildQueue(handlerName) 180 this.buildQueue(handlerName)
181 this.buildQueueScheduler(handlerName, produceOnly) 181 this.buildQueueScheduler(handlerName)
182 this.buildQueueEvent(handlerName, produceOnly) 182 this.buildQueueEvent(handlerName)
183 } 183 }
184 184
185 this.flowProducer = new FlowProducer({ 185 this.flowProducer = new FlowProducer({
@@ -191,9 +191,9 @@ class JobQueue {
191 this.addRepeatableJobs() 191 this.addRepeatableJobs()
192 } 192 }
193 193
194 private buildWorker (handlerName: JobType, produceOnly: boolean) { 194 private buildWorker (handlerName: JobType) {
195 const workerOptions: WorkerOptions = { 195 const workerOptions: WorkerOptions = {
196 autorun: !produceOnly, 196 autorun: false,
197 concurrency: this.getJobConcurrency(handlerName), 197 concurrency: this.getJobConcurrency(handlerName),
198 prefix: this.jobRedisPrefix, 198 prefix: this.jobRedisPrefix,
199 connection: this.getRedisConnection() 199 connection: this.getRedisConnection()
@@ -246,9 +246,9 @@ class JobQueue {
246 this.queues[handlerName] = queue 246 this.queues[handlerName] = queue
247 } 247 }
248 248
249 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { 249 private buildQueueScheduler (handlerName: JobType) {
250 const queueSchedulerOptions: QueueSchedulerOptions = { 250 const queueSchedulerOptions: QueueSchedulerOptions = {
251 autorun: !produceOnly, 251 autorun: false,
252 connection: this.getRedisConnection(), 252 connection: this.getRedisConnection(),
253 prefix: this.jobRedisPrefix, 253 prefix: this.jobRedisPrefix,
254 maxStalledCount: 10 254 maxStalledCount: 10
@@ -260,9 +260,9 @@ class JobQueue {
260 this.queueSchedulers[handlerName] = queueScheduler 260 this.queueSchedulers[handlerName] = queueScheduler
261 } 261 }
262 262
263 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { 263 private buildQueueEvent (handlerName: JobType) {
264 const queueEventsOptions: QueueEventsOptions = { 264 const queueEventsOptions: QueueEventsOptions = {
265 autorun: !produceOnly, 265 autorun: false,
266 connection: this.getRedisConnection(), 266 connection: this.getRedisConnection(),
267 prefix: this.jobRedisPrefix 267 prefix: this.jobRedisPrefix
268 } 268 }
@@ -304,6 +304,23 @@ class JobQueue {
304 return Promise.all(promises) 304 return Promise.all(promises)
305 } 305 }
306 306
307 start () {
308 const promises = Object.keys(this.workers)
309 .map(handlerName => {
310 const worker: Worker = this.workers[handlerName]
311 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
312 const queueEvent: QueueEvents = this.queueEvents[handlerName]
313
314 return Promise.all([
315 worker.run(),
316 queueScheduler.run(),
317 queueEvent.run()
318 ])
319 })
320
321 return Promise.all(promises)
322 }
323
307 async pause () { 324 async pause () {
308 for (const handlerName of Object.keys(this.workers)) { 325 for (const handlerName of Object.keys(this.workers)) {
309 const worker: Worker = this.workers[handlerName] 326 const worker: Worker = this.workers[handlerName]