diff options
author | Chocobozzz <me@florianbigard.com> | 2018-07-10 17:02:20 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-07-11 14:00:17 +0200 |
commit | 94831479f5facff9469540a3d49dd347b88bdf5a (patch) | |
tree | 4e8990fc4fded913952c732b6466b15fc52ab06d /server/lib/job-queue | |
parent | 2cdf27bae6acfaa0b99bb07555edc57f48b8bc43 (diff) | |
download | PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.gz PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.zst PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.zip |
Migrate to bull
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-follow.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/email.ts | 4 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 9 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 178 |
7 files changed, 95 insertions, 112 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 286e343f2..2c1b4f49d 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { getServerActor } from '../../../helpers/utils' | 3 | import { getServerActor } from '../../../helpers/utils' |
4 | import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' | 4 | import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' |
@@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = { | |||
14 | host: string | 14 | host: string |
15 | } | 15 | } |
16 | 16 | ||
17 | async function processActivityPubFollow (job: kue.Job) { | 17 | async function processActivityPubFollow (job: Bull.Job) { |
18 | const payload = job.data as ActivitypubFollowPayload | 18 | const payload = job.data as ActivitypubFollowPayload |
19 | const host = payload.host | 19 | const host = payload.host |
20 | 20 | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index d8b8ec222..03a9e12a4 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import * as Bluebird from 'bluebird' | 2 | import * as Bluebird from 'bluebird' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { doRequest } from '../../../helpers/requests' | 4 | import { doRequest } from '../../../helpers/requests' |
@@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = { | |||
12 | body: any | 12 | body: any |
13 | } | 13 | } |
14 | 14 | ||
15 | async function processActivityPubHttpBroadcast (job: kue.Job) { | 15 | async function processActivityPubHttpBroadcast (job: Bull.Job) { |
16 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | 16 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) |
17 | 17 | ||
18 | const payload = job.data as ActivitypubHttpBroadcastPayload | 18 | const payload = job.data as ActivitypubHttpBroadcastPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 10c0e606f..f21da087e 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { processActivities } from '../../activitypub/process' | 3 | import { processActivities } from '../../activitypub/process' |
4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | 4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' |
@@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = { | |||
9 | uris: string[] | 9 | uris: string[] |
10 | } | 10 | } |
11 | 11 | ||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | 12 | async function processActivityPubHttpFetcher (job: Bull.Job) { |
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) |
14 | 14 | ||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | 15 | const payload = job.data as ActivitypubHttpBroadcastPayload |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 173f3bb52..c90d735f6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | 4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' |
@@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = { | |||
11 | body: any | 11 | body: any |
12 | } | 12 | } |
13 | 13 | ||
14 | async function processActivityPubHttpUnicast (job: kue.Job) { | 14 | async function processActivityPubHttpUnicast (job: Bull.Job) { |
15 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | 15 | logger.info('Processing ActivityPub unicast in job %d.', job.id) |
16 | 16 | ||
17 | const payload = job.data as ActivitypubHttpUnicastPayload | 17 | const payload = job.data as ActivitypubHttpUnicastPayload |
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 9d7686116..73d98ae54 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { Emailer } from '../../emailer' | 3 | import { Emailer } from '../../emailer' |
4 | 4 | ||
@@ -8,7 +8,7 @@ export type EmailPayload = { | |||
8 | text: string | 8 | text: string |
9 | } | 9 | } |
10 | 10 | ||
11 | async function processEmail (job: kue.Job) { | 11 | async function processEmail (job: Bull.Job) { |
12 | const payload = job.data as EmailPayload | 12 | const payload = job.data as EmailPayload |
13 | logger.info('Processing email in job %d.', job.id) | 13 | logger.info('Processing email in job %d.', job.id) |
14 | 14 | ||
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index fc40527c7..bd68dd78b 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { VideoResolution, VideoState } from '../../../../shared' | 2 | import { VideoResolution, VideoState } from '../../../../shared' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | 4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' |
@@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue' | |||
7 | import { federateVideoIfNeeded } from '../../activitypub' | 7 | import { federateVideoIfNeeded } from '../../activitypub' |
8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
9 | import { sequelizeTypescript } from '../../../initializers' | 9 | import { sequelizeTypescript } from '../../../initializers' |
10 | import * as Bluebird from 'bluebird' | ||
10 | 11 | ||
11 | export type VideoFilePayload = { | 12 | export type VideoFilePayload = { |
12 | videoUUID: string | 13 | videoUUID: string |
@@ -20,7 +21,7 @@ export type VideoFileImportPayload = { | |||
20 | filePath: string | 21 | filePath: string |
21 | } | 22 | } |
22 | 23 | ||
23 | async function processVideoFileImport (job: kue.Job) { | 24 | async function processVideoFileImport (job: Bull.Job) { |
24 | const payload = job.data as VideoFileImportPayload | 25 | const payload = job.data as VideoFileImportPayload |
25 | logger.info('Processing video file import in job %d.', job.id) | 26 | logger.info('Processing video file import in job %d.', job.id) |
26 | 27 | ||
@@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) { | |||
37 | return video | 38 | return video |
38 | } | 39 | } |
39 | 40 | ||
40 | async function processVideoFile (job: kue.Job) { | 41 | async function processVideoFile (job: Bull.Job) { |
41 | const payload = job.data as VideoFilePayload | 42 | const payload = job.data as VideoFilePayload |
42 | logger.info('Processing video file in job %d.', job.id) | 43 | logger.info('Processing video file in job %d.', job.id) |
43 | 44 | ||
@@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole | |||
109 | ) | 110 | ) |
110 | 111 | ||
111 | if (resolutionsEnabled.length !== 0) { | 112 | if (resolutionsEnabled.length !== 0) { |
112 | const tasks: Promise<any>[] = [] | 113 | const tasks: Bluebird<any>[] = [] |
113 | 114 | ||
114 | for (const resolution of resolutionsEnabled) { | 115 | for (const resolution of resolutionsEnabled) { |
115 | const dataInput = { | 116 | const dataInput = { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 695fe0eea..77aaa7fa8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,13 +1,12 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' | 4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' |
5 | import { Redis } from '../redis' | ||
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
9 | import { EmailPayload, processEmail } from './handlers/email' | 8 | import { EmailPayload, processEmail } from './handlers/email' |
10 | import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' | 9 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 10 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | 11 | ||
13 | type CreateJobArgument = | 12 | type CreateJobArgument = |
@@ -19,7 +18,7 @@ type CreateJobArgument = | |||
19 | { type: 'video-file', payload: VideoFilePayload } | | 18 | { type: 'video-file', payload: VideoFilePayload } | |
20 | { type: 'email', payload: EmailPayload } | 19 | { type: 'email', payload: EmailPayload } |
21 | 20 | ||
22 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | 21 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
23 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 22 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
24 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 23 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
25 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 24 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | |||
29 | 'email': processEmail | 28 | 'email': processEmail |
30 | } | 29 | } |
31 | 30 | ||
32 | const jobsWithTLL: JobType[] = [ | 31 | const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { |
32 | 'activitypub-http-broadcast': true, | ||
33 | 'activitypub-http-unicast': true, | ||
34 | 'activitypub-http-fetcher': true, | ||
35 | 'activitypub-follow': true | ||
36 | } | ||
37 | |||
38 | const jobTypes: JobType[] = [ | ||
39 | 'activitypub-follow', | ||
33 | 'activitypub-http-broadcast', | 40 | 'activitypub-http-broadcast', |
34 | 'activitypub-http-unicast', | ||
35 | 'activitypub-http-fetcher', | 41 | 'activitypub-http-fetcher', |
36 | 'activitypub-follow' | 42 | 'activitypub-http-unicast', |
43 | 'email', | ||
44 | 'video-file', | ||
45 | 'video-file-import' | ||
37 | ] | 46 | ] |
38 | 47 | ||
39 | class JobQueue { | 48 | class JobQueue { |
40 | 49 | ||
41 | private static instance: JobQueue | 50 | private static instance: JobQueue |
42 | 51 | ||
43 | private jobQueue: kue.Queue | 52 | private queues: { [ id in JobType ]?: Bull.Queue } = {} |
44 | private initialized = false | 53 | private initialized = false |
45 | private jobRedisPrefix: string | 54 | private jobRedisPrefix: string |
46 | 55 | ||
@@ -51,9 +60,8 @@ class JobQueue { | |||
51 | if (this.initialized === true) return | 60 | if (this.initialized === true) return |
52 | this.initialized = true | 61 | this.initialized = true |
53 | 62 | ||
54 | this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST | 63 | this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST |
55 | 64 | const queueOptions = { | |
56 | this.jobQueue = kue.createQueue({ | ||
57 | prefix: this.jobRedisPrefix, | 65 | prefix: this.jobRedisPrefix, |
58 | redis: { | 66 | redis: { |
59 | host: CONFIG.REDIS.HOSTNAME, | 67 | host: CONFIG.REDIS.HOSTNAME, |
@@ -61,120 +69,94 @@ class JobQueue { | |||
61 | auth: CONFIG.REDIS.AUTH, | 69 | auth: CONFIG.REDIS.AUTH, |
62 | db: CONFIG.REDIS.DB | 70 | db: CONFIG.REDIS.DB |
63 | } | 71 | } |
64 | }) | 72 | } |
65 | |||
66 | this.jobQueue.setMaxListeners(20) | ||
67 | 73 | ||
68 | this.jobQueue.on('error', err => { | 74 | for (const handlerName of Object.keys(handlers)) { |
69 | logger.error('Error in job queue.', { err }) | 75 | const queue = new Bull(handlerName, queueOptions) |
70 | process.exit(-1) | 76 | const handler = handlers[handlerName] |
71 | }) | ||
72 | this.jobQueue.watchStuckJobs(5000) | ||
73 | 77 | ||
74 | await this.reactiveStuckJobs() | 78 | queue.process(JOB_CONCURRENCY[handlerName], handler) |
79 | .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) | ||
75 | 80 | ||
76 | for (const handlerName of Object.keys(handlers)) { | 81 | queue.on('error', err => { |
77 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | 82 | logger.error('Error in job queue %s.', handlerName, { err }) |
78 | try { | 83 | process.exit(-1) |
79 | const res = await handlers[ handlerName ](job) | ||
80 | return done(null, res) | ||
81 | } catch (err) { | ||
82 | logger.error('Cannot execute job %d.', job.id, { err }) | ||
83 | return done(err) | ||
84 | } | ||
85 | }) | 84 | }) |
85 | |||
86 | this.queues[handlerName] = queue | ||
86 | } | 87 | } |
87 | } | 88 | } |
88 | 89 | ||
89 | createJob (obj: CreateJobArgument, priority = 'normal') { | 90 | createJob (obj: CreateJobArgument) { |
90 | return new Promise((res, rej) => { | 91 | const queue = this.queues[obj.type] |
91 | let job = this.jobQueue | 92 | if (queue === undefined) { |
92 | .create(obj.type, obj.payload) | 93 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
93 | .priority(priority) | 94 | return |
94 | .attempts(JOB_ATTEMPTS[obj.type]) | 95 | } |
95 | .backoff({ delay: 60 * 1000, type: 'exponential' }) | ||
96 | 96 | ||
97 | if (jobsWithTLL.indexOf(obj.type) !== -1) { | 97 | const jobArgs: Bull.JobOptions = { |
98 | job = job.ttl(JOB_REQUEST_TTL) | 98 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
99 | } | 99 | attempts: JOB_ATTEMPTS[obj.type] |
100 | } | ||
100 | 101 | ||
101 | return job.save(err => { | 102 | if (jobsWithRequestTimeout[obj.type] === true) { |
102 | if (err) return rej(err) | 103 | jobArgs.timeout = JOB_REQUEST_TTL |
104 | } | ||
103 | 105 | ||
104 | return res() | 106 | return queue.add(obj.payload, jobArgs) |
105 | }) | ||
106 | }) | ||
107 | } | 107 | } |
108 | 108 | ||
109 | async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { | 109 | async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> { |
110 | const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) | 110 | let results: Bull.Job[] = [] |
111 | 111 | ||
112 | const jobPromises = jobStrings | 112 | // TODO: optimize |
113 | .map(s => s.split('|')) | 113 | for (const jobType of jobTypes) { |
114 | .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) | 114 | const queue = this.queues[ jobType ] |
115 | if (queue === undefined) { | ||
116 | logger.error('Unknown queue %s to list jobs.', jobType) | ||
117 | continue | ||
118 | } | ||
115 | 119 | ||
116 | return Promise.all(jobPromises) | 120 | // FIXME: Bull queue typings does not have getJobs method |
117 | } | 121 | const jobs = await (queue as any).getJobs(state, 0, start + count, asc) |
122 | results = results.concat(jobs) | ||
123 | } | ||
118 | 124 | ||
119 | count (state: JobState) { | 125 | results.sort((j1: any, j2: any) => { |
120 | return new Promise<number>((res, rej) => { | 126 | if (j1.timestamp < j2.timestamp) return -1 |
121 | this.jobQueue[state + 'Count']((err, total) => { | 127 | else if (j1.timestamp === j2.timestamp) return 0 |
122 | if (err) return rej(err) | ||
123 | 128 | ||
124 | return res(total) | 129 | return 1 |
125 | }) | ||
126 | }) | 130 | }) |
127 | } | ||
128 | 131 | ||
129 | removeOldJobs () { | 132 | if (asc === false) results.reverse() |
130 | const now = new Date().getTime() | ||
131 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
132 | if (err) { | ||
133 | logger.error('Cannot get jobs when removing old jobs.', { err }) | ||
134 | return | ||
135 | } | ||
136 | 133 | ||
137 | for (const job of jobs) { | 134 | return results.slice(start, start + count) |
138 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
139 | job.remove() | ||
140 | } | ||
141 | } | ||
142 | }) | ||
143 | } | 135 | } |
144 | 136 | ||
145 | private reactiveStuckJobs () { | 137 | async count (state: JobState): Promise<number> { |
146 | const promises: Promise<any>[] = [] | 138 | let total = 0 |
147 | |||
148 | this.jobQueue.active((err, ids) => { | ||
149 | if (err) throw err | ||
150 | 139 | ||
151 | for (const id of ids) { | 140 | for (const type of jobTypes) { |
152 | kue.Job.get(id, (err, job) => { | 141 | const queue = this.queues[ type ] |
153 | if (err) throw err | 142 | if (queue === undefined) { |
143 | logger.error('Unknown queue %s to count jobs.', type) | ||
144 | continue | ||
145 | } | ||
154 | 146 | ||
155 | const p = new Promise((res, rej) => { | 147 | const counts = await queue.getJobCounts() |
156 | job.inactive(err => { | ||
157 | if (err) return rej(err) | ||
158 | return res() | ||
159 | }) | ||
160 | }) | ||
161 | 148 | ||
162 | promises.push(p) | 149 | total += counts[ state ] |
163 | }) | 150 | } |
164 | } | ||
165 | }) | ||
166 | 151 | ||
167 | return Promise.all(promises) | 152 | return total |
168 | } | 153 | } |
169 | 154 | ||
170 | private getJob (id: number) { | 155 | removeOldJobs () { |
171 | return new Promise<kue.Job>((res, rej) => { | 156 | for (const key of Object.keys(this.queues)) { |
172 | kue.Job.get(id, (err, job) => { | 157 | const queue = this.queues[key] |
173 | if (err) return rej(err) | 158 | queue.clean(JOB_COMPLETED_LIFETIME, 'completed') |
174 | 159 | } | |
175 | return res(job) | ||
176 | }) | ||
177 | }) | ||
178 | } | 160 | } |
179 | 161 | ||
180 | static get Instance () { | 162 | static get Instance () { |