aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-02-16 14:10:11 +0100
committerChocobozzz <me@florianbigard.com>2023-02-16 14:10:11 +0100
commit182082f52d8cd94dc068ce39276af8243a00fb7c (patch)
tree25d6155e7515c23c2ec28fa2b83928ff80930dec
parente65ef81cf51746616182a822bd6933bf0d16717a (diff)
downloadPeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.gz
PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.tar.zst
PeerTube-182082f52d8cd94dc068ce39276af8243a00fb7c.zip
Update bullmq
Requires redis >= 6.2
-rw-r--r--package.json2
-rw-r--r--server/initializers/constants.ts4
-rw-r--r--server/lib/job-queue/job-queue.ts36
-rw-r--r--yarn.lock18
4 files changed, 13 insertions, 47 deletions
diff --git a/package.json b/package.json
index 0c2b6e5e5..831f4d143 100644
--- a/package.json
+++ b/package.json
@@ -107,7 +107,7 @@
107 "bencode": "^2.0.2", 107 "bencode": "^2.0.2",
108 "bittorrent-tracker": "^9.0.0", 108 "bittorrent-tracker": "^9.0.0",
109 "bluebird": "^3.5.0", 109 "bluebird": "^3.5.0",
110 "bullmq": "^1.87.0", 110 "bullmq": "^3.6.6",
111 "bytes": "^3.0.0", 111 "bytes": "^3.0.0",
112 "chokidar": "^3.4.2", 112 "chokidar": "^3.4.2",
113 "commander": "^10.0.0", 113 "commander": "^10.0.0",
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 992c86ed2..4703e20f2 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -212,10 +212,10 @@ const JOB_TTL: { [id in JobType]: number } = {
212} 212}
213const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { 213const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
214 'videos-views-stats': { 214 'videos-views-stats': {
215 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour 215 pattern: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
216 }, 216 },
217 'activitypub-cleaner': { 217 'activitypub-cleaner': {
218 cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM 218 pattern: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
219 } 219 }
220} 220}
221const JOB_PRIORITY = { 221const JOB_PRIORITY = {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 8597eb000..cc6be0bd8 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -7,11 +7,10 @@ import {
7 QueueEvents, 7 QueueEvents,
8 QueueEventsOptions, 8 QueueEventsOptions,
9 QueueOptions, 9 QueueOptions,
10 QueueScheduler,
11 QueueSchedulerOptions,
12 Worker, 10 Worker,
13 WorkerOptions 11 WorkerOptions
14} from 'bullmq' 12} from 'bullmq'
13import { parseDurationToMs } from '@server/helpers/core-utils'
15import { jobStates } from '@server/helpers/custom-validators/jobs' 14import { jobStates } from '@server/helpers/custom-validators/jobs'
16import { CONFIG } from '@server/initializers/config' 15import { CONFIG } from '@server/initializers/config'
17import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 16import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
@@ -41,14 +40,7 @@ import {
41 VideoTranscodingPayload 40 VideoTranscodingPayload
42} from '../../../shared/models' 41} from '../../../shared/models'
43import { logger } from '../../helpers/logger' 42import { logger } from '../../helpers/logger'
44import { 43import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
45 JOB_ATTEMPTS,
46 JOB_CONCURRENCY,
47 JOB_REMOVAL_OPTIONS,
48 JOB_TTL,
49 REPEAT_JOBS,
50 WEBSERVER
51} from '../../initializers/constants'
52import { Hooks } from '../plugins/hooks' 44import { Hooks } from '../plugins/hooks'
53import { Redis } from '../redis' 45import { Redis } from '../redis'
54import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 46import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
@@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
71import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
72import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
73import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
74import { parseDurationToMs } from '@server/helpers/core-utils'
75 66
76export type CreateJobArgument = 67export type CreateJobArgument =
77 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -166,7 +157,6 @@ class JobQueue {
166 157
167 private workers: { [id in JobType]?: Worker } = {} 158 private workers: { [id in JobType]?: Worker } = {}
168 private queues: { [id in JobType]?: Queue } = {} 159 private queues: { [id in JobType]?: Queue } = {}
169 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
170 private queueEvents: { [id in JobType]?: QueueEvents } = {} 160 private queueEvents: { [id in JobType]?: QueueEvents } = {}
171 161
172 private flowProducer: FlowProducer 162 private flowProducer: FlowProducer
@@ -187,7 +177,6 @@ class JobQueue {
187 for (const handlerName of Object.keys(handlers)) { 177 for (const handlerName of Object.keys(handlers)) {
188 this.buildWorker(handlerName) 178 this.buildWorker(handlerName)
189 this.buildQueue(handlerName) 179 this.buildQueue(handlerName)
190 this.buildQueueScheduler(handlerName)
191 this.buildQueueEvent(handlerName) 180 this.buildQueueEvent(handlerName)
192 } 181 }
193 182
@@ -205,7 +194,8 @@ class JobQueue {
205 autorun: false, 194 autorun: false,
206 concurrency: this.getJobConcurrency(handlerName), 195 concurrency: this.getJobConcurrency(handlerName),
207 prefix: this.jobRedisPrefix, 196 prefix: this.jobRedisPrefix,
208 connection: Redis.getRedisClientOptions('Worker') 197 connection: Redis.getRedisClientOptions('Worker'),
198 maxStalledCount: 10
209 } 199 }
210 200
211 const handler = function (job: Job) { 201 const handler = function (job: Job) {
@@ -255,20 +245,6 @@ class JobQueue {
255 this.queues[handlerName] = queue 245 this.queues[handlerName] = queue
256 } 246 }
257 247
258 private buildQueueScheduler (handlerName: JobType) {
259 const queueSchedulerOptions: QueueSchedulerOptions = {
260 autorun: false,
261 connection: Redis.getRedisClientOptions('QueueScheduler'),
262 prefix: this.jobRedisPrefix,
263 maxStalledCount: 10
264 }
265
266 const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
267 queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
268
269 this.queueSchedulers[handlerName] = queueScheduler
270 }
271
272 private buildQueueEvent (handlerName: JobType) { 248 private buildQueueEvent (handlerName: JobType) {
273 const queueEventsOptions: QueueEventsOptions = { 249 const queueEventsOptions: QueueEventsOptions = {
274 autorun: false, 250 autorun: false,
@@ -289,13 +265,11 @@ class JobQueue {
289 .map(handlerName => { 265 .map(handlerName => {
290 const worker: Worker = this.workers[handlerName] 266 const worker: Worker = this.workers[handlerName]
291 const queue: Queue = this.queues[handlerName] 267 const queue: Queue = this.queues[handlerName]
292 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
293 const queueEvent: QueueEvents = this.queueEvents[handlerName] 268 const queueEvent: QueueEvents = this.queueEvents[handlerName]
294 269
295 return Promise.all([ 270 return Promise.all([
296 worker.close(false), 271 worker.close(false),
297 queue.close(), 272 queue.close(),
298 queueScheduler.close(),
299 queueEvent.close() 273 queueEvent.close()
300 ]) 274 ])
301 }) 275 })
@@ -307,12 +281,10 @@ class JobQueue {
307 const promises = Object.keys(this.workers) 281 const promises = Object.keys(this.workers)
308 .map(handlerName => { 282 .map(handlerName => {
309 const worker: Worker = this.workers[handlerName] 283 const worker: Worker = this.workers[handlerName]
310 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
311 const queueEvent: QueueEvents = this.queueEvents[handlerName] 284 const queueEvent: QueueEvents = this.queueEvents[handlerName]
312 285
313 return Promise.all([ 286 return Promise.all([
314 worker.run(), 287 worker.run(),
315 queueScheduler.run(),
316 queueEvent.run() 288 queueEvent.run()
317 ]) 289 ])
318 }) 290 })
diff --git a/yarn.lock b/yarn.lock
index a36627524..f8d9557ef 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -3205,15 +3205,14 @@ builtins@^5.0.1:
3205 dependencies: 3205 dependencies:
3206 semver "^7.0.0" 3206 semver "^7.0.0"
3207 3207
3208bullmq@^1.87.0: 3208bullmq@^3.6.6:
3209 version "1.91.1" 3209 version "3.6.6"
3210 resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.91.1.tgz#ed17cfd4e314afa398fd099a32d365046b1ed4bc" 3210 resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.6.6.tgz#de3c407021eff2eb283fb2aca66336ebeee9d5c5"
3211 integrity sha512-u7dat9I8ZwouZ651AMZkBSvB6NVUPpnAjd4iokd9DM41whqIBnDjuL11h7+kEjcpiDKj6E+wxZiER00FqirZQg== 3211 integrity sha512-W71jXrcTdcT3Y5tzMyTx22Cd8O3dTML7vl6KG3YdGVGrO3+UmKRLYfGLn1QwIhIoTQJVvIrSB4qfGs1hgqYRVw==
3212 dependencies: 3212 dependencies:
3213 cron-parser "^4.6.0" 3213 cron-parser "^4.6.0"
3214 get-port "6.1.2"
3215 glob "^8.0.3" 3214 glob "^8.0.3"
3216 ioredis "^5.2.2" 3215 ioredis "^5.3.0"
3217 lodash "^4.17.21" 3216 lodash "^4.17.21"
3218 msgpackr "^1.6.2" 3217 msgpackr "^1.6.2"
3219 semver "^7.3.7" 3218 semver "^7.3.7"
@@ -5181,11 +5180,6 @@ get-port@5.1.1:
5181 resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" 5180 resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193"
5182 integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== 5181 integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==
5183 5182
5184get-port@6.1.2:
5185 version "6.1.2"
5186 resolved "https://registry.yarnpkg.com/get-port/-/get-port-6.1.2.tgz#c1228abb67ba0e17fb346da33b15187833b9c08a"
5187 integrity sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==
5188
5189get-stdin@^8.0.0: 5183get-stdin@^8.0.0:
5190 version "8.0.0" 5184 version "8.0.0"
5191 resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53" 5185 resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53"
@@ -5704,7 +5698,7 @@ invariant@2.2.4:
5704 dependencies: 5698 dependencies:
5705 loose-envify "^1.0.0" 5699 loose-envify "^1.0.0"
5706 5700
5707ioredis@^5.2.2, ioredis@^5.2.3: 5701ioredis@^5.2.3, ioredis@^5.3.0:
5708 version "5.3.1" 5702 version "5.3.1"
5709 resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f" 5703 resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f"
5710 integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg== 5704 integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg==