diff options
author | Chocobozzz <me@florianbigard.com> | 2018-07-10 17:02:20 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-07-11 14:00:17 +0200 |
commit | 94831479f5facff9469540a3d49dd347b88bdf5a (patch) | |
tree | 4e8990fc4fded913952c732b6466b15fc52ab06d /server | |
parent | 2cdf27bae6acfaa0b99bb07555edc57f48b8bc43 (diff) | |
download | PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.gz PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.zst PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.zip |
Migrate to bull
Diffstat (limited to 'server')
-rw-r--r-- | server/controllers/api/jobs.ts | 25 | ||||
-rw-r--r-- | server/helpers/custom-validators/jobs.ts | 2 | ||||
-rw-r--r-- | server/initializers/constants.ts | 2 | ||||
-rw-r--r-- | server/initializers/migrations/0230-kue-to-bull.ts | 63 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-follow.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/email.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 9 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 178 | ||||
-rw-r--r-- | server/lib/redis.ts | 10 | ||||
-rw-r--r-- | server/tests/api/server/handle-down.ts | 14 | ||||
-rw-r--r-- | server/tests/api/server/jobs.ts | 11 | ||||
-rw-r--r-- | server/tests/real-world/real-world.ts | 2 | ||||
-rw-r--r-- | server/tests/utils/server/jobs.ts | 2 |
16 files changed, 193 insertions, 145 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index aa58a9144..c19596dde 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -13,6 +13,7 @@ import { | |||
13 | } from '../../middlewares' | 13 | } from '../../middlewares' |
14 | import { paginationValidator } from '../../middlewares/validators' | 14 | import { paginationValidator } from '../../middlewares/validators' |
15 | import { listJobsValidator } from '../../middlewares/validators/jobs' | 15 | import { listJobsValidator } from '../../middlewares/validators/jobs' |
16 | import { isArray } from '../../helpers/custom-validators/misc' | ||
16 | 17 | ||
17 | const jobsRouter = express.Router() | 18 | const jobsRouter = express.Router() |
18 | 19 | ||
@@ -36,26 +37,30 @@ export { | |||
36 | // --------------------------------------------------------------------------- | 37 | // --------------------------------------------------------------------------- |
37 | 38 | ||
38 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { | 39 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { |
39 | const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC' | 40 | const state: JobState = req.params.state |
41 | const asc = req.query.sort === 'createdAt' | ||
40 | 42 | ||
41 | const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) | 43 | const jobs = await JobQueue.Instance.listForApi(state, req.query.start, req.query.count, asc) |
42 | const total = await JobQueue.Instance.count(req.params.state) | 44 | const total = await JobQueue.Instance.count(state) |
43 | 45 | ||
44 | const result: ResultList<any> = { | 46 | const result: ResultList<any> = { |
45 | total, | 47 | total, |
46 | data: jobs.map(j => formatJob(j.toJSON())) | 48 | data: jobs.map(j => formatJob(j, state)) |
47 | } | 49 | } |
48 | return res.json(result) | 50 | return res.json(result) |
49 | } | 51 | } |
50 | 52 | ||
51 | function formatJob (job: any): Job { | 53 | function formatJob (job: any, state: JobState): Job { |
54 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 ? job.stacktrace[0] : null | ||
55 | |||
52 | return { | 56 | return { |
53 | id: job.id, | 57 | id: job.id, |
54 | state: job.state as JobState, | 58 | state: state, |
55 | type: job.type as JobType, | 59 | type: job.queue.name as JobType, |
56 | data: job.data, | 60 | data: job.data, |
57 | error: job.error, | 61 | error, |
58 | createdAt: new Date(parseInt(job.created_at, 10)), | 62 | createdAt: new Date(job.timestamp), |
59 | updatedAt: new Date(parseInt(job.updated_at, 10)) | 63 | finishedOn: new Date(job.finishedOn), |
64 | processedOn: new Date(job.processedOn) | ||
60 | } | 65 | } |
61 | } | 66 | } |
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts index 9700fbd12..1cc6e6912 100644 --- a/server/helpers/custom-validators/jobs.ts +++ b/server/helpers/custom-validators/jobs.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { JobState } from '../../../shared/models' | 1 | import { JobState } from '../../../shared/models' |
2 | import { exists } from './misc' | 2 | import { exists } from './misc' |
3 | 3 | ||
4 | const jobStates: JobState[] = [ 'active', 'complete', 'failed', 'inactive', 'delayed' ] | 4 | const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed' ] |
5 | 5 | ||
6 | function isValidJobState (value: JobState) { | 6 | function isValidJobState (value: JobState) { |
7 | return exists(value) && jobStates.indexOf(value) !== -1 | 7 | return exists(value) && jobStates.indexOf(value) !== -1 |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 24b7e2655..6173e1298 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -14,7 +14,7 @@ let config: IConfig = require('config') | |||
14 | 14 | ||
15 | // --------------------------------------------------------------------------- | 15 | // --------------------------------------------------------------------------- |
16 | 16 | ||
17 | const LAST_MIGRATION_VERSION = 225 | 17 | const LAST_MIGRATION_VERSION = 230 |
18 | 18 | ||
19 | // --------------------------------------------------------------------------- | 19 | // --------------------------------------------------------------------------- |
20 | 20 | ||
diff --git a/server/initializers/migrations/0230-kue-to-bull.ts b/server/initializers/migrations/0230-kue-to-bull.ts new file mode 100644 index 000000000..5fad87a61 --- /dev/null +++ b/server/initializers/migrations/0230-kue-to-bull.ts | |||
@@ -0,0 +1,63 @@ | |||
1 | import * as Sequelize from 'sequelize' | ||
2 | import { createClient } from 'redis' | ||
3 | import { CONFIG } from '../constants' | ||
4 | import { JobQueue } from '../../lib/job-queue' | ||
5 | import { initDatabaseModels } from '../database' | ||
6 | |||
7 | async function up (utils: { | ||
8 | transaction: Sequelize.Transaction | ||
9 | queryInterface: Sequelize.QueryInterface | ||
10 | sequelize: Sequelize.Sequelize | ||
11 | }): Promise<any> { | ||
12 | await initDatabaseModels(false) | ||
13 | |||
14 | return new Promise((res, rej) => { | ||
15 | const client = createClient({ | ||
16 | host: CONFIG.REDIS.HOSTNAME, | ||
17 | port: CONFIG.REDIS.PORT, | ||
18 | db: CONFIG.REDIS.DB | ||
19 | }) | ||
20 | |||
21 | const jobsPrefix = 'q-' + CONFIG.WEBSERVER.HOST | ||
22 | |||
23 | client.sort(jobsPrefix + ':jobs:inactive', 'by', 'alpha', 'ASC', (err, jobStrings) => { | ||
24 | if (err) return rej(err) | ||
25 | |||
26 | const jobPromises = jobStrings | ||
27 | .map(s => s.split('|')) | ||
28 | .map(([ , jobId ]) => { | ||
29 | return new Promise((res, rej) => { | ||
30 | client.hgetall(jobsPrefix + ':job:' + jobId, (err, job) => { | ||
31 | if (err) return rej(err) | ||
32 | |||
33 | try { | ||
34 | const parsedData = JSON.parse(job.data) | ||
35 | |||
36 | return res({ type: job.type, payload: parsedData }) | ||
37 | } catch (err) { | ||
38 | console.error('Cannot parse data %s.', job.data) | ||
39 | return res(null) | ||
40 | } | ||
41 | }) | ||
42 | }) | ||
43 | }) | ||
44 | |||
45 | JobQueue.Instance.init() | ||
46 | .then(() => Promise.all(jobPromises)) | ||
47 | .then((jobs: any) => { | ||
48 | const createJobPromises = jobs | ||
49 | .filter(job => job !== null) | ||
50 | .map(job => JobQueue.Instance.createJob(job)) | ||
51 | |||
52 | return Promise.all(createJobPromises) | ||
53 | }) | ||
54 | .then(() => res()) | ||
55 | }) | ||
56 | }) | ||
57 | } | ||
58 | |||
59 | function down (options) { | ||
60 | throw new Error('Not implemented.') | ||
61 | } | ||
62 | |||
63 | export { up, down } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 286e343f2..2c1b4f49d 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { getServerActor } from '../../../helpers/utils' | 3 | import { getServerActor } from '../../../helpers/utils' |
4 | import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' | 4 | import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' |
@@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = { | |||
14 | host: string | 14 | host: string |
15 | } | 15 | } |
16 | 16 | ||
17 | async function processActivityPubFollow (job: kue.Job) { | 17 | async function processActivityPubFollow (job: Bull.Job) { |
18 | const payload = job.data as ActivitypubFollowPayload | 18 | const payload = job.data as ActivitypubFollowPayload |
19 | const host = payload.host | 19 | const host = payload.host |
20 | 20 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index d8b8ec222..03a9e12a4 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import * as Bluebird from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
@@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = { | |||
12 | body: any | 12 | body: any |
13 | } | 13 | } |
14 | 14 | ||
15 | async function processActivityPubHttpBroadcast (job: kue.Job) { | 15 | async function processActivityPubHttpBroadcast (job: Bull.Job) { |
16 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 16 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) |
17 | 17 | ||
18 | const payload = job.data as ActivitypubHttpBroadcastPayload | 18 | const payload = job.data as ActivitypubHttpBroadcastPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 10c0e606f..f21da087e 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { processActivities } from '../../activitypub/process' | 3 | import { processActivities } from '../../activitypub/process' |
4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | 4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' |
@@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = { | |||
9 | uris: string[] | 9 | uris: string[] |
10 | } | 10 | } |
11 | 11 | ||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | 12 | async function processActivityPubHttpFetcher (job: Bull.Job) { |
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) |
14 | 14 | ||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | 15 | const payload = job.data as ActivitypubHttpBroadcastPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 173f3bb52..c90d735f6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
@@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = { | |||
11 | body: any | 11 | body: any |
12 | } | 12 | } |
13 | 13 | ||
14 | async function processActivityPubHttpUnicast (job: kue.Job) { | 14 | async function processActivityPubHttpUnicast (job: Bull.Job) { |
15 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 15 | logger.info('Processing ActivityPub unicast in job %d.', job.id) |
16 | 16 | ||
17 | const payload = job.data as ActivitypubHttpUnicastPayload | 17 | const payload = job.data as ActivitypubHttpUnicastPayload |
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 9d7686116..73d98ae54 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { Emailer } from '../../emailer' | 3 | import { Emailer } from '../../emailer' |
4 | 4 | ||
@@ -8,7 +8,7 @@ export type EmailPayload = { | |||
8 | text: string | 8 | text: string |
9 | } | 9 | } |
10 | 10 | ||
11 | async function processEmail (job: kue.Job) { | 11 | async function processEmail (job: Bull.Job) { |
12 | const payload = job.data as EmailPayload | 12 | const payload = job.data as EmailPayload |
13 | logger.info('Processing email in job %d.', job.id) | 13 | logger.info('Processing email in job %d.', job.id) |
14 | 14 | ||
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index fc40527c7..bd68dd78b 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { VideoResolution, VideoState } from '../../../../shared' | 2 | import { VideoResolution, VideoState } from '../../../../shared' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | 4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' |
@@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue' | |||
7 | import { federateVideoIfNeeded } from '../../activitypub' | 7 | import { federateVideoIfNeeded } from '../../activitypub' |
8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
9 | import { sequelizeTypescript } from '../../../initializers' | 9 | import { sequelizeTypescript } from '../../../initializers' |
10 | import * as Bluebird from 'bluebird' | ||
10 | 11 | ||
11 | export type VideoFilePayload = { | 12 | export type VideoFilePayload = { |
12 | videoUUID: string | 13 | videoUUID: string |
@@ -20,7 +21,7 @@ export type VideoFileImportPayload = { | |||
20 | filePath: string | 21 | filePath: string |
21 | } | 22 | } |
22 | 23 | ||
23 | async function processVideoFileImport (job: kue.Job) { | 24 | async function processVideoFileImport (job: Bull.Job) { |
24 | const payload = job.data as VideoFileImportPayload | 25 | const payload = job.data as VideoFileImportPayload |
25 | logger.info('Processing video file import in job %d.', job.id) | 26 | logger.info('Processing video file import in job %d.', job.id) |
26 | 27 | ||
@@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) { | |||
37 | return video | 38 | return video |
38 | } | 39 | } |
39 | 40 | ||
40 | async function processVideoFile (job: kue.Job) { | 41 | async function processVideoFile (job: Bull.Job) { |
41 | const payload = job.data as VideoFilePayload | 42 | const payload = job.data as VideoFilePayload |
42 | logger.info('Processing video file in job %d.', job.id) | 43 | logger.info('Processing video file in job %d.', job.id) |
43 | 44 | ||
@@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
109 | ) | 110 | ) |
110 | 111 | ||
111 | if (resolutionsEnabled.length !== 0) { | 112 | if (resolutionsEnabled.length !== 0) { |
112 | const tasks: Promise<any>[] = [] | 113 | const tasks: Bluebird<any>[] = [] |
113 | 114 | ||
114 | for (const resolution of resolutionsEnabled) { | 115 | for (const resolution of resolutionsEnabled) { |
115 | const dataInput = { | 116 | const dataInput = { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 695fe0eea..77aaa7fa8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,13 +1,12 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' | 4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' |
5 | import { Redis } from '../redis' | ||
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
9 | import { EmailPayload, processEmail } from './handlers/email' | 8 | import { EmailPayload, processEmail } from './handlers/email' |
10 | import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' | 9 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 10 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | 11 | ||
13 | type CreateJobArgument = | 12 | type CreateJobArgument = |
@@ -19,7 +18,7 @@ type CreateJobArgument = | |||
19 | { type: 'video-file', payload: VideoFilePayload } | | 18 | { type: 'video-file', payload: VideoFilePayload } | |
20 | { type: 'email', payload: EmailPayload } | 19 | { type: 'email', payload: EmailPayload } |
21 | 20 | ||
22 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | 21 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
23 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 22 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
24 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 23 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
25 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 24 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | |||
29 | 'email': processEmail | 28 | 'email': processEmail |
30 | } | 29 | } |
31 | 30 | ||
32 | const jobsWithTLL: JobType[] = [ | 31 | const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { |
32 | 'activitypub-http-broadcast': true, | ||
33 | 'activitypub-http-unicast': true, | ||
34 | 'activitypub-http-fetcher': true, | ||
35 | 'activitypub-follow': true | ||
36 | } | ||
37 | |||
38 | const jobTypes: JobType[] = [ | ||
39 | 'activitypub-follow', | ||
33 | 'activitypub-http-broadcast', | 40 | 'activitypub-http-broadcast', |
34 | 'activitypub-http-unicast', | ||
35 | 'activitypub-http-fetcher', | 41 | 'activitypub-http-fetcher', |
36 | 'activitypub-follow' | 42 | 'activitypub-http-unicast', |
43 | 'email', | ||
44 | 'video-file', | ||
45 | 'video-file-import' | ||
37 | ] | 46 | ] |
38 | 47 | ||
39 | class JobQueue { | 48 | class JobQueue { |
40 | 49 | ||
41 | private static instance: JobQueue | 50 | private static instance: JobQueue |
42 | 51 | ||
43 | private jobQueue: kue.Queue | 52 | private queues: { [ id in JobType ]?: Bull.Queue } = {} |
44 | private initialized = false | 53 | private initialized = false |
45 | private jobRedisPrefix: string | 54 | private jobRedisPrefix: string |
46 | 55 | ||
@@ -51,9 +60,8 @@ class JobQueue { | |||
51 | if (this.initialized === true) return | 60 | if (this.initialized === true) return |
52 | this.initialized = true | 61 | this.initialized = true |
53 | 62 | ||
54 | this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST | 63 | this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST |
55 | 64 | const queueOptions = { | |
56 | this.jobQueue = kue.createQueue({ | ||
57 | prefix: this.jobRedisPrefix, | 65 | prefix: this.jobRedisPrefix, |
58 | redis: { | 66 | redis: { |
59 | host: CONFIG.REDIS.HOSTNAME, | 67 | host: CONFIG.REDIS.HOSTNAME, |
@@ -61,120 +69,94 @@ class JobQueue { | |||
61 | auth: CONFIG.REDIS.AUTH, | 69 | auth: CONFIG.REDIS.AUTH, |
62 | db: CONFIG.REDIS.DB | 70 | db: CONFIG.REDIS.DB |
63 | } | 71 | } |
64 | }) | 72 | } |
65 | |||
66 | this.jobQueue.setMaxListeners(20) | ||
67 | 73 | ||
68 | this.jobQueue.on('error', err => { | 74 | for (const handlerName of Object.keys(handlers)) { |
69 | logger.error('Error in job queue.', { err }) | 75 | const queue = new Bull(handlerName, queueOptions) |
70 | process.exit(-1) | 76 | const handler = handlers[handlerName] |
71 | }) | ||
72 | this.jobQueue.watchStuckJobs(5000) | ||
73 | 77 | ||
74 | await this.reactiveStuckJobs() | 78 | queue.process(JOB_CONCURRENCY[handlerName], handler) |
79 | .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) | ||
75 | 80 | ||
76 | for (const handlerName of Object.keys(handlers)) { | 81 | queue.on('error', err => { |
77 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | 82 | logger.error('Error in job queue %s.', handlerName, { err }) |
78 | try { | 83 | process.exit(-1) |
79 | const res = await handlers[ handlerName ](job) | ||
80 | return done(null, res) | ||
81 | } catch (err) { | ||
82 | logger.error('Cannot execute job %d.', job.id, { err }) | ||
83 | return done(err) | ||
84 | } | ||
85 | }) | 84 | }) |
85 | |||
86 | this.queues[handlerName] = queue | ||
86 | } | 87 | } |
87 | } | 88 | } |
88 | 89 | ||
89 | createJob (obj: CreateJobArgument, priority = 'normal') { | 90 | createJob (obj: CreateJobArgument) { |
90 | return new Promise((res, rej) => { | 91 | const queue = this.queues[obj.type] |
91 | let job = this.jobQueue | 92 | if (queue === undefined) { |
92 | .create(obj.type, obj.payload) | 93 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
93 | .priority(priority) | 94 | return |
94 | .attempts(JOB_ATTEMPTS[obj.type]) | 95 | } |
95 | .backoff({ delay: 60 * 1000, type: 'exponential' }) | ||
96 | 96 | ||
97 | if (jobsWithTLL.indexOf(obj.type) !== -1) { | 97 | const jobArgs: Bull.JobOptions = { |
98 | job = job.ttl(JOB_REQUEST_TTL) | 98 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
99 | } | 99 | attempts: JOB_ATTEMPTS[obj.type] |
100 | } | ||
100 | 101 | ||
101 | return job.save(err => { | 102 | if (jobsWithRequestTimeout[obj.type] === true) { |
102 | if (err) return rej(err) | 103 | jobArgs.timeout = JOB_REQUEST_TTL |
104 | } | ||
103 | 105 | ||
104 | return res() | 106 | return queue.add(obj.payload, jobArgs) |
105 | }) | ||
106 | }) | ||
107 | } | 107 | } |
108 | 108 | ||
109 | async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { | 109 | async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> { |
110 | const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) | 110 | let results: Bull.Job[] = [] |
111 | 111 | ||
112 | const jobPromises = jobStrings | 112 | // TODO: optimize |
113 | .map(s => s.split('|')) | 113 | for (const jobType of jobTypes) { |
114 | .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) | 114 | const queue = this.queues[ jobType ] |
115 | if (queue === undefined) { | ||
116 | logger.error('Unknown queue %s to list jobs.', jobType) | ||
117 | continue | ||
118 | } | ||
115 | 119 | ||
116 | return Promise.all(jobPromises) | 120 | // FIXME: Bull queue typings does not have getJobs method |
117 | } | 121 | const jobs = await (queue as any).getJobs(state, 0, start + count, asc) |
122 | results = results.concat(jobs) | ||
123 | } | ||
118 | 124 | ||
119 | count (state: JobState) { | 125 | results.sort((j1: any, j2: any) => { |
120 | return new Promise<number>((res, rej) => { | 126 | if (j1.timestamp < j2.timestamp) return -1 |
121 | this.jobQueue[state + 'Count']((err, total) => { | 127 | else if (j1.timestamp === j2.timestamp) return 0 |
122 | if (err) return rej(err) | ||
123 | 128 | ||
124 | return res(total) | 129 | return 1 |
125 | }) | ||
126 | }) | 130 | }) |
127 | } | ||
128 | 131 | ||
129 | removeOldJobs () { | 132 | if (asc === false) results.reverse() |
130 | const now = new Date().getTime() | ||
131 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
132 | if (err) { | ||
133 | logger.error('Cannot get jobs when removing old jobs.', { err }) | ||
134 | return | ||
135 | } | ||
136 | 133 | ||
137 | for (const job of jobs) { | 134 | return results.slice(start, start + count) |
138 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
139 | job.remove() | ||
140 | } | ||
141 | } | ||
142 | }) | ||
143 | } | 135 | } |
144 | 136 | ||
145 | private reactiveStuckJobs () { | 137 | async count (state: JobState): Promise<number> { |
146 | const promises: Promise<any>[] = [] | 138 | let total = 0 |
147 | |||
148 | this.jobQueue.active((err, ids) => { | ||
149 | if (err) throw err | ||
150 | 139 | ||
151 | for (const id of ids) { | 140 | for (const type of jobTypes) { |
152 | kue.Job.get(id, (err, job) => { | 141 | const queue = this.queues[ type ] |
153 | if (err) throw err | 142 | if (queue === undefined) { |
143 | logger.error('Unknown queue %s to count jobs.', type) | ||
144 | continue | ||
145 | } | ||
154 | 146 | ||
155 | const p = new Promise((res, rej) => { | 147 | const counts = await queue.getJobCounts() |
156 | job.inactive(err => { | ||
157 | if (err) return rej(err) | ||
158 | return res() | ||
159 | }) | ||
160 | }) | ||
161 | 148 | ||
162 | promises.push(p) | 149 | total += counts[ state ] |
163 | }) | 150 | } |
164 | } | ||
165 | }) | ||
166 | 151 | ||
167 | return Promise.all(promises) | 152 | return total |
168 | } | 153 | } |
169 | 154 | ||
170 | private getJob (id: number) { | 155 | removeOldJobs () { |
171 | return new Promise<kue.Job>((res, rej) => { | 156 | for (const key of Object.keys(this.queues)) { |
172 | kue.Job.get(id, (err, job) => { | 157 | const queue = this.queues[key] |
173 | if (err) return rej(err) | 158 | queue.clean(JOB_COMPLETED_LIFETIME, 'completed') |
174 | 159 | } | |
175 | return res(job) | ||
176 | }) | ||
177 | }) | ||
178 | } | 160 | } |
179 | 161 | ||
180 | static get Instance () { | 162 | static get Instance () { |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index 5bd55109c..78b28986a 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -78,16 +78,6 @@ class Redis { | |||
78 | return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) | 78 | return this.setObject(this.buildCachedRouteKey(req), cached, lifetime) |
79 | } | 79 | } |
80 | 80 | ||
81 | listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) { | ||
82 | return new Promise<string[]>((res, rej) => { | ||
83 | this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => { | ||
84 | if (err) return rej(err) | ||
85 | |||
86 | return res(values) | ||
87 | }) | ||
88 | }) | ||
89 | } | ||
90 | |||
91 | generateResetPasswordKey (userId: number) { | 81 | generateResetPasswordKey (userId: number) { |
92 | return 'reset-password-' + userId | 82 | return 'reset-password-' + userId |
93 | } | 83 | } |
diff --git a/server/tests/api/server/handle-down.ts b/server/tests/api/server/handle-down.ts index 69609b4fc..84d310ae6 100644 --- a/server/tests/api/server/handle-down.ts +++ b/server/tests/api/server/handle-down.ts | |||
@@ -6,15 +6,21 @@ import { JobState } from '../../../../shared/models' | |||
6 | import { VideoPrivacy } from '../../../../shared/models/videos' | 6 | import { VideoPrivacy } from '../../../../shared/models/videos' |
7 | import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' | 7 | import { VideoCommentThreadTree } from '../../../../shared/models/videos/video-comment.model' |
8 | import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' | 8 | import { completeVideoCheck, getVideo, immutableAssign, reRunServer, viewVideo } from '../../utils' |
9 | |||
10 | import { | 9 | import { |
11 | flushAndRunMultipleServers, flushTests, getVideosList, killallServers, ServerInfo, setAccessTokensToServers, uploadVideo, | 10 | flushAndRunMultipleServers, |
11 | getVideosList, | ||
12 | killallServers, | ||
13 | ServerInfo, | ||
14 | setAccessTokensToServers, | ||
15 | uploadVideo, | ||
12 | wait | 16 | wait |
13 | } from '../../utils/index' | 17 | } from '../../utils/index' |
14 | import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows' | 18 | import { follow, getFollowersListPaginationAndSort } from '../../utils/server/follows' |
15 | import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' | 19 | import { getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' |
16 | import { | 20 | import { |
17 | addVideoCommentReply, addVideoCommentThread, getVideoCommentThreads, | 21 | addVideoCommentReply, |
22 | addVideoCommentThread, | ||
23 | getVideoCommentThreads, | ||
18 | getVideoThreadComments | 24 | getVideoThreadComments |
19 | } from '../../utils/videos/video-comments' | 25 | } from '../../utils/videos/video-comments' |
20 | 26 | ||
@@ -146,7 +152,7 @@ describe('Test handle downs', function () { | |||
146 | }) | 152 | }) |
147 | 153 | ||
148 | it('Should not have pending/processing jobs anymore', async function () { | 154 | it('Should not have pending/processing jobs anymore', async function () { |
149 | const states: JobState[] = [ 'inactive', 'active' ] | 155 | const states: JobState[] = [ 'waiting', 'active' ] |
150 | 156 | ||
151 | for (const state of states) { | 157 | for (const state of states) { |
152 | const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') | 158 | const res = await getJobsListPaginationAndSort(servers[ 0 ].url, servers[ 0 ].accessToken, state,0, 50, '-createdAt') |
diff --git a/server/tests/api/server/jobs.ts b/server/tests/api/server/jobs.ts index 81e389de6..f248c5521 100644 --- a/server/tests/api/server/jobs.ts +++ b/server/tests/api/server/jobs.ts | |||
@@ -2,7 +2,7 @@ | |||
2 | 2 | ||
3 | import * as chai from 'chai' | 3 | import * as chai from 'chai' |
4 | import 'mocha' | 4 | import 'mocha' |
5 | import { flushTests, killallServers, ServerInfo, setAccessTokensToServers, wait } from '../../utils/index' | 5 | import { killallServers, ServerInfo, setAccessTokensToServers } from '../../utils/index' |
6 | import { doubleFollow } from '../../utils/server/follows' | 6 | import { doubleFollow } from '../../utils/server/follows' |
7 | import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' | 7 | import { getJobsList, getJobsListPaginationAndSort, waitJobs } from '../../utils/server/jobs' |
8 | import { flushAndRunMultipleServers } from '../../utils/server/servers' | 8 | import { flushAndRunMultipleServers } from '../../utils/server/servers' |
@@ -35,22 +35,23 @@ describe('Test jobs', function () { | |||
35 | }) | 35 | }) |
36 | 36 | ||
37 | it('Should list jobs', async function () { | 37 | it('Should list jobs', async function () { |
38 | const res = await getJobsList(servers[1].url, servers[1].accessToken, 'complete') | 38 | const res = await getJobsList(servers[1].url, servers[1].accessToken, 'completed') |
39 | expect(res.body.total).to.be.above(2) | 39 | expect(res.body.total).to.be.above(2) |
40 | expect(res.body.data).to.have.length.above(2) | 40 | expect(res.body.data).to.have.length.above(2) |
41 | }) | 41 | }) |
42 | 42 | ||
43 | it('Should list jobs with sort and pagination', async function () { | 43 | it('Should list jobs with sort and pagination', async function () { |
44 | const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'complete', 1, 1, 'createdAt') | 44 | const res = await getJobsListPaginationAndSort(servers[1].url, servers[1].accessToken, 'completed', 1, 1, 'createdAt') |
45 | expect(res.body.total).to.be.above(2) | 45 | expect(res.body.total).to.be.above(2) |
46 | expect(res.body.data).to.have.lengthOf(1) | 46 | expect(res.body.data).to.have.lengthOf(1) |
47 | 47 | ||
48 | const job = res.body.data[0] | 48 | const job = res.body.data[0] |
49 | 49 | ||
50 | expect(job.state).to.equal('complete') | 50 | expect(job.state).to.equal('completed') |
51 | expect(job.type).to.equal('activitypub-http-unicast') | 51 | expect(job.type).to.equal('activitypub-http-unicast') |
52 | expect(dateIsValid(job.createdAt)).to.be.true | 52 | expect(dateIsValid(job.createdAt)).to.be.true |
53 | expect(dateIsValid(job.updatedAt)).to.be.true | 53 | expect(dateIsValid(job.processedOn)).to.be.true |
54 | expect(dateIsValid(job.finishedOn)).to.be.true | ||
54 | }) | 55 | }) |
55 | 56 | ||
56 | after(async function () { | 57 | after(async function () { |
diff --git a/server/tests/real-world/real-world.ts b/server/tests/real-world/real-world.ts index b7375f778..a96469b11 100644 --- a/server/tests/real-world/real-world.ts +++ b/server/tests/real-world/real-world.ts | |||
@@ -347,7 +347,7 @@ function goodbye () { | |||
347 | } | 347 | } |
348 | 348 | ||
349 | async function isTherePendingRequests (servers: ServerInfo[]) { | 349 | async function isTherePendingRequests (servers: ServerInfo[]) { |
350 | const states: JobState[] = [ 'inactive', 'active', 'delayed' ] | 350 | const states: JobState[] = [ 'waiting', 'active', 'delayed' ] |
351 | const tasks: Promise<any>[] = [] | 351 | const tasks: Promise<any>[] = [] |
352 | let pendingRequests = false | 352 | let pendingRequests = false |
353 | 353 | ||
diff --git a/server/tests/utils/server/jobs.ts b/server/tests/utils/server/jobs.ts index 375e76f93..c9cb8d3a3 100644 --- a/server/tests/utils/server/jobs.ts +++ b/server/tests/utils/server/jobs.ts | |||
@@ -33,7 +33,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) { | |||
33 | if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ] | 33 | if (Array.isArray(serversArg) === false) servers = [ serversArg as ServerInfo ] |
34 | else servers = serversArg as ServerInfo[] | 34 | else servers = serversArg as ServerInfo[] |
35 | 35 | ||
36 | const states: JobState[] = [ 'inactive', 'active', 'delayed' ] | 36 | const states: JobState[] = [ 'waiting', 'active', 'delayed' ] |
37 | const tasks: Promise<any>[] = [] | 37 | const tasks: Promise<any>[] = [] |
38 | let pendingRequests: boolean | 38 | let pendingRequests: boolean |
39 | 39 | ||