diff options
-rw-r--r-- | package.json | 2 | ||||
-rw-r--r-- | server/initializers/constants.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 36 | ||||
-rw-r--r-- | yarn.lock | 18 |
4 files changed, 13 insertions, 47 deletions
diff --git a/package.json b/package.json index 0c2b6e5e5..831f4d143 100644 --- a/package.json +++ b/package.json | |||
@@ -107,7 +107,7 @@ | |||
107 | "bencode": "^2.0.2", | 107 | "bencode": "^2.0.2", |
108 | "bittorrent-tracker": "^9.0.0", | 108 | "bittorrent-tracker": "^9.0.0", |
109 | "bluebird": "^3.5.0", | 109 | "bluebird": "^3.5.0", |
110 | "bullmq": "^1.87.0", | 110 | "bullmq": "^3.6.6", |
111 | "bytes": "^3.0.0", | 111 | "bytes": "^3.0.0", |
112 | "chokidar": "^3.4.2", | 112 | "chokidar": "^3.4.2", |
113 | "commander": "^10.0.0", | 113 | "commander": "^10.0.0", |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 992c86ed2..4703e20f2 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -212,10 +212,10 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
212 | } | 212 | } |
213 | const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { | 213 | const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { |
214 | 'videos-views-stats': { | 214 | 'videos-views-stats': { |
215 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour | 215 | pattern: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour |
216 | }, | 216 | }, |
217 | 'activitypub-cleaner': { | 217 | 'activitypub-cleaner': { |
218 | cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM | 218 | pattern: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM |
219 | } | 219 | } |
220 | } | 220 | } |
221 | const JOB_PRIORITY = { | 221 | const JOB_PRIORITY = { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 8597eb000..cc6be0bd8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -7,11 +7,10 @@ import { | |||
7 | QueueEvents, | 7 | QueueEvents, |
8 | QueueEventsOptions, | 8 | QueueEventsOptions, |
9 | QueueOptions, | 9 | QueueOptions, |
10 | QueueScheduler, | ||
11 | QueueSchedulerOptions, | ||
12 | Worker, | 10 | Worker, |
13 | WorkerOptions | 11 | WorkerOptions |
14 | } from 'bullmq' | 12 | } from 'bullmq' |
13 | import { parseDurationToMs } from '@server/helpers/core-utils' | ||
15 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 14 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
16 | import { CONFIG } from '@server/initializers/config' | 15 | import { CONFIG } from '@server/initializers/config' |
17 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 16 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
@@ -41,14 +40,7 @@ import { | |||
41 | VideoTranscodingPayload | 40 | VideoTranscodingPayload |
42 | } from '../../../shared/models' | 41 | } from '../../../shared/models' |
43 | import { logger } from '../../helpers/logger' | 42 | import { logger } from '../../helpers/logger' |
44 | import { | 43 | import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' |
45 | JOB_ATTEMPTS, | ||
46 | JOB_CONCURRENCY, | ||
47 | JOB_REMOVAL_OPTIONS, | ||
48 | JOB_TTL, | ||
49 | REPEAT_JOBS, | ||
50 | WEBSERVER | ||
51 | } from '../../initializers/constants' | ||
52 | import { Hooks } from '../plugins/hooks' | 44 | import { Hooks } from '../plugins/hooks' |
53 | import { Redis } from '../redis' | 45 | import { Redis } from '../redis' |
54 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 46 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
@@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
71 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
72 | import { processVideoTranscoding } from './handlers/video-transcoding' | 64 | import { processVideoTranscoding } from './handlers/video-transcoding' |
73 | import { processVideosViewsStats } from './handlers/video-views-stats' | 65 | import { processVideosViewsStats } from './handlers/video-views-stats' |
74 | import { parseDurationToMs } from '@server/helpers/core-utils' | ||
75 | 66 | ||
76 | export type CreateJobArgument = | 67 | export type CreateJobArgument = |
77 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -166,7 +157,6 @@ class JobQueue { | |||
166 | 157 | ||
167 | private workers: { [id in JobType]?: Worker } = {} | 158 | private workers: { [id in JobType]?: Worker } = {} |
168 | private queues: { [id in JobType]?: Queue } = {} | 159 | private queues: { [id in JobType]?: Queue } = {} |
169 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | ||
170 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | 160 | private queueEvents: { [id in JobType]?: QueueEvents } = {} |
171 | 161 | ||
172 | private flowProducer: FlowProducer | 162 | private flowProducer: FlowProducer |
@@ -187,7 +177,6 @@ class JobQueue { | |||
187 | for (const handlerName of Object.keys(handlers)) { | 177 | for (const handlerName of Object.keys(handlers)) { |
188 | this.buildWorker(handlerName) | 178 | this.buildWorker(handlerName) |
189 | this.buildQueue(handlerName) | 179 | this.buildQueue(handlerName) |
190 | this.buildQueueScheduler(handlerName) | ||
191 | this.buildQueueEvent(handlerName) | 180 | this.buildQueueEvent(handlerName) |
192 | } | 181 | } |
193 | 182 | ||
@@ -205,7 +194,8 @@ class JobQueue { | |||
205 | autorun: false, | 194 | autorun: false, |
206 | concurrency: this.getJobConcurrency(handlerName), | 195 | concurrency: this.getJobConcurrency(handlerName), |
207 | prefix: this.jobRedisPrefix, | 196 | prefix: this.jobRedisPrefix, |
208 | connection: Redis.getRedisClientOptions('Worker') | 197 | connection: Redis.getRedisClientOptions('Worker'), |
198 | maxStalledCount: 10 | ||
209 | } | 199 | } |
210 | 200 | ||
211 | const handler = function (job: Job) { | 201 | const handler = function (job: Job) { |
@@ -255,20 +245,6 @@ class JobQueue { | |||
255 | this.queues[handlerName] = queue | 245 | this.queues[handlerName] = queue |
256 | } | 246 | } |
257 | 247 | ||
258 | private buildQueueScheduler (handlerName: JobType) { | ||
259 | const queueSchedulerOptions: QueueSchedulerOptions = { | ||
260 | autorun: false, | ||
261 | connection: Redis.getRedisClientOptions('QueueScheduler'), | ||
262 | prefix: this.jobRedisPrefix, | ||
263 | maxStalledCount: 10 | ||
264 | } | ||
265 | |||
266 | const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions) | ||
267 | queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) }) | ||
268 | |||
269 | this.queueSchedulers[handlerName] = queueScheduler | ||
270 | } | ||
271 | |||
272 | private buildQueueEvent (handlerName: JobType) { | 248 | private buildQueueEvent (handlerName: JobType) { |
273 | const queueEventsOptions: QueueEventsOptions = { | 249 | const queueEventsOptions: QueueEventsOptions = { |
274 | autorun: false, | 250 | autorun: false, |
@@ -289,13 +265,11 @@ class JobQueue { | |||
289 | .map(handlerName => { | 265 | .map(handlerName => { |
290 | const worker: Worker = this.workers[handlerName] | 266 | const worker: Worker = this.workers[handlerName] |
291 | const queue: Queue = this.queues[handlerName] | 267 | const queue: Queue = this.queues[handlerName] |
292 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
293 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | 268 | const queueEvent: QueueEvents = this.queueEvents[handlerName] |
294 | 269 | ||
295 | return Promise.all([ | 270 | return Promise.all([ |
296 | worker.close(false), | 271 | worker.close(false), |
297 | queue.close(), | 272 | queue.close(), |
298 | queueScheduler.close(), | ||
299 | queueEvent.close() | 273 | queueEvent.close() |
300 | ]) | 274 | ]) |
301 | }) | 275 | }) |
@@ -307,12 +281,10 @@ class JobQueue { | |||
307 | const promises = Object.keys(this.workers) | 281 | const promises = Object.keys(this.workers) |
308 | .map(handlerName => { | 282 | .map(handlerName => { |
309 | const worker: Worker = this.workers[handlerName] | 283 | const worker: Worker = this.workers[handlerName] |
310 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
311 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | 284 | const queueEvent: QueueEvents = this.queueEvents[handlerName] |
312 | 285 | ||
313 | return Promise.all([ | 286 | return Promise.all([ |
314 | worker.run(), | 287 | worker.run(), |
315 | queueScheduler.run(), | ||
316 | queueEvent.run() | 288 | queueEvent.run() |
317 | ]) | 289 | ]) |
318 | }) | 290 | }) |
@@ -3205,15 +3205,14 @@ builtins@^5.0.1: | |||
3205 | dependencies: | 3205 | dependencies: |
3206 | semver "^7.0.0" | 3206 | semver "^7.0.0" |
3207 | 3207 | ||
3208 | bullmq@^1.87.0: | 3208 | bullmq@^3.6.6: |
3209 | version "1.91.1" | 3209 | version "3.6.6" |
3210 | resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.91.1.tgz#ed17cfd4e314afa398fd099a32d365046b1ed4bc" | 3210 | resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.6.6.tgz#de3c407021eff2eb283fb2aca66336ebeee9d5c5" |
3211 | integrity sha512-u7dat9I8ZwouZ651AMZkBSvB6NVUPpnAjd4iokd9DM41whqIBnDjuL11h7+kEjcpiDKj6E+wxZiER00FqirZQg== | 3211 | integrity sha512-W71jXrcTdcT3Y5tzMyTx22Cd8O3dTML7vl6KG3YdGVGrO3+UmKRLYfGLn1QwIhIoTQJVvIrSB4qfGs1hgqYRVw== |
3212 | dependencies: | 3212 | dependencies: |
3213 | cron-parser "^4.6.0" | 3213 | cron-parser "^4.6.0" |
3214 | get-port "6.1.2" | ||
3215 | glob "^8.0.3" | 3214 | glob "^8.0.3" |
3216 | ioredis "^5.2.2" | 3215 | ioredis "^5.3.0" |
3217 | lodash "^4.17.21" | 3216 | lodash "^4.17.21" |
3218 | msgpackr "^1.6.2" | 3217 | msgpackr "^1.6.2" |
3219 | semver "^7.3.7" | 3218 | semver "^7.3.7" |
@@ -5181,11 +5180,6 @@ get-port@5.1.1: | |||
5181 | resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" | 5180 | resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" |
5182 | integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== | 5181 | integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== |
5183 | 5182 | ||
5184 | get-port@6.1.2: | ||
5185 | version "6.1.2" | ||
5186 | resolved "https://registry.yarnpkg.com/get-port/-/get-port-6.1.2.tgz#c1228abb67ba0e17fb346da33b15187833b9c08a" | ||
5187 | integrity sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw== | ||
5188 | |||
5189 | get-stdin@^8.0.0: | 5183 | get-stdin@^8.0.0: |
5190 | version "8.0.0" | 5184 | version "8.0.0" |
5191 | resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53" | 5185 | resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53" |
@@ -5704,7 +5698,7 @@ invariant@2.2.4: | |||
5704 | dependencies: | 5698 | dependencies: |
5705 | loose-envify "^1.0.0" | 5699 | loose-envify "^1.0.0" |
5706 | 5700 | ||
5707 | ioredis@^5.2.2, ioredis@^5.2.3: | 5701 | ioredis@^5.2.3, ioredis@^5.3.0: |
5708 | version "5.3.1" | 5702 | version "5.3.1" |
5709 | resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f" | 5703 | resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f" |
5710 | integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg== | 5704 | integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg== |