From 94831479f5facff9469540a3d49dd347b88bdf5a Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 10 Jul 2018 17:02:20 +0200 Subject: [PATCH] Migrate to bull --- .../jobs/jobs-list/jobs-list.component.html | 12 +- .../jobs/jobs-list/jobs-list.component.ts | 4 +- .../src/app/+admin/jobs/shared/job.service.ts | 9 +- scripts/clean/server/test.sh | 2 +- server/controllers/api/jobs.ts | 25 ++- server/helpers/custom-validators/jobs.ts | 2 +- server/initializers/constants.ts | 2 +- .../migrations/0230-kue-to-bull.ts | 63 +++++++ .../job-queue/handlers/activitypub-follow.ts | 4 +- .../handlers/activitypub-http-broadcast.ts | 4 +- .../handlers/activitypub-http-fetcher.ts | 4 +- .../handlers/activitypub-http-unicast.ts | 4 +- server/lib/job-queue/handlers/email.ts | 4 +- server/lib/job-queue/handlers/video-file.ts | 9 +- server/lib/job-queue/job-queue.ts | 178 ++++++++---------- server/lib/redis.ts | 10 - server/tests/api/server/handle-down.ts | 14 +- server/tests/api/server/jobs.ts | 11 +- server/tests/real-world/real-world.ts | 2 +- server/tests/utils/server/jobs.ts | 2 +- shared/models/server/job.model.ts | 5 +- 21 files changed, 214 insertions(+), 156 deletions(-) create mode 100644 server/initializers/migrations/0230-kue-to-bull.ts diff --git a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html index 20c35cb5b..b52d026a7 100644 --- a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html +++ b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.html @@ -9,7 +9,7 @@ @@ -19,7 +19,8 @@ Type State Created - Updated + Processed on + Finished on @@ -34,18 +35,19 @@ {{ job.type }} {{ job.state }} {{ job.createdAt }} - {{ job.updatedAt }} + {{ job.processedOn }} + {{ job.finishedOn }} - +
{{ job.data }}
- +
{{ job.error }}
diff --git a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts index 29dd9f31c..a77f4a4a1 100644 --- a/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts +++ b/client/src/app/+admin/jobs/jobs-list/jobs-list.component.ts @@ -17,8 +17,8 @@ import { I18n } from '@ngx-translate/i18n-polyfill' export class JobsListComponent extends RestTable implements OnInit { private static JOB_STATE_LOCAL_STORAGE_STATE = 'jobs-list-state' - jobState: JobState = 'inactive' - jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] + jobState: JobState = 'waiting' + jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] jobs: Job[] = [] totalRecords: number rowsPerPage = 10 diff --git a/client/src/app/+admin/jobs/shared/job.service.ts b/client/src/app/+admin/jobs/shared/job.service.ts index 6441eaac1..b96dc3359 100644 --- a/client/src/app/+admin/jobs/shared/job.service.ts +++ b/client/src/app/+admin/jobs/shared/job.service.ts @@ -25,8 +25,11 @@ export class JobService { return this.authHttp.get>(JobService.BASE_JOB_URL + '/' + state, { params }) .pipe( - map(res => this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'updatedAt' ])), + map(res => { + return this.restExtractor.convertResultListDateToHuman(res, [ 'createdAt', 'processedOn', 'finishedOn' ]) + }), map(res => this.restExtractor.applyToResultListData(res, this.prettyPrintData)), + map(res => this.restExtractor.applyToResultListData(res, this.buildUniqId)), catchError(err => this.restExtractor.handleError(err)) ) } @@ -36,4 +39,8 @@ export class JobService { return Object.assign(obj, { data }) } + + private buildUniqId (obj: Job) { + return Object.assign(obj, { uniqId: `${obj.id}-${obj.type}` }) + } } diff --git a/scripts/clean/server/test.sh b/scripts/clean/server/test.sh index 303806fe2..753b8c67e 100755 --- a/scripts/clean/server/test.sh +++ b/scripts/clean/server/test.sh @@ -8,5 +8,5 @@ for i in $(seq 1 6); do rm -f "./config/local-test.json" rm -f "./config/local-test-$i.json" createdb -O peertube "peertube_test$i" - redis-cli KEYS "q-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL + redis-cli KEYS "bull-localhost:900$i*" | grep -v empty | xargs --no-run-if-empty redis-cli DEL done diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index aa58a9144..c19596dde 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -13,6 +13,7 @@ import { } from '../../middlewares' import { paginationValidator } from '../../middlewares/validators' import { listJobsValidator } from '../../middlewares/validators/jobs' +import { isArray } from '../../helpers/custom-validators/misc' const jobsRouter = express.Router() @@ -36,26 +37,30 @@ export { // --------------------------------------------------------------------------- async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { - const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC' + const state: JobState = req.params.state + const asc = req.query.sort === 'createdAt' - const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) - const total = await JobQueue.Instance.count(req.params.state) + const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc) + const total = await JobQueue.Instance.count(state) const result: ResultList = { total, - data: jobs.map(j => formatJob(j.toJSON())) + data: jobs.map(j => formatJob(j, state)) } return res.json(result) } -function formatJob (job: any): Job { +function formatJob (job: any, state: JobState): Job { + const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null + return { id: job.id, - state: job.state as JobState, - type: job.type as JobType, + state: state, + type: job.queue.name as JobType, data: job.data, - error: job.error, - createdAt: new Date(parseInt(job.created_at, 10)), - updatedAt: new Date(parseInt(job.updated_at, 10)) + error, + createdAt: new Date(job.timestamp), + finishedOn: new Date(job.finishedOn), + processedOn: new Date(job.processedOn) } } diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts index 9700fbd12..1cc6e6912 100644 --- a/server/helpers/custom-validators/jobs.ts +++ b/server/helpers/custom-validators/jobs.ts @@ -1,7 +1,7 @@ import { JobState } from '../../../shared/models' import { exists } from './misc' -const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] +const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] function isValidJobState (value: JobState) { return exists(value) && jobStates.indexOf(value) !== -1 diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 24b7e2655..6173e1298 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -14,7 +14,7 @@ let config: IConfig = require('config') // --------------------------------------------------------------------------- -const LAST_MIGRATION_VERSION = 225 +const LAST_MIGRATION_VERSION = 230 // --------------------------------------------------------------------------- diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts new file mode 100644 index 000000000..5fad87a61 --- /dev/null +++ b/server/initializers/migrations/0230-kue-to-bull.ts @@ -0,0 +1,63 @@ +import * as Sequelize from 'sequelize' +import { createClient } from 'redis' +import { CONFIG } from '../constants' +import { JobQueue } from '../../lib/job-queue' +import { initDatabaseModels } from '../database' + +async function up (utils: { + transaction: Sequelize.Transaction + queryInterface: Sequelize.QueryInterface + sequelize: Sequelize.Sequelize +}): Promise { + await initDatabaseModels(false) + + return new Promise((res, rej) => { + const client = createClient({ + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + db: CONFIG.REDIS.DB + }) + + const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST + + client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => { + if (err) return rej(err) + + const jobPromises = jobStrings + .map(s => s.split('|')) + .map(([ , jobId ]) => { + return new Promise((res, rej) => { + client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => { + if (err) return rej(err) + + try { + const parsedData = JSON.parse(job.data) + + return res({ type: job.type, payload: parsedData }) + } catch (err) { + console.error('Cannot parse data %s.', job.data) + return res(null) + } + }) + }) + }) + + JobQueue.Instance.init() + .then(() => Promise.all(jobPromises)) + .then((jobs: any) => { + const createJobPromises = jobs + .filter(job => job !== null) + .map(job => JobQueue.Instance.createJob(job)) + + return Promise.all(createJobPromises) + }) + .then(() => res()) + }) + }) +} + +function down (options) { + throw new Error('Not implemented.') +} + +export { up, down } diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 286e343f2..2c1b4f49d 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { getServerActor } from '../../../helpers/utils' import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' @@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = { host: string } -async function processActivityPubFollow (job: kue.Job) { +async function processActivityPubFollow (job: Bull.Job) { const payload = job.data as ActivitypubFollowPayload const host = payload.host diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index d8b8ec222..03a9e12a4 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import * as Bluebird from 'bluebird' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' @@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = { body: any } -async function processActivityPubHttpBroadcast (job: kue.Job) { +async function processActivityPubHttpBroadcast (job: Bull.Job) { logger.info('Processing ActivityPub broadcast in job %d.', job.id) const payload = job.data as ActivitypubHttpBroadcastPayload diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 10c0e606f..f21da087e 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { processActivities } from '../../activitypub/process' import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' @@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = { uris: string[] } -async function processActivityPubHttpFetcher (job: kue.Job) { +async function processActivityPubHttpFetcher (job: Bull.Job) { logger.info('Processing ActivityPub fetcher in job %d.', job.id) const payload = job.data as ActivitypubHttpBroadcastPayload diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 173f3bb52..c90d735f6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' @@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = { body: any } -async function processActivityPubHttpUnicast (job: kue.Job) { +async function processActivityPubHttpUnicast (job: Bull.Job) { logger.info('Processing ActivityPub unicast in job %d.', job.id) const payload = job.data as ActivitypubHttpUnicastPayload diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 9d7686116..73d98ae54 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { Emailer } from '../../emailer' @@ -8,7 +8,7 @@ export type EmailPayload = { text: string } -async function processEmail (job: kue.Job) { +async function processEmail (job: Bull.Job) { const payload = job.data as EmailPayload logger.info('Processing email in job %d.', job.id) diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index fc40527c7..bd68dd78b 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -1,4 +1,4 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { VideoResolution, VideoState } from '../../../../shared' import { logger } from '../../../helpers/logger' import { computeResolutionsToTranscode } from '../../../helpers/utils' @@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue' import { federateVideoIfNeeded } from '../../activitypub' import { retryTransactionWrapper } from '../../../helpers/database-utils' import { sequelizeTypescript } from '../../../initializers' +import * as Bluebird from 'bluebird' export type VideoFilePayload = { videoUUID: string @@ -20,7 +21,7 @@ export type VideoFileImportPayload = { filePath: string } -async function processVideoFileImport (job: kue.Job) { +async function processVideoFileImport (job: Bull.Job) { const payload = job.data as VideoFileImportPayload logger.info('Processing video file import in job %d.', job.id) @@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) { return video } -async function processVideoFile (job: kue.Job) { +async function processVideoFile (job: Bull.Job) { const payload = job.data as VideoFilePayload logger.info('Processing video file in job %d.', job.id) @@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole ) if (resolutionsEnabled.length !== 0) { - const tasks: Promise[] = [] + const tasks: Bluebird[] = [] for (const resolution of resolutionsEnabled) { const dataInput = { diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 695fe0eea..77aaa7fa8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,13 +1,12 @@ -import * as kue from 'kue' +import * as Bull from 'bull' import { JobState, JobType } from '../../../shared/models' import { logger } from '../../helpers/logger' import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' -import { Redis } from '../redis' import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' -import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' +import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' type CreateJobArgument = @@ -19,7 +18,7 @@ type CreateJobArgument = { type: 'video-file', payload: VideoFilePayload } | { type: 'email', payload: EmailPayload } -const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { +const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, @@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise} = { 'email': processEmail } -const jobsWithTLL: JobType[] = [ +const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { + 'activitypub-http-broadcast': true, + 'activitypub-http-unicast': true, + 'activitypub-http-fetcher': true, + 'activitypub-follow': true +} + +const jobTypes: JobType[] = [ + 'activitypub-follow', 'activitypub-http-broadcast', - 'activitypub-http-unicast', 'activitypub-http-fetcher', - 'activitypub-follow' + 'activitypub-http-unicast', + 'email', + 'video-file', + 'video-file-import' ] class JobQueue { private static instance: JobQueue - private jobQueue: kue.Queue + private queues: { [ id in JobType ]?: Bull.Queue } = {} private initialized = false private jobRedisPrefix: string @@ -51,9 +60,8 @@ class JobQueue { if (this.initialized === true) return this.initialized = true - this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST - - this.jobQueue = kue.createQueue({ + this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST + const queueOptions = { prefix: this.jobRedisPrefix, redis: { host: CONFIG.REDIS.HOSTNAME, @@ -61,120 +69,94 @@ class JobQueue { auth: CONFIG.REDIS.AUTH, db: CONFIG.REDIS.DB } - }) - - this.jobQueue.setMaxListeners(20) + } - this.jobQueue.on('error', err => { - logger.error('Error in job queue.', { err }) - process.exit(-1) - }) - this.jobQueue.watchStuckJobs(5000) + for (const handlerName of Object.keys(handlers)) { + const queue = new Bull(handlerName, queueOptions) + const handler = handlers[handlerName] - await this.reactiveStuckJobs() + queue.process(JOB_CONCURRENCY[handlerName], handler) + .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) - for (const handlerName of Object.keys(handlers)) { - this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { - try { - const res = await handlers[ handlerName ](job) - return done(null, res) - } catch (err) { - logger.error('Cannot execute job %d.', job.id, { err }) - return done(err) - } + queue.on('error', err => { + logger.error('Error in job queue %s.', handlerName, { err }) + process.exit(-1) }) + + this.queues[handlerName] = queue } } - createJob (obj: CreateJobArgument, priority = 'normal') { - return new Promise((res, rej) => { - let job = this.jobQueue - .create(obj.type, obj.payload) - .priority(priority) - .attempts(JOB_ATTEMPTS[obj.type]) - .backoff({ delay: 60 * 1000, type: 'exponential' }) + createJob (obj: CreateJobArgument) { + const queue = this.queues[obj.type] + if (queue === undefined) { + logger.error('Unknown queue %s: cannot create job.', obj.type) + return + } - if (jobsWithTLL.indexOf(obj.type) !== -1) { - job = job.ttl(JOB_REQUEST_TTL) - } + const jobArgs: Bull.JobOptions = { + backoff: { delay: 60 * 1000, type: 'exponential' }, + attempts: JOB_ATTEMPTS[obj.type] + } - return job.save(err => { - if (err) return rej(err) + if (jobsWithRequestTimeout[obj.type] === true) { + jobArgs.timeout = JOB_REQUEST_TTL + } - return res() - }) - }) + return queue.add(obj.payload, jobArgs) } - async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise { - const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) + async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise { + let results: Bull.Job[] = [] - const jobPromises = jobStrings - .map(s => s.split('|')) - .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) + // TODO: optimize + for (const jobType of jobTypes) { + const queue = this.queues[ jobType ] + if (queue === undefined) { + logger.error('Unknown queue %s to list jobs.', jobType) + continue + } - return Promise.all(jobPromises) - } + // FIXME: Bull queue typings does not have getJobs method + const jobs = await (queue as any).getJobs(state, 0, start + count, asc) + results = results.concat(jobs) + } - count (state: JobState) { - return new Promise((res, rej) => { - this.jobQueue[state + 'Count']((err, total) => { - if (err) return rej(err) + results.sort((j1: any, j2: any) => { + if (j1.timestamp < j2.timestamp) return -1 + else if (j1.timestamp === j2.timestamp) return 0 - return res(total) - }) + return 1 }) - } - removeOldJobs () { - const now = new Date().getTime() - kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { - if (err) { - logger.error('Cannot get jobs when removing old jobs.', { err }) - return - } + if (asc === false) results.reverse() - for (const job of jobs) { - if (now - job.created_at > JOB_COMPLETED_LIFETIME) { - job.remove() - } - } - }) + return results.slice(start, start + count) } - private reactiveStuckJobs () { - const promises: Promise[] = [] - - this.jobQueue.active((err, ids) => { - if (err) throw err + async count (state: JobState): Promise { + let total = 0 - for (const id of ids) { - kue.Job.get(id, (err, job) => { - if (err) throw err + for (const type of jobTypes) { + const queue = this.queues[ type ] + if (queue === undefined) { + logger.error('Unknown queue %s to count jobs.', type) + continue + } - const p = new Promise((res, rej) => { - job.inactive(err => { - if (err) return rej(err) - return res() - }) - }) + const counts = await queue.getJobCounts() - promises.push(p) - }) - } - }) + total += counts[ state ] + } - return Promise.all(promises) + return total } - private getJob (id: number) { - return new Promise((res, rej) => { - kue.Job.get(id, (err, job) => { - if (err) return rej(err) - - return res(job) - }) - }) + removeOldJobs () { + for (const key of Object.keys(this.queues)) { + const queue = this.queues[key] + queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + } } static get Instance () { diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 5bd55109c..78b28986a 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -78,16 +78,6 @@ class Redis { return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) } - listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) { - return new Promise((res, rej) => { - this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => { - if (err) return rej(err) - - return res(values) - }) - }) - } - generateResetPasswordKey (userId: number) { return 'reset-password-' + userId } diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts index 69609b4fc..84d310ae6 100644 --- a/server/tests/api/server/handle-down.ts +++ b/server/tests/api/server/handle-down.ts @@ -6,15 +6,21 @@ import { JobState } from '../../../../shared/models' import { VideoPrivacy } from '../../../../shared/models/videos' import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' - import { - flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, + flushAndRunMultipleServers, + getVideosList, + killallServers, + ServerInfo, + setAccessTokensToServers, + uploadVideo, wait } from '../../utils/index' import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows' import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' import { - addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads, + addVideoCommentReply, + addVideoCommentThread, + getVideoCommentThreads, getVideoThreadComments } from '../../utils/videos/video-comments' @@ -146,7 +152,7 @@ describe('Test handle downs', function () { }) it('Should not have pending/processing jobs anymore', async function () { - const states: JobState[] = [ 'inactive', 'active' ] + const states: JobState[] = [ 'waiting', 'active' ] for (const state of states) { const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 81e389de6..f248c5521 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts @@ -2,7 +2,7 @@ import * as chai from 'chai' import 'mocha' -import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index' +import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index' import { doubleFollow } from '../../utils/server/follows' import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' import { flushAndRunMultipleServers } from '../../utils/server/servers' @@ -35,22 +35,23 @@ describe('Test jobs', function () { }) it('Should list jobs', async function () { - const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') + const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed') expect(res.body.total).to.be.above(2) expect(res.body.data).to.have.length.above(2) }) it('Should list jobs with sort and pagination', async function () { - const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') + const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt') expect(res.body.total).to.be.above(2) expect(res.body.data).to.have.lengthOf(1) const job = res.body.data[0] - expect(job.state).to.equal('complete') + expect(job.state).to.equal('completed') expect(job.type).to.equal('activitypub-http-unicast') expect(dateIsValid(job.createdAt)).to.be.true - expect(dateIsValid(job.updatedAt)).to.be.true + expect(dateIsValid(job.processedOn)).to.be.true + expect(dateIsValid(job.finishedOn)).to.be.true }) after(async function () { diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index b7375f778..a96469b11 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts @@ -347,7 +347,7 @@ function goodbye () { } async function isTherePendingRequests (servers: ServerInfo[]) { - const states: JobState[] = [ 'inactive', 'active', 'delayed' ] + const states: JobState[] = [ 'waiting', 'active', 'delayed' ] const tasks: Promise[] = [] let pendingRequests = false diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts index 375e76f93..c9cb8d3a3 100644 --- a/server/tests/utils/server/jobs.ts +++ b/server/tests/utils/server/jobs.ts @@ -33,7 +33,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ] else servers = serversArg as ServerInfo[] - const states: JobState[] = [ 'inactive', 'active', 'delayed' ] + const states: JobState[] = [ 'waiting', 'active', 'delayed' ] const tasks: Promise[] = [] let pendingRequests: boolean diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 7d8d39a19..a38a8aa3b 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -1,4 +1,4 @@ -export type JobState = 'active' | 'complete' | 'failed' | 'inactive' | 'delayed' +export type JobState = 'active' | 'completed' | 'failed' | 'waiting' | 'delayed' export type JobType = 'activitypub-http-unicast' | 'activitypub-http-broadcast' | @@ -15,5 +15,6 @@ export interface Job { data: any, error: any, createdAt: Date - updatedAt: Date + finishedOn: Date + processedOn: Date } -- 2.41.0