aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-07-10 17:02:20 +0200
committerChocobozzz <me@florianbigard.com>2018-07-11 14:00:17 +0200
commit94831479f5facff9469540a3d49dd347b88bdf5a (patch)
tree4e8990fc4fded913952c732b6466b15fc52ab06d /server/lib/job-queue
parent2cdf27bae6acfaa0b99bb07555edc57f48b8bc43 (diff)
downloadPeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.gz
PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.tar.zst
PeerTube-94831479f5facff9469540a3d49dd347b88bdf5a.zip
Migrate to bull
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts4
-rw-r--r--server/lib/job-queue/handlers/email.ts4
-rw-r--r--server/lib/job-queue/handlers/video-file.ts9
-rw-r--r--server/lib/job-queue/job-queue.ts178
7 files changed, 95 insertions, 112 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 286e343f2..2c1b4f49d 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { getServerActor } from '../../../helpers/utils' 3import { getServerActor } from '../../../helpers/utils'
4import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers' 4import { REMOTE_SCHEME, sequelizeTypescript, SERVER_ACTOR_NAME } from '../../../initializers'
@@ -14,7 +14,7 @@ export type ActivitypubFollowPayload = {
14 host: string 14 host: string
15} 15}
16 16
17async function processActivityPubFollow (job: kue.Job) { 17async function processActivityPubFollow (job: Bull.Job) {
18 const payload = job.data as ActivitypubFollowPayload 18 const payload = job.data as ActivitypubFollowPayload
19 const host = payload.host 19 const host = payload.host
20 20
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index d8b8ec222..03a9e12a4 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import * as Bluebird from 'bluebird' 2import * as Bluebird from 'bluebird'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { doRequest } from '../../../helpers/requests' 4import { doRequest } from '../../../helpers/requests'
@@ -12,7 +12,7 @@ export type ActivitypubHttpBroadcastPayload = {
12 body: any 12 body: any
13} 13}
14 14
15async function processActivityPubHttpBroadcast (job: kue.Job) { 15async function processActivityPubHttpBroadcast (job: Bull.Job) {
16 logger.info('Processing ActivityPub broadcast in job %d.', job.id) 16 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
17 17
18 const payload = job.data as ActivitypubHttpBroadcastPayload 18 const payload = job.data as ActivitypubHttpBroadcastPayload
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 10c0e606f..f21da087e 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { processActivities } from '../../activitypub/process' 3import { processActivities } from '../../activitypub/process'
4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' 4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
@@ -9,7 +9,7 @@ export type ActivitypubHttpFetcherPayload = {
9 uris: string[] 9 uris: string[]
10} 10}
11 11
12async function processActivityPubHttpFetcher (job: kue.Job) { 12async function processActivityPubHttpFetcher (job: Bull.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id) 13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14 14
15 const payload = job.data as ActivitypubHttpBroadcastPayload 15 const payload = job.data as ActivitypubHttpBroadcastPayload
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 173f3bb52..c90d735f6 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
@@ -11,7 +11,7 @@ export type ActivitypubHttpUnicastPayload = {
11 body: any 11 body: any
12} 12}
13 13
14async function processActivityPubHttpUnicast (job: kue.Job) { 14async function processActivityPubHttpUnicast (job: Bull.Job) {
15 logger.info('Processing ActivityPub unicast in job %d.', job.id) 15 logger.info('Processing ActivityPub unicast in job %d.', job.id)
16 16
17 const payload = job.data as ActivitypubHttpUnicastPayload 17 const payload = job.data as ActivitypubHttpUnicastPayload
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 9d7686116..73d98ae54 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { Emailer } from '../../emailer' 3import { Emailer } from '../../emailer'
4 4
@@ -8,7 +8,7 @@ export type EmailPayload = {
8 text: string 8 text: string
9} 9}
10 10
11async function processEmail (job: kue.Job) { 11async function processEmail (job: Bull.Job) {
12 const payload = job.data as EmailPayload 12 const payload = job.data as EmailPayload
13 logger.info('Processing email in job %d.', job.id) 13 logger.info('Processing email in job %d.', job.id)
14 14
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
index fc40527c7..bd68dd78b 100644
--- a/server/lib/job-queue/handlers/video-file.ts
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -1,4 +1,4 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { VideoResolution, VideoState } from '../../../../shared' 2import { VideoResolution, VideoState } from '../../../../shared'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { computeResolutionsToTranscode } from '../../../helpers/utils' 4import { computeResolutionsToTranscode } from '../../../helpers/utils'
@@ -7,6 +7,7 @@ import { JobQueue } from '../job-queue'
7import { federateVideoIfNeeded } from '../../activitypub' 7import { federateVideoIfNeeded } from '../../activitypub'
8import { retryTransactionWrapper } from '../../../helpers/database-utils' 8import { retryTransactionWrapper } from '../../../helpers/database-utils'
9import { sequelizeTypescript } from '../../../initializers' 9import { sequelizeTypescript } from '../../../initializers'
10import * as Bluebird from 'bluebird'
10 11
11export type VideoFilePayload = { 12export type VideoFilePayload = {
12 videoUUID: string 13 videoUUID: string
@@ -20,7 +21,7 @@ export type VideoFileImportPayload = {
20 filePath: string 21 filePath: string
21} 22}
22 23
23async function processVideoFileImport (job: kue.Job) { 24async function processVideoFileImport (job: Bull.Job) {
24 const payload = job.data as VideoFileImportPayload 25 const payload = job.data as VideoFileImportPayload
25 logger.info('Processing video file import in job %d.', job.id) 26 logger.info('Processing video file import in job %d.', job.id)
26 27
@@ -37,7 +38,7 @@ async function processVideoFileImport (job: kue.Job) {
37 return video 38 return video
38} 39}
39 40
40async function processVideoFile (job: kue.Job) { 41async function processVideoFile (job: Bull.Job) {
41 const payload = job.data as VideoFilePayload 42 const payload = job.data as VideoFilePayload
42 logger.info('Processing video file in job %d.', job.id) 43 logger.info('Processing video file in job %d.', job.id)
43 44
@@ -109,7 +110,7 @@ async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boole
109 ) 110 )
110 111
111 if (resolutionsEnabled.length !== 0) { 112 if (resolutionsEnabled.length !== 0) {
112 const tasks: Promise<any>[] = [] 113 const tasks: Bluebird<any>[] = []
113 114
114 for (const resolution of resolutionsEnabled) { 115 for (const resolution of resolutionsEnabled) {
115 const dataInput = { 116 const dataInput = {
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 695fe0eea..77aaa7fa8 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,13 +1,12 @@
1import * as kue from 'kue' 1import * as Bull from 'bull'
2import { JobState, JobType } from '../../../shared/models' 2import { JobState, JobType } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_REQUEST_TTL } from '../../initializers'
5import { Redis } from '../redis'
6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
9import { EmailPayload, processEmail } from './handlers/email' 8import { EmailPayload, processEmail } from './handlers/email'
10import { processVideoFile, processVideoFileImport, VideoFilePayload, VideoFileImportPayload } from './handlers/video-file' 9import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file'
11import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' 10import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow'
12 11
13type CreateJobArgument = 12type CreateJobArgument =
@@ -19,7 +18,7 @@ type CreateJobArgument =
19 { type: 'video-file', payload: VideoFilePayload } | 18 { type: 'video-file', payload: VideoFilePayload } |
20 { type: 'email', payload: EmailPayload } 19 { type: 'email', payload: EmailPayload }
21 20
22const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { 21const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = {
23 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 22 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
24 'activitypub-http-unicast': processActivityPubHttpUnicast, 23 'activitypub-http-unicast': processActivityPubHttpUnicast,
25 'activitypub-http-fetcher': processActivityPubHttpFetcher, 24 'activitypub-http-fetcher': processActivityPubHttpFetcher,
@@ -29,18 +28,28 @@ const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
29 'email': processEmail 28 'email': processEmail
30} 29}
31 30
32const jobsWithTLL: JobType[] = [ 31const jobsWithRequestTimeout: { [ id in JobType ]?: boolean } = {
32 'activitypub-http-broadcast': true,
33 'activitypub-http-unicast': true,
34 'activitypub-http-fetcher': true,
35 'activitypub-follow': true
36}
37
38const jobTypes: JobType[] = [
39 'activitypub-follow',
33 'activitypub-http-broadcast', 40 'activitypub-http-broadcast',
34 'activitypub-http-unicast',
35 'activitypub-http-fetcher', 41 'activitypub-http-fetcher',
36 'activitypub-follow' 42 'activitypub-http-unicast',
43 'email',
44 'video-file',
45 'video-file-import'
37] 46]
38 47
39class JobQueue { 48class JobQueue {
40 49
41 private static instance: JobQueue 50 private static instance: JobQueue
42 51
43 private jobQueue: kue.Queue 52 private queues: { [ id in JobType ]?: Bull.Queue } = {}
44 private initialized = false 53 private initialized = false
45 private jobRedisPrefix: string 54 private jobRedisPrefix: string
46 55
@@ -51,9 +60,8 @@ class JobQueue {
51 if (this.initialized === true) return 60 if (this.initialized === true) return
52 this.initialized = true 61 this.initialized = true
53 62
54 this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST 63 this.jobRedisPrefix = 'bull-' + CONFIG.WEBSERVER.HOST
55 64 const queueOptions = {
56 this.jobQueue = kue.createQueue({
57 prefix: this.jobRedisPrefix, 65 prefix: this.jobRedisPrefix,
58 redis: { 66 redis: {
59 host: CONFIG.REDIS.HOSTNAME, 67 host: CONFIG.REDIS.HOSTNAME,
@@ -61,120 +69,94 @@ class JobQueue {
61 auth: CONFIG.REDIS.AUTH, 69 auth: CONFIG.REDIS.AUTH,
62 db: CONFIG.REDIS.DB 70 db: CONFIG.REDIS.DB
63 } 71 }
64 }) 72 }
65
66 this.jobQueue.setMaxListeners(20)
67 73
68 this.jobQueue.on('error', err => { 74 for (const handlerName of Object.keys(handlers)) {
69 logger.error('Error in job queue.', { err }) 75 const queue = new Bull(handlerName, queueOptions)
70 process.exit(-1) 76 const handler = handlers[handlerName]
71 })
72 this.jobQueue.watchStuckJobs(5000)
73 77
74 await this.reactiveStuckJobs() 78 queue.process(JOB_CONCURRENCY[handlerName], handler)
79 .catch(err => logger.error('Cannot execute job queue %s.', handlerName, { err }))
75 80
76 for (const handlerName of Object.keys(handlers)) { 81 queue.on('error', err => {
77 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { 82 logger.error('Error in job queue %s.', handlerName, { err })
78 try { 83 process.exit(-1)
79 const res = await handlers[ handlerName ](job)
80 return done(null, res)
81 } catch (err) {
82 logger.error('Cannot execute job %d.', job.id, { err })
83 return done(err)
84 }
85 }) 84 })
85
86 this.queues[handlerName] = queue
86 } 87 }
87 } 88 }
88 89
89 createJob (obj: CreateJobArgument, priority = 'normal') { 90 createJob (obj: CreateJobArgument) {
90 return new Promise((res, rej) => { 91 const queue = this.queues[obj.type]
91 let job = this.jobQueue 92 if (queue === undefined) {
92 .create(obj.type, obj.payload) 93 logger.error('Unknown queue %s: cannot create job.', obj.type)
93 .priority(priority) 94 return
94 .attempts(JOB_ATTEMPTS[obj.type]) 95 }
95 .backoff({ delay: 60 * 1000, type: 'exponential' })
96 96
97 if (jobsWithTLL.indexOf(obj.type) !== -1) { 97 const jobArgs: Bull.JobOptions = {
98 job = job.ttl(JOB_REQUEST_TTL) 98 backoff: { delay: 60 * 1000, type: 'exponential' },
99 } 99 attempts: JOB_ATTEMPTS[obj.type]
100 }
100 101
101 return job.save(err => { 102 if (jobsWithRequestTimeout[obj.type] === true) {
102 if (err) return rej(err) 103 jobArgs.timeout = JOB_REQUEST_TTL
104 }
103 105
104 return res() 106 return queue.add(obj.payload, jobArgs)
105 })
106 })
107 } 107 }
108 108
109 async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC'): Promise<kue.Job[]> { 109 async listForApi (state: JobState, start: number, count: number, asc?: boolean): Promise<Bull.Job[]> {
110 const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) 110 let results: Bull.Job[] = []
111 111
112 const jobPromises = jobStrings 112 // TODO: optimize
113 .map(s => s.split('|')) 113 for (const jobType of jobTypes) {
114 .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) 114 const queue = this.queues[ jobType ]
115 if (queue === undefined) {
116 logger.error('Unknown queue %s to list jobs.', jobType)
117 continue
118 }
115 119
116 return Promise.all(jobPromises) 120 // FIXME: Bull queue typings does not have getJobs method
117 } 121 const jobs = await (queue as any).getJobs(state, 0, start + count, asc)
122 results = results.concat(jobs)
123 }
118 124
119 count (state: JobState) { 125 results.sort((j1: any, j2: any) => {
120 return new Promise<number>((res, rej) => { 126 if (j1.timestamp < j2.timestamp) return -1
121 this.jobQueue[state + 'Count']((err, total) => { 127 else if (j1.timestamp === j2.timestamp) return 0
122 if (err) return rej(err)
123 128
124 return res(total) 129 return 1
125 })
126 }) 130 })
127 }
128 131
129 removeOldJobs () { 132 if (asc === false) results.reverse()
130 const now = new Date().getTime()
131 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
132 if (err) {
133 logger.error('Cannot get jobs when removing old jobs.', { err })
134 return
135 }
136 133
137 for (const job of jobs) { 134 return results.slice(start, start + count)
138 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
139 job.remove()
140 }
141 }
142 })
143 } 135 }
144 136
145 private reactiveStuckJobs () { 137 async count (state: JobState): Promise<number> {
146 const promises: Promise<any>[] = [] 138 let total = 0
147
148 this.jobQueue.active((err, ids) => {
149 if (err) throw err
150 139
151 for (const id of ids) { 140 for (const type of jobTypes) {
152 kue.Job.get(id, (err, job) => { 141 const queue = this.queues[ type ]
153 if (err) throw err 142 if (queue === undefined) {
143 logger.error('Unknown queue %s to count jobs.', type)
144 continue
145 }
154 146
155 const p = new Promise((res, rej) => { 147 const counts = await queue.getJobCounts()
156 job.inactive(err => {
157 if (err) return rej(err)
158 return res()
159 })
160 })
161 148
162 promises.push(p) 149 total += counts[ state ]
163 }) 150 }
164 }
165 })
166 151
167 return Promise.all(promises) 152 return total
168 } 153 }
169 154
170 private getJob (id: number) { 155 removeOldJobs () {
171 return new Promise<kue.Job>((res, rej) => { 156 for (const key of Object.keys(this.queues)) {
172 kue.Job.get(id, (err, job) => { 157 const queue = this.queues[key]
173 if (err) return rej(err) 158 queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
174 159 }
175 return res(job)
176 })
177 })
178 } 160 }
179 161
180 static get Instance () { 162 static get Instance () {