diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 178 |
1 files changed, 80 insertions, 98 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 695fe0eea..77aaa7fa8 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,13 +1,12 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as Bull from 'bull' |
2 | import { JobState, JobType } from '../../../shared/models' | 2 | import { JobState, JobType } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' | 4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' |
5 | import { Redis } from '../redis' | ||
6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
9 | import { EmailPayload, processEmail } from './handlers/email' | 8 | import { EmailPayload, processEmail } from './handlers/email' |
10 | import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' | 9 | import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' |
11 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' | 10 | import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' |
12 | 11 | ||
13 | type CreateJobArgument = | 12 | type CreateJobArgument = |
@@ -19,7 +18,7 @@ type CreateJobArgument = | |||
19 | { type: 'video-file', payload: VideoFilePayload } | | 18 | { type: 'video-file', payload: VideoFilePayload } | |
20 | { type: 'email', payload: EmailPayload } | 19 | { type: 'email', payload: EmailPayload } |
21 | 20 | ||
22 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | 21 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { |
23 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 22 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
24 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 23 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
25 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 24 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | |||
29 | 'email': processEmail | 28 | 'email': processEmail |
30 | } | 29 | } |
31 | 30 | ||
32 | const jobsWithTLL: JobType[] = [ | 31 | const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = { |
32 | 'activitypub-http-broadcast': true, | ||
33 | 'activitypub-http-unicast': true, | ||
34 | 'activitypub-http-fetcher': true, | ||
35 | 'activitypub-follow': true | ||
36 | } | ||
37 | |||
38 | const jobTypes: JobType[] = [ | ||
39 | 'activitypub-follow', | ||
33 | 'activitypub-http-broadcast', | 40 | 'activitypub-http-broadcast', |
34 | 'activitypub-http-unicast', | ||
35 | 'activitypub-http-fetcher', | 41 | 'activitypub-http-fetcher', |
36 | 'activitypub-follow' | 42 | 'activitypub-http-unicast', |
43 | 'email', | ||
44 | 'video-file', | ||
45 | 'video-file-import' | ||
37 | ] | 46 | ] |
38 | 47 | ||
39 | class JobQueue { | 48 | class JobQueue { |
40 | 49 | ||
41 | private static instance: JobQueue | 50 | private static instance: JobQueue |
42 | 51 | ||
43 | private jobQueue: kue.Queue | 52 | private queues: { [ id in JobType ]?: Bull.Queue } = {} |
44 | private initialized = false | 53 | private initialized = false |
45 | private jobRedisPrefix: string | 54 | private jobRedisPrefix: string |
46 | 55 | ||
@@ -51,9 +60,8 @@ class JobQueue { | |||
51 | if (this.initialized === true) return | 60 | if (this.initialized === true) return |
52 | this.initialized = true | 61 | this.initialized = true |
53 | 62 | ||
54 | this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST | 63 | this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST |
55 | 64 | const queueOptions = { | |
56 | this.jobQueue = kue.createQueue({ | ||
57 | prefix: this.jobRedisPrefix, | 65 | prefix: this.jobRedisPrefix, |
58 | redis: { | 66 | redis: { |
59 | host: CONFIG.REDIS.HOSTNAME, | 67 | host: CONFIG.REDIS.HOSTNAME, |
@@ -61,120 +69,94 @@ class JobQueue { | |||
61 | auth: CONFIG.REDIS.AUTH, | 69 | auth: CONFIG.REDIS.AUTH, |
62 | db: CONFIG.REDIS.DB | 70 | db: CONFIG.REDIS.DB |
63 | } | 71 | } |
64 | }) | 72 | } |
65 | |||
66 | this.jobQueue.setMaxListeners(20) | ||
67 | 73 | ||
68 | this.jobQueue.on('error', err => { | 74 | for (const handlerName of Object.keys(handlers)) { |
69 | logger.error('Error in job queue.', { err }) | 75 | const queue = new Bull(handlerName, queueOptions) |
70 | process.exit(-1) | 76 | const handler = handlers[handlerName] |
71 | }) | ||
72 | this.jobQueue.watchStuckJobs(5000) | ||
73 | 77 | ||
74 | await this.reactiveStuckJobs() | 78 | queue.process(JOB_CONCURRENCY[handlerName], handler) |
79 | .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err })) | ||
75 | 80 | ||
76 | for (const handlerName of Object.keys(handlers)) { | 81 | queue.on('error', err => { |
77 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | 82 | logger.error('Error in job queue %s.', handlerName, { err }) |
78 | try { | 83 | process.exit(-1) |
79 | const res = await handlers[ handlerName ](job) | ||
80 | return done(null, res) | ||
81 | } catch (err) { | ||
82 | logger.error('Cannot execute job %d.', job.id, { err }) | ||
83 | return done(err) | ||
84 | } | ||
85 | }) | 84 | }) |
85 | |||
86 | this.queues[handlerName] = queue | ||
86 | } | 87 | } |
87 | } | 88 | } |
88 | 89 | ||
89 | createJob (obj: CreateJobArgument, priority = 'normal') { | 90 | createJob (obj: CreateJobArgument) { |
90 | return new Promise((res, rej) => { | 91 | const queue = this.queues[obj.type] |
91 | let job = this.jobQueue | 92 | if (queue === undefined) { |
92 | .create(obj.type, obj.payload) | 93 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
93 | .priority(priority) | 94 | return |
94 | .attempts(JOB_ATTEMPTS[obj.type]) | 95 | } |
95 | .backoff({ delay: 60 * 1000, type: 'exponential' }) | ||
96 | 96 | ||
97 | if (jobsWithTLL.indexOf(obj.type) !== -1) { | 97 | const jobArgs: Bull.JobOptions = { |
98 | job = job.ttl(JOB_REQUEST_TTL) | 98 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
99 | } | 99 | attempts: JOB_ATTEMPTS[obj.type] |
100 | } | ||
100 | 101 | ||
101 | return job.save(err => { | 102 | if (jobsWithRequestTimeout[obj.type] === true) { |
102 | if (err) return rej(err) | 103 | jobArgs.timeout = JOB_REQUEST_TTL |
104 | } | ||
103 | 105 | ||
104 | return res() | 106 | return queue.add(obj.payload, jobArgs) |
105 | }) | ||
106 | }) | ||
107 | } | 107 | } |
108 | 108 | ||
109 | async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { | 109 | async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> { |
110 | const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) | 110 | let results: Bull.Job[] = [] |
111 | 111 | ||
112 | const jobPromises = jobStrings | 112 | // TODO: optimize |
113 | .map(s => s.split('|')) | 113 | for (const jobType of jobTypes) { |
114 | .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) | 114 | const queue = this.queues[ jobType ] |
115 | if (queue === undefined) { | ||
116 | logger.error('Unknown queue %s to list jobs.', jobType) | ||
117 | continue | ||
118 | } | ||
115 | 119 | ||
116 | return Promise.all(jobPromises) | 120 | // FIXME: Bull queue typings does not have getJobs method |
117 | } | 121 | const jobs = await (queue as any).getJobs(state, 0, start + count, asc) |
122 | results = results.concat(jobs) | ||
123 | } | ||
118 | 124 | ||
119 | count (state: JobState) { | 125 | results.sort((j1: any, j2: any) => { |
120 | return new Promise<number>((res, rej) => { | 126 | if (j1.timestamp < j2.timestamp) return -1 |
121 | this.jobQueue[state + 'Count']((err, total) => { | 127 | else if (j1.timestamp === j2.timestamp) return 0 |
122 | if (err) return rej(err) | ||
123 | 128 | ||
124 | return res(total) | 129 | return 1 |
125 | }) | ||
126 | }) | 130 | }) |
127 | } | ||
128 | 131 | ||
129 | removeOldJobs () { | 132 | if (asc === false) results.reverse() |
130 | const now = new Date().getTime() | ||
131 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
132 | if (err) { | ||
133 | logger.error('Cannot get jobs when removing old jobs.', { err }) | ||
134 | return | ||
135 | } | ||
136 | 133 | ||
137 | for (const job of jobs) { | 134 | return results.slice(start, start + count) |
138 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
139 | job.remove() | ||
140 | } | ||
141 | } | ||
142 | }) | ||
143 | } | 135 | } |
144 | 136 | ||
145 | private reactiveStuckJobs () { | 137 | async count (state: JobState): Promise<number> { |
146 | const promises: Promise<any>[] = [] | 138 | let total = 0 |
147 | |||
148 | this.jobQueue.active((err, ids) => { | ||
149 | if (err) throw err | ||
150 | 139 | ||
151 | for (const id of ids) { | 140 | for (const type of jobTypes) { |
152 | kue.Job.get(id, (err, job) => { | 141 | const queue = this.queues[ type ] |
153 | if (err) throw err | 142 | if (queue === undefined) { |
143 | logger.error('Unknown queue %s to count jobs.', type) | ||
144 | continue | ||
145 | } | ||
154 | 146 | ||
155 | const p = new Promise((res, rej) => { | 147 | const counts = await queue.getJobCounts() |
156 | job.inactive(err => { | ||
157 | if (err) return rej(err) | ||
158 | return res() | ||
159 | }) | ||
160 | }) | ||
161 | 148 | ||
162 | promises.push(p) | 149 | total += counts[ state ] |
163 | }) | 150 | } |
164 | } | ||
165 | }) | ||
166 | 151 | ||
167 | return Promise.all(promises) | 152 | return total |
168 | } | 153 | } |
169 | 154 | ||
170 | private getJob (id: number) { | 155 | removeOldJobs () { |
171 | return new Promise<kue.Job>((res, rej) => { | 156 | for (const key of Object.keys(this.queues)) { |
172 | kue.Job.get(id, (err, job) => { | 157 | const queue = this.queues[key] |
173 | if (err) return rej(err) | 158 | queue.clean(JOB_COMPLETED_LIFETIME, 'completed') |
174 | 159 | } | |
175 | return res(job) | ||
176 | }) | ||
177 | }) | ||
178 | } | 160 | } |
179 | 161 | ||
180 | static get Instance () { | 162 | static get Instance () { |