]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Move to bullmq
authorChocobozzz <me@florianbigard.com>
Mon, 8 Aug 2022 08:42:08 +0000 (10:42 +0200)
committerChocobozzz <me@florianbigard.com>
Tue, 9 Aug 2022 07:18:07 +0000 (09:18 +0200)
30 files changed:
package.json
server.ts
server/controllers/api/jobs.ts
server/controllers/api/videos/update.ts
server/controllers/api/videos/upload.ts
server/helpers/custom-validators/jobs.ts
server/helpers/ffmpeg/ffmpeg-commons.ts
server/helpers/ffmpeg/ffmpeg-vod.ts
server/initializers/constants.ts
server/lib/job-queue/handlers/activitypub-cleaner.ts
server/lib/job-queue/handlers/activitypub-follow.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-fetcher.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/job-queue/handlers/activitypub-refresher.ts
server/lib/job-queue/handlers/actor-keys.ts
server/lib/job-queue/handlers/email.ts
server/lib/job-queue/handlers/manage-video-torrent.ts
server/lib/job-queue/handlers/move-to-object-storage.ts
server/lib/job-queue/handlers/video-file-import.ts
server/lib/job-queue/handlers/video-import.ts
server/lib/job-queue/handlers/video-live-ending.ts
server/lib/job-queue/handlers/video-redundancy.ts
server/lib/job-queue/handlers/video-studio-edition.ts
server/lib/job-queue/handlers/video-transcoding.ts
server/lib/job-queue/job-queue.ts
server/lib/transcoding/transcoding.ts
shared/core-utils/common/promises.ts
shared/models/server/job.model.ts
yarn.lock

index 24924c3da05b4a14ef421ac1d63ceb9f48a58419..64faf835592f8f33cb0aa1978c49d75de60fb659 100644 (file)
     "bencode": "^2.0.2",
     "bittorrent-tracker": "^9.0.0",
     "bluebird": "^3.5.0",
-    "bull": "^4.1.0",
+    "bullmq": "^1.87.0",
     "bytes": "^3.0.0",
     "chokidar": "^3.4.2",
     "commander": "^9.0.0",
     "@types/bencode": "^2.0.0",
     "@types/bluebird": "^3.5.33",
     "@types/body-parser": "^1.16.3",
-    "@types/bull": "^3.15.0",
     "@types/bytes": "^3.0.0",
     "@types/chai": "^4.0.4",
     "@types/chai-json-schema": "^1.4.3",
index aaf1ea021c3946b0017053d2c599055264ad00c7..3b9353e2ff8657cf721614124b2ae77bfeaf0f64 100644 (file)
--- a/server.ts
+++ b/server.ts
@@ -352,6 +352,7 @@ async function startApplication () {
 
   process.on('exit', () => {
     JobQueue.Instance.terminate()
+      .catch(err => logger.error('Cannot terminate job queue.', { err }))
   })
 
   process.on('SIGINT', () => process.exit(0))
index c61b7362f22a8277bbf5cd49161c5056d836f9ec..6a53e308363e522f4b4aed3da9b4d3b1faa629db 100644 (file)
@@ -1,3 +1,4 @@
+import { Job as BullJob } from 'bullmq'
 import express from 'express'
 import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models'
 import { isArray } from '../../helpers/custom-validators/misc'
@@ -25,7 +26,7 @@ jobsRouter.post('/pause',
 jobsRouter.post('/resume',
   authenticate,
   ensureUserHasRight(UserRight.MANAGE_JOBS),
-  asyncMiddleware(resumeJobQueue)
+  resumeJobQueue
 )
 
 jobsRouter.get('/:state?',
@@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) {
   return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
 }
 
-async function resumeJobQueue (req: express.Request, res: express.Response) {
-  await JobQueue.Instance.resume()
+function resumeJobQueue (req: express.Request, res: express.Response) {
+  JobQueue.Instance.resume()
 
   return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
 }
@@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) {
   return res.json(result)
 }
 
-async function formatJob (job: any, state?: JobState): Promise<Job> {
+async function formatJob (job: BullJob, state?: JobState): Promise<Job> {
   const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
     ? job.stacktrace[0]
     : null
@@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise<Job> {
   return {
     id: job.id,
     state: state || await job.getState(),
-    type: job.queue.name as JobType,
+    type: job.queueName as JobType,
     data: job.data,
-    progress: await job.progress(),
+    progress: job.progress as number,
     priority: job.opts.priority,
     error,
     createdAt: new Date(job.timestamp),
index 65a7321fd22d5c9e0ee192f55585633bd20496ea..1545a223252af382a855c4f5097c96d054eec696 100644 (file)
@@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
     const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
 
     const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-    await job.finished()
+    await JobQueue.Instance.waitJob(job)
   }
 
   const hls = video.getHLSPlaylist()
@@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
     const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
 
     const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-    await job.finished()
+    await JobQueue.Instance.waitJob(job)
   }
 
   // Refresh video since files have changed
index 3ce66c9cad11327c4258aa8438503afadc0dc890..4a9d7b61952b51ad8381057c8cde1d3496ff0b87 100644 (file)
@@ -17,6 +17,7 @@ import {
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { buildNextVideoState } from '@server/lib/video-state'
 import { openapiOperationDoc } from '@server/middlewares/doc'
+import { VideoSourceModel } from '@server/models/video/video-source'
 import { MVideoFile, MVideoFullLight } from '@server/types/models'
 import { getLowercaseExtension } from '@shared/core-utils'
 import { isAudioFile, uuidToShort } from '@shared/extra-utils'
@@ -44,7 +45,6 @@ import {
 import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update'
 import { VideoModel } from '../../../models/video/video'
 import { VideoFileModel } from '../../../models/video/video-file'
-import { VideoSourceModel } from '@server/models/video/video-source'
 
 const lTags = loggerTagsFactory('api', 'video')
 const auditLogger = auditLoggerFactory('videos')
@@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF
   const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
 
   const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-  await job.finished()
+  await JobQueue.Instance.waitJob(job)
 
   const refreshedVideo = await VideoModel.loadFull(video.id)
   if (!refreshedVideo) return
index f6777ecd51610d6996a0fc6f90ef8763d6b352f1..c168b3e919ed1dca50df9f5f29f2114889a846eb 100644 (file)
@@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models'
 import { exists } from './misc'
 import { jobTypes } from '@server/lib/job-queue/job-queue'
 
-const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ]
+const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused', 'waiting-children' ]
 
 function isValidJobState (value: JobState) {
   return exists(value) && jobStates.includes(value)
index ee338889cb498995b1393470c4e19bed5729ce73..b0198989930ce853cf6b83e70f9e05a6af3d29ae 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
 import { execPromise } from '@server/helpers/core-utils'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
@@ -81,7 +81,7 @@ async function runCommand (options: {
       command.on('progress', progress => {
         if (!progress.percent) return
 
-        job.progress(Math.round(progress.percent))
+        job.updateProgress(Math.round(progress.percent))
           .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() }))
       })
     }
index f84157e0f7dcd7a41b5831be43fcb9c9a8030f91..7a81a131310fcf0108d92c32edb0aeb7603d4092 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { FfmpegCommand } from 'fluent-ffmpeg'
 import { readFile, writeFile } from 'fs-extra'
 import { dirname } from 'path'
index 8165a289df94aa80c2800ae99be56d638bf09191..db43c59bebcffdc328801c6fde1937bdeb0626c2 100644 (file)
@@ -1,4 +1,4 @@
-import { CronRepeatOptions, EveryRepeatOptions } from 'bull'
+import { RepeatOptions } from 'bullmq'
 import { randomBytes } from 'crypto'
 import { invert } from 'lodash'
 import { join } from 'path'
@@ -197,7 +197,7 @@ const JOB_TTL: { [id in JobType]: number } = {
   'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
   'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
 }
-const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
+const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
   'videos-views-stats': {
     cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
   },
index 3d7dc6fb97582e7096aec469a977c36e03849dd1..84c0a2de2bcafb9486782e41b553524daefee3b9 100644 (file)
@@ -1,5 +1,5 @@
 import { map } from 'bluebird'
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import {
   isAnnounceActivityValid,
   isDislikeActivityValid,
index 2ee98171cfd98e96066186b7a9d3ee1baddce055..944da5be17c4dada7e7aed19c072b63f5ebea250 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
 import { ActivitypubFollowPayload } from '@shared/models'
 import { sanitizeHost } from '../../../helpers/core-utils'
index 709e8501f1f603d664b38e0ff9d20ff740b8f29c..354c608fb6212421c422e6b5f98fcac0f0690cbb 100644 (file)
@@ -1,5 +1,5 @@
 import { map } from 'bluebird'
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
 import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
 import { ActivitypubHttpBroadcastPayload } from '@shared/models'
index de533de6c5ef7ce40c07161447b6416410c2e67e..e0b8418872d75d78fc67f142779dfa8d112b424b 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
 import { logger } from '../../../helpers/logger'
 import { VideoModel } from '../../../models/video/video'
index 99bcd3e8d5e11ab31281cc3973e59b1ca2b5d804..837a597a58aeb56cda164b75120c18edc3953d40 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
 import { ActivitypubHttpUnicastPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
index 92ceed18094009b7ca088525228e3b2f338bfc9d..600f858a0811026b6ed1ca744981766037ee80a0 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
 import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
 import { loadVideoByUrl } from '@server/lib/model-loaders'
index 9d5a65376a3982e72c1b7bd12b43db726d089381..4a5bad9fb9272877dce9fb57fd6caca423ad925b 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
 import { ActorModel } from '@server/models/actor/actor'
 import { ActorKeysPayload } from '@shared/models'
index 6fc1caa84bf1a2e64f215458993f178e30e7f44d..b5b9475b1db2c7cf2070464878c25fb624e144a2 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { EmailPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
 import { Emailer } from '../../emailer'
index dfd4e6140bfe4894e8aa16a945375f352e433304..4505ca79ebb085f9814a5aa21e6255f877a0c95c 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
 import { VideoModel } from '@server/models/video/video'
 import { VideoFileModel } from '@server/models/video/video-file'
index 49064052c1661f0dabbd402dde7bb2395fff6d5e..d608fd865232db27e25b8a9398707fa62685c13e 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { remove } from 'fs-extra'
 import { join } from 'path'
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
index 71c5444afffa014c7bb6857b6bb91f51e8ccaa41..40c44cf52ce19b8e15b8fdd44edb0469ee8b00c4 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { copy, stat } from 'fs-extra'
 import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
 import { CONFIG } from '@server/initializers/config'
index 4cde26aef968c4bb82500d9729542126a9b1dcc6..e5cd35865fdbfb4bafcbe0e87b37b36bac60dffa 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { move, remove, stat } from 'fs-extra'
 import { retryTransactionWrapper } from '@server/helpers/database-utils'
 import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
index 78d0b21923ea1e2541f8bd1913ba9f2c5aca2764..79002258cd292153bdf66e369b7f0c4299f24797 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { readdir, remove } from 'fs-extra'
 import { join } from 'path'
 import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
index 9cb7a65891b07be7f583ba3a34e6ac56ae3f1dfb..75ab2cd02ec6ab513ddf38c6f31056055cc3b060 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
 import { VideoRedundancyPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
index 735150d57fecd0b26b13b95233882e2886f52a3f..0782435381b684d899928fe669b0f81b9177e569 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { move, remove } from 'fs-extra'
 import { join } from 'path'
 import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg'
index 4e5e979190758b3adc89f779fa9f2787c6ffc84f..8dbae8c423490a1679da4506544f382bf35b2c0a 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
 import { Hooks } from '@server/lib/plugins/hooks'
 import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
index 0ae325f4d979713e0ea28e635d7b2ded6047487a..0cf5d53ce015d3ece080dad3c44ccd910b1ee84f 100644 (file)
@@ -1,7 +1,19 @@
-import Bull, { Job, JobOptions, Queue } from 'bull'
+import {
+  Job,
+  JobsOptions,
+  Queue,
+  QueueEvents,
+  QueueEventsOptions,
+  QueueOptions,
+  QueueScheduler,
+  QueueSchedulerOptions,
+  Worker,
+  WorkerOptions
+} from 'bullmq'
 import { jobStates } from '@server/helpers/custom-validators/jobs'
 import { CONFIG } from '@server/initializers/config'
 import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
+import { timeoutPromise } from '@shared/core-utils'
 import {
   ActivitypubFollowPayload,
   ActivitypubHttpBroadcastPayload,
@@ -120,7 +132,11 @@ class JobQueue {
 
   private static instance: JobQueue
 
+  private workers: { [id in JobType]?: Worker } = {}
   private queues: { [id in JobType]?: Queue } = {}
+  private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
+  private queueEvents: { [id in JobType]?: QueueEvents } = {}
+
   private initialized = false
   private jobRedisPrefix: string
 
@@ -134,75 +150,131 @@ class JobQueue {
 
     this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
 
-    const queueOptions: Bull.QueueOptions = {
+    for (const handlerName of (Object.keys(handlers) as JobType[])) {
+      this.buildWorker(handlerName, produceOnly)
+      this.buildQueue(handlerName)
+      this.buildQueueScheduler(handlerName, produceOnly)
+      this.buildQueueEvent(handlerName, produceOnly)
+    }
+
+    this.addRepeatableJobs()
+  }
+
+  private buildWorker (handlerName: JobType, produceOnly: boolean) {
+    const workerOptions: WorkerOptions = {
+      autorun: !produceOnly,
+      concurrency: this.getJobConcurrency(handlerName),
       prefix: this.jobRedisPrefix,
-      redis: {
-        password: CONFIG.REDIS.AUTH,
-        db: CONFIG.REDIS.DB,
-        host: CONFIG.REDIS.HOSTNAME,
-        port: CONFIG.REDIS.PORT,
-        path: CONFIG.REDIS.SOCKET
-      },
-      settings: {
-        maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
-      }
+      connection: this.getRedisConnection()
     }
 
-    for (const handlerName of (Object.keys(handlers) as JobType[])) {
-      const queue = new Bull(handlerName, queueOptions)
+    const handler = function (job: Job) {
+      const timeout = JOB_TTL[handlerName]
+      const p = handlers[handlerName](job)
 
-      if (produceOnly) {
-        queue.pause(true)
-             .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
-      }
+      if (!timeout) return p
 
-      const handler = handlers[handlerName]
+      return timeoutPromise(p, timeout)
+    }
 
-      queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => {
-        const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
+    const processor = async (jobArg: Job<any>) => {
+      const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
 
-        return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
-      }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
+      return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
+    }
 
-      queue.on('failed', (job, err) => {
-        const logLevel = silentFailure.has(handlerName)
-          ? 'debug'
-          : 'error'
+    const worker = new Worker(handlerName, processor, workerOptions)
 
-        logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
+    worker.on('failed', (job, err) => {
+      const logLevel = silentFailure.has(handlerName)
+        ? 'debug'
+        : 'error'
 
-        if (errorHandlers[job.name]) {
-          errorHandlers[job.name](job, err)
-            .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
-        }
-      })
+      logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
 
-      queue.on('error', err => {
-        logger.error('Error in job queue %s.', handlerName, { err })
-      })
+      if (errorHandlers[job.name]) {
+        errorHandlers[job.name](job, err)
+          .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
+      }
+    })
 
-      this.queues[handlerName] = queue
+    worker.on('error', err => {
+      logger.error('Error in job queue %s.', handlerName, { err })
+    })
+
+    this.workers[handlerName] = worker
+  }
+
+  private buildQueue (handlerName: JobType) {
+    const queueOptions: QueueOptions = {
+      connection: this.getRedisConnection(),
+      prefix: this.jobRedisPrefix
     }
 
-    this.addRepeatableJobs()
+    this.queues[handlerName] = new Queue(handlerName, queueOptions)
+  }
+
+  private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
+    const queueSchedulerOptions: QueueSchedulerOptions = {
+      autorun: !produceOnly,
+      connection: this.getRedisConnection(),
+      prefix: this.jobRedisPrefix,
+      maxStalledCount: 10
+    }
+    this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
   }
 
-  terminate () {
-    for (const queueName of Object.keys(this.queues)) {
-      const queue = this.queues[queueName]
-      queue.close()
+  private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
+    const queueEventsOptions: QueueEventsOptions = {
+      autorun: !produceOnly,
+      connection: this.getRedisConnection(),
+      prefix: this.jobRedisPrefix
     }
+    this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
+  }
+
+  private getRedisConnection () {
+    return {
+      password: CONFIG.REDIS.AUTH,
+      db: CONFIG.REDIS.DB,
+      host: CONFIG.REDIS.HOSTNAME,
+      port: CONFIG.REDIS.PORT,
+      path: CONFIG.REDIS.SOCKET
+    }
+  }
+
+  async terminate () {
+    const promises = Object.keys(this.workers)
+      .map(handlerName => {
+        const worker: Worker = this.workers[handlerName]
+        const queue: Queue = this.queues[handlerName]
+        const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
+        const queueEvent: QueueEvents = this.queueEvents[handlerName]
+
+        return Promise.all([
+          worker.close(false),
+          queue.close(),
+          queueScheduler.close(),
+          queueEvent.close()
+        ])
+      })
+
+    return Promise.all(promises)
   }
 
   async pause () {
-    for (const handler of Object.keys(this.queues)) {
-      await this.queues[handler].pause(true)
+    for (const handler of Object.keys(this.workers)) {
+      const worker: Worker = this.workers[handler]
+
+      await worker.pause()
     }
   }
 
-  async resume () {
-    for (const handler of Object.keys(this.queues)) {
-      await this.queues[handler].resume(true)
+  resume () {
+    for (const handler of Object.keys(this.workers)) {
+      const worker: Worker = this.workers[handler]
+
+      worker.resume()
     }
   }
 
@@ -211,22 +283,21 @@ class JobQueue {
         .catch(err => logger.error('Cannot create job.', { err, obj }))
   }
 
-  createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
+  async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
     const queue: Queue = this.queues[obj.type]
     if (queue === undefined) {
       logger.error('Unknown queue %s: cannot create job.', obj.type)
       return
     }
 
-    const jobArgs: JobOptions = {
+    const jobArgs: JobsOptions = {
       backoff: { delay: 60 * 1000, type: 'exponential' },
       attempts: JOB_ATTEMPTS[obj.type],
-      timeout: JOB_TTL[obj.type],
       priority: options.priority,
       delay: options.delay
     }
 
-    return queue.add(obj.payload, jobArgs)
+    return queue.add('job', obj.payload, jobArgs)
   }
 
   async listForApi (options: {
@@ -244,7 +315,8 @@ class JobQueue {
     const filteredJobTypes = this.filterJobTypes(jobType)
 
     for (const jobType of filteredJobTypes) {
-      const queue = this.queues[jobType]
+      const queue: Queue = this.queues[jobType]
+
       if (queue === undefined) {
         logger.error('Unknown queue %s to list jobs.', jobType)
         continue
@@ -297,18 +369,22 @@ class JobQueue {
 
   async removeOldJobs () {
     for (const key of Object.keys(this.queues)) {
-      const queue = this.queues[key]
-      await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+      const queue: Queue = this.queues[key]
+      await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
     }
   }
 
+  waitJob (job: Job) {
+    return job.waitUntilFinished(this.queueEvents[job.queueName])
+  }
+
   private addRepeatableJobs () {
-    this.queues['videos-views-stats'].add({}, {
+    this.queues['videos-views-stats'].add('job', {}, {
       repeat: REPEAT_JOBS['videos-views-stats']
     }).catch(err => logger.error('Cannot add repeatable job.', { err }))
 
     if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
-      this.queues['activitypub-cleaner'].add({}, {
+      this.queues['activitypub-cleaner'].add('job', {}, {
         repeat: REPEAT_JOBS['activitypub-cleaner']
       }).catch(err => logger.error('Cannot add repeatable job.', { err }))
     }
index 070c7ebda77018f914158d5367d38a2c414a7fca..07eee4122d3ad761eb34b841d51fb328156cb5d0 100644 (file)
@@ -1,4 +1,4 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
 import { basename, extname as extnameUtil, join } from 'path'
 import { toEven } from '@server/helpers/core-utils'
index 7ef9d60b6a80948e0cbc5ebd4554961d547265c5..dc0db9074825594d3298e0c48b376b6dd16acc4d 100644 (file)
@@ -6,7 +6,20 @@ function isCatchable (value: any) {
   return value && typeof value.catch === 'function'
 }
 
+function timeoutPromise <T> (promise: Promise<T>, timeoutMs: number) {
+  let timer: ReturnType<typeof setTimeout>
+
+  return Promise.race([
+    promise,
+
+    new Promise((_res, rej) => {
+      timer = setTimeout(() => rej(new Error('Timeout')), timeoutMs)
+    })
+  ]).finally(() => clearTimeout(timer))
+}
+
 export {
   isPromise,
-  isCatchable
+  isCatchable,
+  timeoutPromise
 }
index ac10ea9646c057d2729281ce5ce7ce75e05fab86..a924183f2f7a43965a657f6bb11e615b9719efd9 100644 (file)
@@ -4,7 +4,7 @@ import { VideoResolution } from '../videos/file/video-resolution.enum'
 import { VideoStudioTaskCut } from '../videos/studio'
 import { SendEmailOptions } from './emailer.model'
 
-export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused'
+export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' | 'paused' | 'waiting-children'
 
 export type JobType =
   | 'activitypub-http-unicast'
@@ -27,8 +27,8 @@ export type JobType =
   | 'video-studio-edition'
 
 export interface Job {
-  id: number
-  state: JobState
+  id: number | string
+  state: JobState | 'unknown'
   type: JobType
   data: any
   priority: number
index db5433be5ff349a90486fbfc87f920373e19950e..d16fd026c2b51c28d5eb03c6b63b1c035ef38125 100644 (file)
--- a/yarn.lock
+++ b/yarn.lock
     "@types/connect" "*"
     "@types/node" "*"
 
-"@types/bull@^3.15.0":
-  version "3.15.8"
-  resolved "https://registry.yarnpkg.com/@types/bull/-/bull-3.15.8.tgz#ae2139f94490d740b37c8da5d828ce75dd82ce7c"
-  integrity sha512-8DbSPMSsZH5PWPnGEkAZLYgJEH4ghHJNKF7LB6Wr5R0/v6g+Vs+JoaA7kcvLtHE936xg2WpFPkaoaJgExOmKDw==
-  dependencies:
-    "@types/ioredis" "*"
-    "@types/redis" "^2.8.0"
-
 "@types/bytes@^3.0.0":
   version "3.1.1"
   resolved "https://registry.yarnpkg.com/@types/bytes/-/bytes-3.1.1.tgz#67a876422e660dc4c10a27f3e5bcfbd5455f01d0"
   resolved "https://registry.yarnpkg.com/@types/http-cache-semantics/-/http-cache-semantics-4.0.1.tgz#0ea7b61496902b95890dc4c3a116b60cb8dae812"
   integrity sha512-SZs7ekbP8CN0txVG2xVRH6EgKmEm31BOxA07vkFaETzZz1xh+cbt8BcI0slpymvwhx5dlFnQG2rTlPVQn+iRPQ==
 
-"@types/ioredis@*":
-  version "4.28.10"
-  resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff"
-  integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==
-  dependencies:
-    "@types/node" "*"
-
 "@types/json-buffer@~3.0.0":
   version "3.0.0"
   resolved "https://registry.yarnpkg.com/@types/json-buffer/-/json-buffer-3.0.0.tgz#85c1ff0f0948fc159810d4b5be35bf8c20875f64"
   resolved "https://registry.yarnpkg.com/@types/range-parser/-/range-parser-1.2.4.tgz#cd667bcfdd025213aafb7ca5915a932590acdcdc"
   integrity sha512-EEhsLsD6UsDM1yFhAvy0Cjr6VwmpMWqFBCb9w07wVugF7w9nfajxLuVmngTIpgS6svCnm6Vaw+MZhoDCKnOfsw==
 
-"@types/redis@^2.8.0":
-  version "2.8.32"
-  resolved "https://registry.yarnpkg.com/@types/redis/-/redis-2.8.32.tgz#1d3430219afbee10f8cfa389dad2571a05ecfb11"
-  integrity sha512-7jkMKxcGq9p242exlbsVzuJb57KqHRhNl4dHoQu2Y5v9bCAbtIXXH0R3HleSQW4CTOqpHIYUW3t6tpUj4BVQ+w==
-  dependencies:
-    "@types/node" "*"
-
 "@types/request@^2.0.3":
   version "2.48.8"
   resolved "https://registry.yarnpkg.com/@types/request/-/request-2.48.8.tgz#0b90fde3b655ab50976cb8c5ac00faca22f5a82c"
@@ -3178,20 +3156,20 @@ builtins@^5.0.1:
   dependencies:
     semver "^7.0.0"
 
-bull@^4.1.0:
-  version "4.8.4"
-  resolved "https://registry.yarnpkg.com/bull/-/bull-4.8.4.tgz#c538610492050d5160dbd9180704145f135a0aa9"
-  integrity sha512-vDNhM/pvfFY3+msulMbqPBdBO7ntKxRZRtMfi3EguVW/Ozo4uez+B81I8ZoDxYCLgSOBfwRuPnFtcv7QNzm4Ew==
+bullmq@^1.87.0:
+  version "1.87.0"
+  resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.87.0.tgz#e93618302f547239fbb85ee47f7f1f2c3d0c5eef"
+  integrity sha512-oN44FaiWJDviWBNx3V8o4FQBdHrfVHRwJuYvU4HnWpBVdCKd6HMbKqF+XeuuxcqBPbbf7cl6hThoKZ+9iTCOkA==
   dependencies:
     cron-parser "^4.2.1"
-    debuglog "^1.0.0"
     get-port "^5.1.1"
+    glob "^7.2.0"
     ioredis "^4.28.5"
     lodash "^4.17.21"
-    msgpackr "^1.5.2"
-    p-timeout "^3.2.0"
-    semver "^7.3.2"
-    uuid "^8.3.0"
+    msgpackr "^1.4.6"
+    semver "^7.3.7"
+    tslib "^1.14.1"
+    uuid "^8.3.2"
 
 busboy@^1.0.0:
   version "1.6.0"
@@ -3856,11 +3834,6 @@ debug@^3.2.7:
   dependencies:
     ms "^2.1.1"
 
-debuglog@^1.0.0:
-  version "1.0.1"
-  resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492"
-  integrity sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw==
-
 decamelize@^1.2.0:
   version "1.2.0"
   resolved "https://registry.yarnpkg.com/decamelize/-/decamelize-1.2.0.tgz#f6534d15148269b20352e7bee26f501f9a191290"
@@ -5169,7 +5142,7 @@ glob@7.2.0:
     once "^1.3.0"
     path-is-absolute "^1.0.0"
 
-glob@^7.1.3:
+glob@^7.1.3, glob@^7.2.0:
   version "7.2.3"
   resolved "https://registry.yarnpkg.com/glob/-/glob-7.2.3.tgz#b8df0fb802bbfa8e89bd1d938b4e16578ed44f2b"
   integrity sha512-nFR0zLpU2YCaRxwoCJvL6UvCH2JFyFVIvwTLsIf21AuHlMskA1hhTdk+LlYJtOlYt9v6dvszD2BGRqBL+iQK9Q==
@@ -6696,10 +6669,10 @@ msgpackr-extract@^2.0.2:
     "@msgpackr-extract/msgpackr-extract-linux-x64" "2.0.2"
     "@msgpackr-extract/msgpackr-extract-win32-x64" "2.0.2"
 
-msgpackr@^1.5.2:
-  version "1.6.1"
-  resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.1.tgz#4f3c94d6a5b819b838ffc736eddaf60eba436d20"
-  integrity sha512-Je+xBEfdjtvA4bKaOv8iRhjC8qX2oJwpYH4f7JrG4uMVJVmnmkAT4pjKdbztKprGj3iwjcxPzb5umVZ02Qq3tA==
+msgpackr@^1.4.6:
+  version "1.6.2"
+  resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.6.2.tgz#176cd9f6b4437dad87a839b37f23c2dfee408d9a"
+  integrity sha512-bqSQ0DYJbXbrJcrZFmMygUZmqQiDfI2ewFVWcrZY12w5XHWtPuW4WppDT/e63Uu311ajwkRRXSoF0uILroBeTA==
   optionalDependencies:
     msgpackr-extract "^2.0.2"
 
@@ -9054,7 +9027,7 @@ tsconfig-paths@^4.0.0:
     minimist "^1.2.6"
     strip-bom "^3.0.0"
 
-tslib@^1.11.1, tslib@^1.8.1:
+tslib@^1.11.1, tslib@^1.14.1, tslib@^1.8.1:
   version "1.14.1"
   resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
   integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==
@@ -9277,7 +9250,7 @@ uuid-parse@^1.1.0:
   resolved "https://registry.yarnpkg.com/uuid-parse/-/uuid-parse-1.1.0.tgz#7061c5a1384ae0e1f943c538094597e1b5f3a65b"
   integrity sha512-OdmXxA8rDsQ7YpNVbKSJkNzTw2I+S5WsbMDnCtIWSQaosNAcWtFuI/YK1TjzUI6nbkgiqEyh8gWngfcv8Asd9A==
 
-uuid@^8.3.0, uuid@^8.3.2:
+uuid@^8.3.2:
   version "8.3.2"
   resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2"
   integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==