]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Update bullmq
authorChocobozzz <me@florianbigard.com>
Thu, 16 Feb 2023 13:10:11 +0000 (14:10 +0100)
committerChocobozzz <me@florianbigard.com>
Thu, 16 Feb 2023 13:10:11 +0000 (14:10 +0100)
Requires redis >= 6.2

package.json
server/initializers/constants.ts
server/lib/job-queue/job-queue.ts
yarn.lock

index 0c2b6e5e5a9d5f8c96d4e30c6a611ffe03846662..831f4d1437053c23c151b225db98abbea28ef3de 100644 (file)
     "bencode": "^2.0.2",
     "bittorrent-tracker": "^9.0.0",
     "bluebird": "^3.5.0",
-    "bullmq": "^1.87.0",
+    "bullmq": "^3.6.6",
     "bytes": "^3.0.0",
     "chokidar": "^3.4.2",
     "commander": "^10.0.0",
index 992c86ed25c91ace624dcf841c9ab1282527e6e3..4703e20f28acc50372afb205cb14c3a2cc3158dd 100644 (file)
@@ -212,10 +212,10 @@ const JOB_TTL: { [id in JobType]: number } = {
 }
 const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
   'videos-views-stats': {
-    cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
+    pattern: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
   },
   'activitypub-cleaner': {
-    cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
+    pattern: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
   }
 }
 const JOB_PRIORITY = {
index 8597eb00018356dc081237b0f48f57cb7f8d6415..cc6be0bd8d1fb3b48f9765d326d31f727e294ea7 100644 (file)
@@ -7,11 +7,10 @@ import {
   QueueEvents,
   QueueEventsOptions,
   QueueOptions,
-  QueueScheduler,
-  QueueSchedulerOptions,
   Worker,
   WorkerOptions
 } from 'bullmq'
+import { parseDurationToMs } from '@server/helpers/core-utils'
 import { jobStates } from '@server/helpers/custom-validators/jobs'
 import { CONFIG } from '@server/initializers/config'
 import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
@@ -41,14 +40,7 @@ import {
   VideoTranscodingPayload
 } from '../../../shared/models'
 import { logger } from '../../helpers/logger'
-import {
-  JOB_ATTEMPTS,
-  JOB_CONCURRENCY,
-  JOB_REMOVAL_OPTIONS,
-  JOB_TTL,
-  REPEAT_JOBS,
-  WEBSERVER
-} from '../../initializers/constants'
+import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
 import { Hooks } from '../plugins/hooks'
 import { Redis } from '../redis'
 import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
@@ -71,7 +63,6 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
 import { processVideoStudioEdition } from './handlers/video-studio-edition'
 import { processVideoTranscoding } from './handlers/video-transcoding'
 import { processVideosViewsStats } from './handlers/video-views-stats'
-import { parseDurationToMs } from '@server/helpers/core-utils'
 
 export type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -166,7 +157,6 @@ class JobQueue {
 
   private workers: { [id in JobType]?: Worker } = {}
   private queues: { [id in JobType]?: Queue } = {}
-  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
   private queueEvents: { [id in JobType]?: QueueEvents } = {}
 
   private flowProducer: FlowProducer
@@ -187,7 +177,6 @@ class JobQueue {
     for (const handlerName of Object.keys(handlers)) {
       this.buildWorker(handlerName)
       this.buildQueue(handlerName)
-      this.buildQueueScheduler(handlerName)
       this.buildQueueEvent(handlerName)
     }
 
@@ -205,7 +194,8 @@ class JobQueue {
       autorun: false,
       concurrency: this.getJobConcurrency(handlerName),
       prefix: this.jobRedisPrefix,
-      connection: Redis.getRedisClientOptions('Worker')
+      connection: Redis.getRedisClientOptions('Worker'),
+      maxStalledCount: 10
     }
 
     const handler = function (job: Job) {
@@ -255,20 +245,6 @@ class JobQueue {
     this.queues[handlerName] = queue
   }
 
-  private buildQueueScheduler (handlerName: JobType) {
-    const queueSchedulerOptions: QueueSchedulerOptions = {
-      autorun: false,
-      connection: Redis.getRedisClientOptions('QueueScheduler'),
-      prefix: this.jobRedisPrefix,
-      maxStalledCount: 10
-    }
-
-    const queueScheduler = new QueueScheduler(handlerName, queueSchedulerOptions)
-    queueScheduler.on('error', err => { logger.error('Error in job queue scheduler %s.', handlerName, { err }) })
-
-    this.queueSchedulers[handlerName] = queueScheduler
-  }
-
   private buildQueueEvent (handlerName: JobType) {
     const queueEventsOptions: QueueEventsOptions = {
       autorun: false,
@@ -289,13 +265,11 @@ class JobQueue {
       .map(handlerName => {
         const worker: Worker = this.workers[handlerName]
         const queue: Queue = this.queues[handlerName]
-        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
         const queueEvent: QueueEvents = this.queueEvents[handlerName]
 
         return Promise.all([
           worker.close(false),
           queue.close(),
-          queueScheduler.close(),
           queueEvent.close()
         ])
       })
@@ -307,12 +281,10 @@ class JobQueue {
     const promises = Object.keys(this.workers)
       .map(handlerName => {
         const worker: Worker = this.workers[handlerName]
-        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
         const queueEvent: QueueEvents = this.queueEvents[handlerName]
 
         return Promise.all([
           worker.run(),
-          queueScheduler.run(),
           queueEvent.run()
         ])
       })
index a3662752452afdcc438936edc0ff228f15a84289..f8d9557efa7e7d943084b9f4abab0ea3cf679100 100644 (file)
--- a/yarn.lock
+++ b/yarn.lock
@@ -3205,15 +3205,14 @@ builtins@^5.0.1:
   dependencies:
     semver "^7.0.0"
 
-bullmq@^1.87.0:
-  version "1.91.1"
-  resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.91.1.tgz#ed17cfd4e314afa398fd099a32d365046b1ed4bc"
-  integrity sha512-u7dat9I8ZwouZ651AMZkBSvB6NVUPpnAjd4iokd9DM41whqIBnDjuL11h7+kEjcpiDKj6E+wxZiER00FqirZQg==
+bullmq@^3.6.6:
+  version "3.6.6"
+  resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-3.6.6.tgz#de3c407021eff2eb283fb2aca66336ebeee9d5c5"
+  integrity sha512-W71jXrcTdcT3Y5tzMyTx22Cd8O3dTML7vl6KG3YdGVGrO3+UmKRLYfGLn1QwIhIoTQJVvIrSB4qfGs1hgqYRVw==
   dependencies:
     cron-parser "^4.6.0"
-    get-port "6.1.2"
     glob "^8.0.3"
-    ioredis "^5.2.2"
+    ioredis "^5.3.0"
     lodash "^4.17.21"
     msgpackr "^1.6.2"
     semver "^7.3.7"
@@ -5181,11 +5180,6 @@ get-port@5.1.1:
   resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193"
   integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ==
 
-get-port@6.1.2:
-  version "6.1.2"
-  resolved "https://registry.yarnpkg.com/get-port/-/get-port-6.1.2.tgz#c1228abb67ba0e17fb346da33b15187833b9c08a"
-  integrity sha512-BrGGraKm2uPqurfGVj/z97/zv8dPleC6x9JBNRTrDNtCkkRF4rPwrQXFgL7+I+q8QSdU4ntLQX2D7KIxSy8nGw==
-
 get-stdin@^8.0.0:
   version "8.0.0"
   resolved "https://registry.yarnpkg.com/get-stdin/-/get-stdin-8.0.0.tgz#cbad6a73feb75f6eeb22ba9e01f89aa28aa97a53"
@@ -5704,7 +5698,7 @@ invariant@2.2.4:
   dependencies:
     loose-envify "^1.0.0"
 
-ioredis@^5.2.2, ioredis@^5.2.3:
+ioredis@^5.2.3, ioredis@^5.3.0:
   version "5.3.1"
   resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-5.3.1.tgz#55d394a51258cee3af9e96c21c863b1a97bf951f"
   integrity sha512-C+IBcMysM6v52pTLItYMeV4Hz7uriGtoJdz7SSBDX6u+zwSYGirLdQh3L7t/OItWITcw3gTFMjJReYUwS4zihg==