]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Migrate to bull
authorChocobozzz <me@florianbigard.com>
Tue, 10 Jul 2018 15:02:20 +0000 (17:02 +0200)
committerChocobozzz <me@florianbigard.com>
Wed, 11 Jul 2018 12:00:17 +0000 (14:00 +0200)
21 files changed:
client/src/app/+admin/jobs/jobs-list/jobs-list.component.html
client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts
client/src/app/+admin/jobs/shared/job.service.ts
scripts/clean/server/test.sh
server/controllers/api/jobs.ts
server/helpers/custom-validators/jobs.ts
server/initializers/constants.ts
server/initializers/migrations/0230-kue-to-bull.ts [new file with mode: 0644]
server/lib/job-queue/handlers/activitypub-follow.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-fetcher.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/job-queue/handlers/email.ts
server/lib/job-queue/handlers/video-file.ts
server/lib/job-queue/job-queue.ts
server/lib/redis.ts
server/tests/api/server/handle-down.ts
server/tests/api/server/jobs.ts
server/tests/real-world/real-world.ts
server/tests/utils/server/jobs.ts
shared/models/server/job.model.ts

index 20c35cb5b7c49a154661c45ed9a57c7b16be1c62..b52d026a72f6a0c6f55be0ad79af38f9794f27a0 100644 (file)
@@ -9,7 +9,7 @@
 </div>
 
 <p-table
-  [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="id"
+  [value]="jobs" [lazy]="true" [paginator]="true" [totalRecords]="totalRecords" [rows]="rowsPerPage" dataKey="uniqId"
   [sortField]="sort.field" [sortOrder]="sort.order" (onLazyLoad)="loadLazy($event)"
 >
   <ng-template pTemplate="header">
@@ -19,7 +19,8 @@
       <th i18n style="width: 210px">Type</th>
       <th i18n style="width: 130px">State</th>
       <th i18n style="width: 250px" pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
-      <th i18n style="width: 250px">Updated</th>
+      <th i18n style="width: 250px">Processed on</th>
+      <th i18n style="width: 250px">Finished on</th>
     </tr>
   </ng-template>
 
       <td>{{ job.type }}</td>
       <td>{{ job.state }}</td>
       <td>{{ job.createdAt }}</td>
-      <td>{{ job.updatedAt }}</td>
+      <td>{{ job.processedOn }}</td>
+      <td>{{ job.finishedOn }}</td>
     </tr>
   </ng-template>
 
   <ng-template pTemplate="rowexpansion" let-job>
     <tr>
-      <td colspan="6">
+      <td colspan="7">
         <pre>{{ job.data }}</pre>
       </td>
     </tr>
     <tr class="job-error" *ngIf="job.error">
-      <td colspan="6">
+      <td colspan="7">
         <pre>{{ job.error }}</pre>
       </td>
     </tr>
index 29dd9f31c3cad9f127c828ef410d35aa0e864f7d..a77f4a4a140aace6f511e9794d9fc81cce4772c7 100644 (file)
@@ -17,8 +17,8 @@ import { I18n } from '@ngx-translate/i18n-polyfill'
 export class JobsListComponent extends RestTable implements OnInit {
   private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state'
 
-  jobState: JobState = 'inactive'
-  jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
+  jobState: JobState = 'waiting'
+  jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
   jobs: Job[] = []
   totalRecords: number
   rowsPerPage = 10
index 6441eaac1f380e65652c0f35030452e3f4c22d1c..b96dc335958dcc6f81742e6546a1e02d1e7e1b22 100644 (file)
@@ -25,8 +25,11 @@ export class JobService {
 
     return this.authHttp.get<ResultList<Job>>(JobService.BASE_JOB_URL + '/' + state, { params })
                .pipe(
-                 map(res => this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'updatedAt' ])),
+                 map(res => {
+                   return this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'processedOn', 'finishedOn' ])
+                 }),
                  map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)),
+                 map(res => this.restExtractor.applyToResultListData(res, this.buildUniqId)),
                  catchError(err => this.restExtractor.handleError(err))
                )
   }
@@ -36,4 +39,8 @@ export class JobService {
 
     return Object.assign(obj, { data })
   }
+
+  private buildUniqId (obj: Job) {
+    return Object.assign(obj, { uniqId: `${obj.id}-${obj.type}` })
+  }
 }
index 303806fe2bf94e939a28745148149c8b7c96b5b5..753b8c67eb32724633868679f26c58b612eed0a8 100755 (executable)
@@ -8,5 +8,5 @@ for i in $(seq 1 6); do
   rm -f "./config/local-test.json"
   rm -f "./config/local-test-$i.json"
   createdb -O peertube "peertube_test$i"
-  redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
+  redis-cli KEYS "bull-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL
 done
index aa58a9144ea4472db750337219939cdb7f78b5ae..c19596dde2c5518c2048616c38721afab19e1d4b 100644 (file)
@@ -13,6 +13,7 @@ import {
 } from '../../middlewares'
 import { paginationValidator } from '../../middlewares/validators'
 import { listJobsValidator } from '../../middlewares/validators/jobs'
+import { isArray } from '../../helpers/custom-validators/misc'
 
 const jobsRouter = express.Router()
 
@@ -36,26 +37,30 @@ export {
 // ---------------------------------------------------------------------------
 
 async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
-  const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC'
+  const state: JobState = req.params.state
+  const asc = req.query.sort === 'createdAt'
 
-  const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort)
-  const total = await JobQueue.Instance.count(req.params.state)
+  const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc)
+  const total = await JobQueue.Instance.count(state)
 
   const result: ResultList<any> = {
     total,
-    data: jobs.map(j => formatJob(j.toJSON()))
+    data: jobs.map(j => formatJob(j, state))
   }
   return res.json(result)
 }
 
-function formatJob (job: any): Job {
+function formatJob (job: any, state: JobState): Job {
+  const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null
+
   return {
     id: job.id,
-    state: job.state as JobState,
-    type: job.type as JobType,
+    state: state,
+    type: job.queue.name as JobType,
     data: job.data,
-    error: job.error,
-    createdAt: new Date(parseInt(job.created_at, 10)),
-    updatedAt: new Date(parseInt(job.updated_at, 10))
+    error,
+    createdAt: new Date(job.timestamp),
+    finishedOn: new Date(job.finishedOn),
+    processedOn: new Date(job.processedOn)
   }
 }
index 9700fbd12246e7171bcaaafa800e361e64e0e792..1cc6e6912b3668e02d9495329e7e8076506ae0d0 100644 (file)
@@ -1,7 +1,7 @@
 import { JobState } from '../../../shared/models'
 import { exists } from './misc'
 
-const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ]
+const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ]
 
 function isValidJobState (value: JobState) {
   return exists(value) && jobStates.indexOf(value) !== -1
index 24b7e265595a49938b6d18236678a494f734a5bd..6173e1298d6afd17b7b5690dfc10c07a1dace0fc 100644 (file)
@@ -14,7 +14,7 @@ let config: IConfig = require('config')
 
 // ---------------------------------------------------------------------------
 
-const LAST_MIGRATION_VERSION = 225
+const LAST_MIGRATION_VERSION = 230
 
 // ---------------------------------------------------------------------------
 
diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts
new file mode 100644 (file)
index 0000000..5fad87a
--- /dev/null
@@ -0,0 +1,63 @@
+import * as Sequelize from 'sequelize'
+import { createClient } from 'redis'
+import { CONFIG } from '../constants'
+import { JobQueue } from '../../lib/job-queue'
+import { initDatabaseModels } from '../database'
+
+async function up (utils: {
+  transaction: Sequelize.Transaction
+  queryInterface: Sequelize.QueryInterface
+  sequelize: Sequelize.Sequelize
+}): Promise<any> {
+  await initDatabaseModels(false)
+
+  return new Promise((res, rej) => {
+    const client = createClient({
+      host: CONFIG.REDIS.HOSTNAME,
+      port: CONFIG.REDIS.PORT,
+      db: CONFIG.REDIS.DB
+    })
+
+    const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST
+
+    client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => {
+      if (err) return rej(err)
+
+      const jobPromises = jobStrings
+        .map(s => s.split('|'))
+        .map(([ , jobId ]) => {
+          return new Promise((res, rej) => {
+            client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => {
+              if (err) return rej(err)
+
+              try {
+                const parsedData = JSON.parse(job.data)
+
+                return res({ type: job.type, payload: parsedData })
+              } catch (err) {
+                console.error('Cannot parse data %s.', job.data)
+                return res(null)
+              }
+            })
+          })
+        })
+
+      JobQueue.Instance.init()
+              .then(() => Promise.all(jobPromises))
+              .then((jobs: any) => {
+                const createJobPromises = jobs
+                  .filter(job => job !== null)
+                  .map(job => JobQueue.Instance.createJob(job))
+
+                return Promise.all(createJobPromises)
+              })
+              .then(() => res())
+    })
+  })
+}
+
+function down (options) {
+  throw new Error('Not implemented.')
+}
+
+export { up, down }
index 286e343f2b4736a83f8c73c0b1d7d7e5fa97f2a9..2c1b4f49d960b720c4a9888958e5fbf5585ada8b 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { logger } from '../../../helpers/logger'
 import { getServerActor } from '../../../helpers/utils'
 import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers'
@@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = {
   host: string
 }
 
-async function processActivityPubFollow (job: kue.Job) {
+async function processActivityPubFollow (job: Bull.Job) {
   const payload = job.data as ActivitypubFollowPayload
   const host = payload.host
 
index d8b8ec22206d439e7aa85b39221db97ed5cfa33e..03a9e12a410e96900528b29cf30dbee164141c34 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import * as Bluebird from 'bluebird'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
@@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = {
   body: any
 }
 
-async function processActivityPubHttpBroadcast (job: kue.Job) {
+async function processActivityPubHttpBroadcast (job: Bull.Job) {
   logger.info('Processing ActivityPub broadcast in job %d.', job.id)
 
   const payload = job.data as ActivitypubHttpBroadcastPayload
index 10c0e606f6123afb6b938d055c90291a74bcded2..f21da087e9b80e127c4aac21eded92a1b4071a52 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { logger } from '../../../helpers/logger'
 import { processActivities } from '../../activitypub/process'
 import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
@@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = {
   uris: string[]
 }
 
-async function processActivityPubHttpFetcher (job: kue.Job) {
+async function processActivityPubHttpFetcher (job: Bull.Job) {
   logger.info('Processing ActivityPub fetcher in job %d.', job.id)
 
   const payload = job.data as ActivitypubHttpBroadcastPayload
index 173f3bb522cdfaa9a5d78fd0f88082c76e6368c1..c90d735f6f7271bfd58bfb5f99a33985b4619697 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
 import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = {
   body: any
 }
 
-async function processActivityPubHttpUnicast (job: kue.Job) {
+async function processActivityPubHttpUnicast (job: Bull.Job) {
   logger.info('Processing ActivityPub unicast in job %d.', job.id)
 
   const payload = job.data as ActivitypubHttpUnicastPayload
index 9d76861167d40e3847720000ecae6e26954ff9e0..73d98ae548ffa9b54b11ff70f623db667cd85e4b 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { logger } from '../../../helpers/logger'
 import { Emailer } from '../../emailer'
 
@@ -8,7 +8,7 @@ export type EmailPayload = {
   text: string
 }
 
-async function processEmail (job: kue.Job) {
+async function processEmail (job: Bull.Job) {
   const payload = job.data as EmailPayload
   logger.info('Processing email in job %d.', job.id)
 
index fc40527c7ab1c68573a0de5e31f5515cd3f411a9..bd68dd78b8d6f0685b2815663aed84465bc0d5e1 100644 (file)
@@ -1,4 +1,4 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { VideoResolution, VideoState } from '../../../../shared'
 import { logger } from '../../../helpers/logger'
 import { computeResolutionsToTranscode } from '../../../helpers/utils'
@@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue'
 import { federateVideoIfNeeded } from '../../activitypub'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
 import { sequelizeTypescript } from '../../../initializers'
+import * as Bluebird from 'bluebird'
 
 export type VideoFilePayload = {
   videoUUID: string
@@ -20,7 +21,7 @@ export type VideoFileImportPayload = {
   filePath: string
 }
 
-async function processVideoFileImport (job: kue.Job) {
+async function processVideoFileImport (job: Bull.Job) {
   const payload = job.data as VideoFileImportPayload
   logger.info('Processing video file import in job %d.', job.id)
 
@@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) {
   return video
 }
 
-async function processVideoFile (job: kue.Job) {
+async function processVideoFile (job: Bull.Job) {
   const payload = job.data as VideoFilePayload
   logger.info('Processing video file in job %d.', job.id)
 
@@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
     )
 
     if (resolutionsEnabled.length !== 0) {
-      const tasks: Promise<any>[] = []
+      const tasks: Bluebird<any>[] = []
 
       for (const resolution of resolutionsEnabled) {
         const dataInput = {
index 695fe0eea2bec605b5a1f573e10414397174c6c1..77aaa7fa8a55d6fcb4398aaee80cba6a51463a5f 100644 (file)
@@ -1,13 +1,12 @@
-import * as kue from 'kue'
+import * as Bull from 'bull'
 import { JobState, JobType } from '../../../shared/models'
 import { logger } from '../../helpers/logger'
 import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
-import { Redis } from '../redis'
 import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
 import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
 import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
 import { EmailPayload, processEmail } from './handlers/email'
-import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file'
+import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
 import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
 
 type CreateJobArgument =
@@ -19,7 +18,7 @@ type CreateJobArgument =
   { type: 'video-file', payload: VideoFilePayload } |
   { type: 'email', payload: EmailPayload }
 
-const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
+const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
   'email': processEmail
 }
 
-const jobsWithTLL: JobType[] = [
+const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
+  'activitypub-http-broadcast': true,
+  'activitypub-http-unicast': true,
+  'activitypub-http-fetcher': true,
+  'activitypub-follow': true
+}
+
+const jobTypes: JobType[] = [
+  'activitypub-follow',
   'activitypub-http-broadcast',
-  'activitypub-http-unicast',
   'activitypub-http-fetcher',
-  'activitypub-follow'
+  'activitypub-http-unicast',
+  'email',
+  'video-file',
+  'video-file-import'
 ]
 
 class JobQueue {
 
   private static instance: JobQueue
 
-  private jobQueue: kue.Queue
+  private queues: { [ id in JobType ]?: Bull.Queue } = {}
   private initialized = false
   private jobRedisPrefix: string
 
@@ -51,9 +60,8 @@ class JobQueue {
     if (this.initialized === true) return
     this.initialized = true
 
-    this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST
-
-    this.jobQueue = kue.createQueue({
+    this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
+    const queueOptions = {
       prefix: this.jobRedisPrefix,
       redis: {
         host: CONFIG.REDIS.HOSTNAME,
@@ -61,120 +69,94 @@ class JobQueue {
         auth: CONFIG.REDIS.AUTH,
         db: CONFIG.REDIS.DB
       }
-    })
-
-    this.jobQueue.setMaxListeners(20)
+    }
 
-    this.jobQueue.on('error', err => {
-      logger.error('Error in job queue.', { err })
-      process.exit(-1)
-    })
-    this.jobQueue.watchStuckJobs(5000)
+    for (const handlerName of Object.keys(handlers)) {
+      const queue = new Bull(handlerName, queueOptions)
+      const handler = handlers[handlerName]
 
-    await this.reactiveStuckJobs()
+      queue.process(JOB_CONCURRENCY[handlerName], handler)
+        .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
 
-    for (const handlerName of Object.keys(handlers)) {
-      this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
-        try {
-          const res = await handlers[ handlerName ](job)
-          return done(null, res)
-        } catch (err) {
-          logger.error('Cannot execute job %d.', job.id, { err })
-          return done(err)
-        }
+      queue.on('error', err => {
+        logger.error('Error in job queue %s.', handlerName, { err })
+        process.exit(-1)
       })
+
+      this.queues[handlerName] = queue
     }
   }
 
-  createJob (obj: CreateJobArgument, priority = 'normal') {
-    return new Promise((res, rej) => {
-      let job = this.jobQueue
-        .create(obj.type, obj.payload)
-        .priority(priority)
-        .attempts(JOB_ATTEMPTS[obj.type])
-        .backoff({ delay: 60 * 1000, type: 'exponential' })
+  createJob (obj: CreateJobArgument) {
+    const queue = this.queues[obj.type]
+    if (queue === undefined) {
+      logger.error('Unknown queue %s: cannot create job.', obj.type)
+      return
+    }
 
-      if (jobsWithTLL.indexOf(obj.type) !== -1) {
-        job = job.ttl(JOB_REQUEST_TTL)
-      }
+    const jobArgs: Bull.JobOptions = {
+      backoff: { delay: 60 * 1000, type: 'exponential' },
+      attempts: JOB_ATTEMPTS[obj.type]
+    }
 
-      return job.save(err => {
-        if (err) return rej(err)
+    if (jobsWithRequestTimeout[obj.type] === true) {
+      jobArgs.timeout = JOB_REQUEST_TTL
+    }
 
-        return res()
-      })
-    })
+    return queue.add(obj.payload, jobArgs)
   }
 
-  async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> {
-    const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)
+  async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
+    let results: Bull.Job[] = []
 
-    const jobPromises = jobStrings
-      .map(s => s.split('|'))
-      .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))
+    // TODO: optimize
+    for (const jobType of jobTypes) {
+      const queue = this.queues[ jobType ]
+      if (queue === undefined) {
+        logger.error('Unknown queue %s to list jobs.', jobType)
+        continue
+      }
 
-    return Promise.all(jobPromises)
-  }
+      // FIXME: Bull queue typings does not have getJobs method
+      const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
+      results = results.concat(jobs)
+    }
 
-  count (state: JobState) {
-    return new Promise<number>((res, rej) => {
-      this.jobQueue[state + 'Count']((err, total) => {
-        if (err) return rej(err)
+    results.sort((j1: any, j2: any) => {
+      if (j1.timestamp < j2.timestamp) return -1
+      else if (j1.timestamp === j2.timestamp) return 0
 
-        return res(total)
-      })
+      return 1
     })
-  }
 
-  removeOldJobs () {
-    const now = new Date().getTime()
-    kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
-      if (err) {
-        logger.error('Cannot get jobs when removing old jobs.', { err })
-        return
-      }
+    if (asc === false) results.reverse()
 
-      for (const job of jobs) {
-        if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
-          job.remove()
-        }
-      }
-    })
+    return results.slice(start, start + count)
   }
 
-  private reactiveStuckJobs () {
-    const promises: Promise<any>[] = []
-
-    this.jobQueue.active((err, ids) => {
-      if (err) throw err
+  async count (state: JobState): Promise<number> {
+    let total = 0
 
-      for (const id of ids) {
-        kue.Job.get(id, (err, job) => {
-          if (err) throw err
+    for (const type of jobTypes) {
+      const queue = this.queues[ type ]
+      if (queue === undefined) {
+        logger.error('Unknown queue %s to count jobs.', type)
+        continue
+      }
 
-          const p = new Promise((res, rej) => {
-            job.inactive(err => {
-              if (err) return rej(err)
-              return res()
-            })
-          })
+      const counts = await queue.getJobCounts()
 
-          promises.push(p)
-        })
-      }
-    })
+      total += counts[ state ]
+    }
 
-    return Promise.all(promises)
+    return total
   }
 
-  private getJob (id: number) {
-    return new Promise<kue.Job>((res, rej) => {
-      kue.Job.get(id, (err, job) => {
-        if (err) return rej(err)
-
-        return res(job)
-      })
-    })
+  removeOldJobs () {
+    for (const key of Object.keys(this.queues)) {
+      const queue = this.queues[key]
+      queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
+    }
   }
 
   static get Instance () {
index 5bd55109c7b7c05065a9dfca1228b05752f15aff..78b28986a0b4955f65286f90231fdf1b2156132f 100644 (file)
@@ -78,16 +78,6 @@ class Redis {
     return this.setObject(this.buildCachedRouteKey(req), cached, lifetime)
   }
 
-  listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) {
-    return new Promise<string[]>((res, rej) => {
-      this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => {
-        if (err) return rej(err)
-
-        return res(values)
-      })
-    })
-  }
-
   generateResetPasswordKey (userId: number) {
     return 'reset-password-' + userId
   }
index 69609b4fc5d1c1d85c20316f142e6f703d8ee0c0..84d310ae628e21b4ad7ab92c7acf4ea5c59178cf 100644 (file)
@@ -6,15 +6,21 @@ import { JobState } from '../../../../shared/models'
 import { VideoPrivacy } from '../../../../shared/models/videos'
 import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model'
 import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils'
-
 import {
-  flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo,
+  flushAndRunMultipleServers,
+  getVideosList,
+  killallServers,
+  ServerInfo,
+  setAccessTokensToServers,
+  uploadVideo,
   wait
 } from '../../utils/index'
 import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows'
 import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
 import {
-  addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads,
+  addVideoCommentReply,
+  addVideoCommentThread,
+  getVideoCommentThreads,
   getVideoThreadComments
 } from '../../utils/videos/video-comments'
 
@@ -146,7 +152,7 @@ describe('Test handle downs', function () {
   })
 
   it('Should not have pending/processing jobs anymore', async function () {
-    const states: JobState[] = [ 'inactive', 'active' ]
+    const states: JobState[] = [ 'waiting', 'active' ]
 
     for (const state of states) {
       const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt')
index 81e389de66839c964e72f2b16a926df6bdf892c7..f248c55219f7de300c7ce57d09a31e8c296b0d84 100644 (file)
@@ -2,7 +2,7 @@
 
 import * as chai from 'chai'
 import 'mocha'
-import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index'
+import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index'
 import { doubleFollow } from '../../utils/server/follows'
 import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs'
 import { flushAndRunMultipleServers } from '../../utils/server/servers'
@@ -35,22 +35,23 @@ describe('Test jobs', function () {
   })
 
   it('Should list jobs', async function () {
-    const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete')
+    const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed')
     expect(res.body.total).to.be.above(2)
     expect(res.body.data).to.have.length.above(2)
   })
 
   it('Should list jobs with sort and pagination', async function () {
-    const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt')
+    const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt')
     expect(res.body.total).to.be.above(2)
     expect(res.body.data).to.have.lengthOf(1)
 
     const job = res.body.data[0]
 
-    expect(job.state).to.equal('complete')
+    expect(job.state).to.equal('completed')
     expect(job.type).to.equal('activitypub-http-unicast')
     expect(dateIsValid(job.createdAt)).to.be.true
-    expect(dateIsValid(job.updatedAt)).to.be.true
+    expect(dateIsValid(job.processedOn)).to.be.true
+    expect(dateIsValid(job.finishedOn)).to.be.true
   })
 
   after(async function () {
index b7375f77826a84fef48fa2cedfa643ebbda42295..a96469b11181be6782ebdf872e370401b06a9f69 100644 (file)
@@ -347,7 +347,7 @@ function goodbye () {
 }
 
 async function isTherePendingRequests (servers: ServerInfo[]) {
-  const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
+  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
   const tasks: Promise<any>[] = []
   let pendingRequests = false
 
index 375e76f93411ca8214a66f122887ab0eabfa5ddb..c9cb8d3a3ce36aa4234d902ca633fff7b1a85cfb 100644 (file)
@@ -33,7 +33,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
   if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ]
   else servers = serversArg as ServerInfo[]
 
-  const states: JobState[] = [ 'inactive', 'active', 'delayed' ]
+  const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
   const tasks: Promise<any>[] = []
   let pendingRequests: boolean
 
index 7d8d39a1953dd339122e94d709086923e4e749c0..a38a8aa3b9b236715b505e686970322de0c0c95b 100644 (file)
@@ -1,4 +1,4 @@
-export type JobState = 'active' | 'complete' | 'failed' | 'inactive' | 'delayed'
+export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed'
 
 export type JobType = 'activitypub-http-unicast' |
   'activitypub-http-broadcast' |
@@ -15,5 +15,6 @@ export interface Job {
   data: any,
   error: any,
   createdAt: Date
-  updatedAt: Date
+  finishedOn: Date
+  processedOn: Date
 }