diff options
-rw-r--r-- | scripts/create-import-video-file-job.ts | 2 | ||||
-rw-r--r-- | scripts/create-move-video-storage-job.ts | 2 | ||||
-rwxr-xr-x | scripts/create-transcoding-job.ts | 2 | ||||
-rw-r--r-- | scripts/migrations/peertube-4.0.ts | 2 | ||||
-rw-r--r-- | scripts/migrations/peertube-4.2.ts | 2 | ||||
-rw-r--r-- | server.ts | 6 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 37 |
7 files changed, 38 insertions, 15 deletions
diff --git a/scripts/create-import-video-file-job.ts b/scripts/create-import-video-file-job.ts index cf974f240..9cb387d2e 100644 --- a/scripts/create-import-video-file-job.ts +++ b/scripts/create-import-video-file-job.ts | |||
@@ -44,7 +44,7 @@ async function run () { | |||
44 | filePath: resolve(options.import) | 44 | filePath: resolve(options.import) |
45 | } | 45 | } |
46 | 46 | ||
47 | JobQueue.Instance.init(true) | 47 | JobQueue.Instance.init() |
48 | await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput }) | 48 | await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput }) |
49 | console.log('Import job for video %s created.', video.uuid) | 49 | console.log('Import job for video %s created.', video.uuid) |
50 | } | 50 | } |
diff --git a/scripts/create-move-video-storage-job.ts b/scripts/create-move-video-storage-job.ts index 0f0d4ee35..13ba3c0b7 100644 --- a/scripts/create-move-video-storage-job.ts +++ b/scripts/create-move-video-storage-job.ts | |||
@@ -37,7 +37,7 @@ run() | |||
37 | async function run () { | 37 | async function run () { |
38 | await initDatabaseModels(true) | 38 | await initDatabaseModels(true) |
39 | 39 | ||
40 | JobQueue.Instance.init(true) | 40 | JobQueue.Instance.init() |
41 | 41 | ||
42 | let ids: number[] = [] | 42 | let ids: number[] = [] |
43 | 43 | ||
diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts index aa97b0ba7..ffdf55ae4 100755 --- a/scripts/create-transcoding-job.ts +++ b/scripts/create-transcoding-job.ts | |||
@@ -97,7 +97,7 @@ async function run () { | |||
97 | } | 97 | } |
98 | } | 98 | } |
99 | 99 | ||
100 | JobQueue.Instance.init(true) | 100 | JobQueue.Instance.init() |
101 | 101 | ||
102 | video.state = VideoState.TO_TRANSCODE | 102 | video.state = VideoState.TO_TRANSCODE |
103 | await video.save() | 103 | await video.save() |
diff --git a/scripts/migrations/peertube-4.0.ts b/scripts/migrations/peertube-4.0.ts index 9e5ca60d4..b0891c2e6 100644 --- a/scripts/migrations/peertube-4.0.ts +++ b/scripts/migrations/peertube-4.0.ts | |||
@@ -21,7 +21,7 @@ async function run () { | |||
21 | 21 | ||
22 | await initDatabaseModels(true) | 22 | await initDatabaseModels(true) |
23 | 23 | ||
24 | JobQueue.Instance.init(true) | 24 | JobQueue.Instance.init() |
25 | 25 | ||
26 | const ids = await VideoModel.listLocalIds() | 26 | const ids = await VideoModel.listLocalIds() |
27 | 27 | ||
diff --git a/scripts/migrations/peertube-4.2.ts b/scripts/migrations/peertube-4.2.ts index 6a9007265..513c629ef 100644 --- a/scripts/migrations/peertube-4.2.ts +++ b/scripts/migrations/peertube-4.2.ts | |||
@@ -27,7 +27,7 @@ async function run () { | |||
27 | console.log('Generate avatar miniatures from existing avatars.') | 27 | console.log('Generate avatar miniatures from existing avatars.') |
28 | 28 | ||
29 | await initDatabaseModels(true) | 29 | await initDatabaseModels(true) |
30 | JobQueue.Instance.init(true) | 30 | JobQueue.Instance.init() |
31 | 31 | ||
32 | const accounts: AccountModel[] = await AccountModel.findAll({ | 32 | const accounts: AccountModel[] = await AccountModel.findAll({ |
33 | include: [ | 33 | include: [ |
@@ -351,6 +351,12 @@ async function startApplication () { | |||
351 | ApplicationModel.updateNodeVersions() | 351 | ApplicationModel.updateNodeVersions() |
352 | .catch(err => logger.error('Cannot update node versions.', { err })) | 352 | .catch(err => logger.error('Cannot update node versions.', { err })) |
353 | 353 | ||
354 | JobQueue.Instance.start() | ||
355 | .catch(err => { | ||
356 | logger.error('Cannot start job queue.', { err }) | ||
357 | process.exit(-1) | ||
358 | }) | ||
359 | |||
354 | logger.info('HTTP server listening on %s:%d', hostname, port) | 360 | logger.info('HTTP server listening on %s:%d', hostname, port) |
355 | logger.info('Web server: %s', WEBSERVER.URL) | 361 | logger.info('Web server: %s', WEBSERVER.URL) |
356 | 362 | ||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e54d12acd..655be6568 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -168,7 +168,7 @@ class JobQueue { | |||
168 | private constructor () { | 168 | private constructor () { |
169 | } | 169 | } |
170 | 170 | ||
171 | init (produceOnly = false) { | 171 | init () { |
172 | // Already initialized | 172 | // Already initialized |
173 | if (this.initialized === true) return | 173 | if (this.initialized === true) return |
174 | this.initialized = true | 174 | this.initialized = true |
@@ -176,10 +176,10 @@ class JobQueue { | |||
176 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST | 176 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST |
177 | 177 | ||
178 | for (const handlerName of (Object.keys(handlers) as JobType[])) { | 178 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
179 | this.buildWorker(handlerName, produceOnly) | 179 | this.buildWorker(handlerName) |
180 | this.buildQueue(handlerName) | 180 | this.buildQueue(handlerName) |
181 | this.buildQueueScheduler(handlerName, produceOnly) | 181 | this.buildQueueScheduler(handlerName) |
182 | this.buildQueueEvent(handlerName, produceOnly) | 182 | this.buildQueueEvent(handlerName) |
183 | } | 183 | } |
184 | 184 | ||
185 | this.flowProducer = new FlowProducer({ | 185 | this.flowProducer = new FlowProducer({ |
@@ -191,9 +191,9 @@ class JobQueue { | |||
191 | this.addRepeatableJobs() | 191 | this.addRepeatableJobs() |
192 | } | 192 | } |
193 | 193 | ||
194 | private buildWorker (handlerName: JobType, produceOnly: boolean) { | 194 | private buildWorker (handlerName: JobType) { |
195 | const workerOptions: WorkerOptions = { | 195 | const workerOptions: WorkerOptions = { |
196 | autorun: !produceOnly, | 196 | autorun: false, |
197 | concurrency: this.getJobConcurrency(handlerName), | 197 | concurrency: this.getJobConcurrency(handlerName), |
198 | prefix: this.jobRedisPrefix, | 198 | prefix: this.jobRedisPrefix, |
199 | connection: this.getRedisConnection() | 199 | connection: this.getRedisConnection() |
@@ -246,9 +246,9 @@ class JobQueue { | |||
246 | this.queues[handlerName] = queue | 246 | this.queues[handlerName] = queue |
247 | } | 247 | } |
248 | 248 | ||
249 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { | 249 | private buildQueueScheduler (handlerName: JobType) { |
250 | const queueSchedulerOptions: QueueSchedulerOptions = { | 250 | const queueSchedulerOptions: QueueSchedulerOptions = { |
251 | autorun: !produceOnly, | 251 | autorun: false, |
252 | connection: this.getRedisConnection(), | 252 | connection: this.getRedisConnection(), |
253 | prefix: this.jobRedisPrefix, | 253 | prefix: this.jobRedisPrefix, |
254 | maxStalledCount: 10 | 254 | maxStalledCount: 10 |
@@ -260,9 +260,9 @@ class JobQueue { | |||
260 | this.queueSchedulers[handlerName] = queueScheduler | 260 | this.queueSchedulers[handlerName] = queueScheduler |
261 | } | 261 | } |
262 | 262 | ||
263 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { | 263 | private buildQueueEvent (handlerName: JobType) { |
264 | const queueEventsOptions: QueueEventsOptions = { | 264 | const queueEventsOptions: QueueEventsOptions = { |
265 | autorun: !produceOnly, | 265 | autorun: false, |
266 | connection: this.getRedisConnection(), | 266 | connection: this.getRedisConnection(), |
267 | prefix: this.jobRedisPrefix | 267 | prefix: this.jobRedisPrefix |
268 | } | 268 | } |
@@ -304,6 +304,23 @@ class JobQueue { | |||
304 | return Promise.all(promises) | 304 | return Promise.all(promises) |
305 | } | 305 | } |
306 | 306 | ||
307 | start () { | ||
308 | const promises = Object.keys(this.workers) | ||
309 | .map(handlerName => { | ||
310 | const worker: Worker = this.workers[handlerName] | ||
311 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
312 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
313 | |||
314 | return Promise.all([ | ||
315 | worker.run(), | ||
316 | queueScheduler.run(), | ||
317 | queueEvent.run() | ||
318 | ]) | ||
319 | }) | ||
320 | |||
321 | return Promise.all(promises) | ||
322 | } | ||
323 | |||
307 | async pause () { | 324 | async pause () { |
308 | for (const handlerName of Object.keys(this.workers)) { | 325 | for (const handlerName of Object.keys(this.workers)) { |
309 | const worker: Worker = this.workers[handlerName] | 326 | const worker: Worker = this.workers[handlerName] |