aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts178
1 files changed, 80 insertions, 98 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 695fe0eea..77aaa7fa8 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,13 +1,12 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
5import { Redis } from '../redis'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
9import { EmailPayload, processEmail } from './handlers/email' 8import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' 9import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 10import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12 11
13type CreateJobArgument = 12type CreateJobArgument =
@@ -19,7 +18,7 @@ type CreateJobArgument =
19 { type: 'video-file', payload: VideoFilePayload } | 18 { type: 'video-file', payload: VideoFilePayload } |
20 { type: 'email', payload: EmailPayload } 19 { type: 'email', payload: EmailPayload }
21 20
22const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { 21const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
23 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 22 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
24 'activitypub-http-unicast': processActivityPubHttpUnicast, 23 'activitypub-http-unicast': processActivityPubHttpUnicast,
25 'activitypub-http-fetcher': processActivityPubHttpFetcher, 24 'activitypub-http-fetcher': processActivityPubHttpFetcher,
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
29 'email': processEmail 28 'email': processEmail
30} 29}
31 30
32const jobsWithTLL: JobType[] = [ 31const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
32 'activitypub-http-broadcast': true,
33 'activitypub-http-unicast': true,
34 'activitypub-http-fetcher': true,
35 'activitypub-follow': true
36}
37
38const jobTypes: JobType[] = [
39 'activitypub-follow',
33 'activitypub-http-broadcast', 40 'activitypub-http-broadcast',
34 'activitypub-http-unicast',
35 'activitypub-http-fetcher', 41 'activitypub-http-fetcher',
36 'activitypub-follow' 42 'activitypub-http-unicast',
43 'email',
44 'video-file',
45 'video-file-import'
37] 46]
38 47
39class JobQueue { 48class JobQueue {
40 49
41 private static instance: JobQueue 50 private static instance: JobQueue
42 51
43 private jobQueue: kue.Queue 52 private queues: { [ id in JobType ]?: Bull.Queue } = {}
44 private initialized = false 53 private initialized = false
45 private jobRedisPrefix: string 54 private jobRedisPrefix: string
46 55
@@ -51,9 +60,8 @@ class JobQueue {
51 if (this.initialized === true) return 60 if (this.initialized === true) return
52 this.initialized = true 61 this.initialized = true
53 62
54 this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST 63 this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
55 64 const queueOptions = {
56 this.jobQueue = kue.createQueue({
57 prefix: this.jobRedisPrefix, 65 prefix: this.jobRedisPrefix,
58 redis: { 66 redis: {
59 host: CONFIG.REDIS.HOSTNAME, 67 host: CONFIG.REDIS.HOSTNAME,
@@ -61,120 +69,94 @@ class JobQueue {
61 auth: CONFIG.REDIS.AUTH, 69 auth: CONFIG.REDIS.AUTH,
62 db: CONFIG.REDIS.DB 70 db: CONFIG.REDIS.DB
63 } 71 }
64 }) 72 }
65
66 this.jobQueue.setMaxListeners(20)
67 73
68 this.jobQueue.on('error', err => { 74 for (const handlerName of Object.keys(handlers)) {
69 logger.error('Error in job queue.', { err }) 75 const queue = new Bull(handlerName, queueOptions)
70 process.exit(-1) 76 const handler = handlers[handlerName]
71 })
72 this.jobQueue.watchStuckJobs(5000)
73 77
74 await this.reactiveStuckJobs() 78 queue.process(JOB_CONCURRENCY[handlerName], handler)
79 .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
75 80
76 for (const handlerName of Object.keys(handlers)) { 81 queue.on('error', err => {
77 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { 82 logger.error('Error in job queue %s.', handlerName, { err })
78 try { 83 process.exit(-1)
79 const res = await handlers[ handlerName ](job)
80 return done(null, res)
81 } catch (err) {
82 logger.error('Cannot execute job %d.', job.id, { err })
83 return done(err)
84 }
85 }) 84 })
85
86 this.queues[handlerName] = queue
86 } 87 }
87 } 88 }
88 89
89 createJob (obj: CreateJobArgument, priority = 'normal') { 90 createJob (obj: CreateJobArgument) {
90 return new Promise((res, rej) => { 91 const queue = this.queues[obj.type]
91 let job = this.jobQueue 92 if (queue === undefined) {
92 .create(obj.type, obj.payload) 93 logger.error('Unknown queue %s: cannot create job.', obj.type)
93 .priority(priority) 94 return
94 .attempts(JOB_ATTEMPTS[obj.type]) 95 }
95 .backoff({ delay: 60 * 1000, type: 'exponential' })
96 96
97 if (jobsWithTLL.indexOf(obj.type) !== -1) { 97 const jobArgs: Bull.JobOptions = {
98 job = job.ttl(JOB_REQUEST_TTL) 98 backoff: { delay: 60 * 1000, type: 'exponential' },
99 } 99 attempts: JOB_ATTEMPTS[obj.type]
100 }
100 101
101 return job.save(err => { 102 if (jobsWithRequestTimeout[obj.type] === true) {
102 if (err) return rej(err) 103 jobArgs.timeout = JOB_REQUEST_TTL
104 }
103 105
104 return res() 106 return queue.add(obj.payload, jobArgs)
105 })
106 })
107 } 107 }
108 108
109 async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { 109 async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
110 const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) 110 let results: Bull.Job[] = []
111 111
112 const jobPromises = jobStrings 112 // TODO: optimize
113 .map(s => s.split('|')) 113 for (const jobType of jobTypes) {
114 .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) 114 const queue = this.queues[ jobType ]
115 if (queue === undefined) {
116 logger.error('Unknown queue %s to list jobs.', jobType)
117 continue
118 }
115 119
116 return Promise.all(jobPromises) 120 // FIXME: Bull queue typings does not have getJobs method
117 } 121 const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
122 results = results.concat(jobs)
123 }
118 124
119 count (state: JobState) { 125 results.sort((j1: any, j2: any) => {
120 return new Promise<number>((res, rej) => { 126 if (j1.timestamp < j2.timestamp) return -1
121 this.jobQueue[state + 'Count']((err, total) => { 127 else if (j1.timestamp === j2.timestamp) return 0
122 if (err) return rej(err)
123 128
124 return res(total) 129 return 1
125 })
126 }) 130 })
127 }
128 131
129 removeOldJobs () { 132 if (asc === false) results.reverse()
130 const now = new Date().getTime()
131 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
132 if (err) {
133 logger.error('Cannot get jobs when removing old jobs.', { err })
134 return
135 }
136 133
137 for (const job of jobs) { 134 return results.slice(start, start + count)
138 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
139 job.remove()
140 }
141 }
142 })
143 } 135 }
144 136
145 private reactiveStuckJobs () { 137 async count (state: JobState): Promise<number> {
146 const promises: Promise<any>[] = [] 138 let total = 0
147
148 this.jobQueue.active((err, ids) => {
149 if (err) throw err
150 139
151 for (const id of ids) { 140 for (const type of jobTypes) {
152 kue.Job.get(id, (err, job) => { 141 const queue = this.queues[ type ]
153 if (err) throw err 142 if (queue === undefined) {
143 logger.error('Unknown queue %s to count jobs.', type)
144 continue
145 }
154 146
155 const p = new Promise((res, rej) => { 147 const counts = await queue.getJobCounts()
156 job.inactive(err => {
157 if (err) return rej(err)
158 return res()
159 })
160 })
161 148
162 promises.push(p) 149 total += counts[ state ]
163 }) 150 }
164 }
165 })
166 151
167 return Promise.all(promises) 152 return total
168 } 153 }
169 154
170 private getJob (id: number) { 155 removeOldJobs () {
171 return new Promise<kue.Job>((res, rej) => { 156 for (const key of Object.keys(this.queues)) {
172 kue.Job.get(id, (err, job) => { 157 const queue = this.queues[key]
173 if (err) return rej(err) 158 queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
174 159 }
175 return res(job)
176 })
177 })
178 } 160 }
179 161
180 static get Instance () { 162 static get Instance () {