aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 10:42:08 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commit5a921e7b74910414626bfc9672b857e987e3ebed (patch)
treef627e2ccc11c55bcba9e630951e72c5f94864c12 /server/lib/job-queue
parent5e2afe4290103bf0d54ae7b3e62781f2a00487c9 (diff)
downloadPeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.gz
PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.zst
PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.zip
Move to bullmq
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts2
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/handlers/email.ts2
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts2
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts2
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts2
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts2
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts2
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts2
-rw-r--r--server/lib/job-queue/job-queue.ts188
17 files changed, 148 insertions, 72 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index 3d7dc6fb9..84c0a2de2 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -1,5 +1,5 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bullmq'
3import { 3import {
4 isAnnounceActivityValid, 4 isAnnounceActivityValid,
5 isDislikeActivityValid, 5 isDislikeActivityValid,
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 2ee98171c..944da5be1 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' 2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
3import { ActivitypubFollowPayload } from '@shared/models' 3import { ActivitypubFollowPayload } from '@shared/models'
4import { sanitizeHost } from '../../../helpers/core-utils' 4import { sanitizeHost } from '../../../helpers/core-utils'
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 709e8501f..354c608fb 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,5 +1,5 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bullmq'
3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' 4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models' 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index de533de6c..e0b841887 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' 2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video' 4import { VideoModel } from '../../../models/video/video'
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 99bcd3e8d..837a597a5 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActivitypubHttpUnicastPayload } from '@shared/models' 3import { ActivitypubHttpUnicastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 92ceed180..600f858a0 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' 2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' 3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
4import { loadVideoByUrl } from '@server/lib/model-loaders' 4import { loadVideoByUrl } from '@server/lib/model-loaders'
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 9d5a65376..4a5bad9fb 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' 2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
3import { ActorModel } from '@server/models/actor/actor' 3import { ActorModel } from '@server/models/actor/actor'
4import { ActorKeysPayload } from '@shared/models' 4import { ActorKeysPayload } from '@shared/models'
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 6fc1caa84..b5b9475b1 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { EmailPayload } from '@shared/models' 2import { EmailPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { Emailer } from '../../emailer' 4import { Emailer } from '../../emailer'
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index dfd4e6140..4505ca79e 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' 2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
3import { VideoModel } from '@server/models/video/video' 3import { VideoModel } from '@server/models/video/video'
4import { VideoFileModel } from '@server/models/video/video-file' 4import { VideoFileModel } from '@server/models/video/video-file'
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 49064052c..d608fd865 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { remove } from 'fs-extra' 2import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 71c5444af..40c44cf52 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { copy, stat } from 'fs-extra' 2import { copy, stat } from 'fs-extra'
3import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 3import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4cde26aef..e5cd35865 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { retryTransactionWrapper } from '@server/helpers/database-utils' 3import { retryTransactionWrapper } from '@server/helpers/database-utils'
4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' 4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 78d0b2192..79002258c 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { readdir, remove } from 'fs-extra' 2import { readdir, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 9cb7a6589..75ab2cd02 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' 2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
3import { VideoRedundancyPayload } from '@shared/models' 3import { VideoRedundancyPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 735150d57..078243538 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { move, remove } from 'fs-extra' 2import { move, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' 4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg'
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 4e5e97919..8dbae8c42 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { Hooks } from '@server/lib/plugins/hooks'
4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0ae325f4d..0cf5d53ce 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,7 +1,19 @@
1import Bull, { Job, JobOptions, Queue } from 'bull' 1import {
2 Job,
3 JobsOptions,
4 Queue,
5 QueueEvents,
6 QueueEventsOptions,
7 QueueOptions,
8 QueueScheduler,
9 QueueSchedulerOptions,
10 Worker,
11 WorkerOptions
12} from 'bullmq'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 13import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config' 14import { CONFIG } from '@server/initializers/config'
4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils'
5import { 17import {
6 ActivitypubFollowPayload, 18 ActivitypubFollowPayload,
7 ActivitypubHttpBroadcastPayload, 19 ActivitypubHttpBroadcastPayload,
@@ -120,7 +132,11 @@ class JobQueue {
120 132
121 private static instance: JobQueue 133 private static instance: JobQueue
122 134
135 private workers: { [id in JobType]?: Worker } = {}
123 private queues: { [id in JobType]?: Queue } = {} 136 private queues: { [id in JobType]?: Queue } = {}
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139
124 private initialized = false 140 private initialized = false
125 private jobRedisPrefix: string 141 private jobRedisPrefix: string
126 142
@@ -134,75 +150,131 @@ class JobQueue {
134 150
135 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST 151 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
136 152
137 const queueOptions: Bull.QueueOptions = { 153 for (const handlerName of (Object.keys(handlers) as JobType[])) {
154 this.buildWorker(handlerName, produceOnly)
155 this.buildQueue(handlerName)
156 this.buildQueueScheduler(handlerName, produceOnly)
157 this.buildQueueEvent(handlerName, produceOnly)
158 }
159
160 this.addRepeatableJobs()
161 }
162
163 private buildWorker (handlerName: JobType, produceOnly: boolean) {
164 const workerOptions: WorkerOptions = {
165 autorun: !produceOnly,
166 concurrency: this.getJobConcurrency(handlerName),
138 prefix: this.jobRedisPrefix, 167 prefix: this.jobRedisPrefix,
139 redis: { 168 connection: this.getRedisConnection()
140 password: CONFIG.REDIS.AUTH,
141 db: CONFIG.REDIS.DB,
142 host: CONFIG.REDIS.HOSTNAME,
143 port: CONFIG.REDIS.PORT,
144 path: CONFIG.REDIS.SOCKET
145 },
146 settings: {
147 maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
148 }
149 } 169 }
150 170
151 for (const handlerName of (Object.keys(handlers) as JobType[])) { 171 const handler = function (job: Job) {
152 const queue = new Bull(handlerName, queueOptions) 172 const timeout = JOB_TTL[handlerName]
173 const p = handlers[handlerName](job)
153 174
154 if (produceOnly) { 175 if (!timeout) return p
155 queue.pause(true)
156 .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
157 }
158 176
159 const handler = handlers[handlerName] 177 return timeoutPromise(p, timeout)
178 }
160 179
161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { 180 const processor = async (jobArg: Job<any>) => {
162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) 181 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163 182
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') 183 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 184 }
166 185
167 queue.on('failed', (job, err) => { 186 const worker = new Worker(handlerName, processor, workerOptions)
168 const logLevel = silentFailure.has(handlerName)
169 ? 'debug'
170 : 'error'
171 187
172 logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) 188 worker.on('failed', (job, err) => {
189 const logLevel = silentFailure.has(handlerName)
190 ? 'debug'
191 : 'error'
173 192
174 if (errorHandlers[job.name]) { 193 logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
175 errorHandlers[job.name](job, err)
176 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
177 }
178 })
179 194
180 queue.on('error', err => { 195 if (errorHandlers[job.name]) {
181 logger.error('Error in job queue %s.', handlerName, { err }) 196 errorHandlers[job.name](job, err)
182 }) 197 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
198 }
199 })
183 200
184 this.queues[handlerName] = queue 201 worker.on('error', err => {
202 logger.error('Error in job queue %s.', handlerName, { err })
203 })
204
205 this.workers[handlerName] = worker
206 }
207
208 private buildQueue (handlerName: JobType) {
209 const queueOptions: QueueOptions = {
210 connection: this.getRedisConnection(),
211 prefix: this.jobRedisPrefix
185 } 212 }
186 213
187 this.addRepeatableJobs() 214 this.queues[handlerName] = new Queue(handlerName, queueOptions)
215 }
216
217 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
218 const queueSchedulerOptions: QueueSchedulerOptions = {
219 autorun: !produceOnly,
220 connection: this.getRedisConnection(),
221 prefix: this.jobRedisPrefix,
222 maxStalledCount: 10
223 }
224 this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
188 } 225 }
189 226
190 terminate () { 227 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
191 for (const queueName of Object.keys(this.queues)) { 228 const queueEventsOptions: QueueEventsOptions = {
192 const queue = this.queues[queueName] 229 autorun: !produceOnly,
193 queue.close() 230 connection: this.getRedisConnection(),
231 prefix: this.jobRedisPrefix
194 } 232 }
233 this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
234 }
235
236 private getRedisConnection () {
237 return {
238 password: CONFIG.REDIS.AUTH,
239 db: CONFIG.REDIS.DB,
240 host: CONFIG.REDIS.HOSTNAME,
241 port: CONFIG.REDIS.PORT,
242 path: CONFIG.REDIS.SOCKET
243 }
244 }
245
246 async terminate () {
247 const promises = Object.keys(this.workers)
248 .map(handlerName => {
249 const worker: Worker = this.workers[handlerName]
250 const queue: Queue = this.queues[handlerName]
251 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
252 const queueEvent: QueueEvents = this.queueEvents[handlerName]
253
254 return Promise.all([
255 worker.close(false),
256 queue.close(),
257 queueScheduler.close(),
258 queueEvent.close()
259 ])
260 })
261
262 return Promise.all(promises)
195 } 263 }
196 264
197 async pause () { 265 async pause () {
198 for (const handler of Object.keys(this.queues)) { 266 for (const handler of Object.keys(this.workers)) {
199 await this.queues[handler].pause(true) 267 const worker: Worker = this.workers[handler]
268
269 await worker.pause()
200 } 270 }
201 } 271 }
202 272
203 async resume () { 273 resume () {
204 for (const handler of Object.keys(this.queues)) { 274 for (const handler of Object.keys(this.workers)) {
205 await this.queues[handler].resume(true) 275 const worker: Worker = this.workers[handler]
276
277 worker.resume()
206 } 278 }
207 } 279 }
208 280
@@ -211,22 +283,21 @@ class JobQueue {
211 .catch(err => logger.error('Cannot create job.', { err, obj })) 283 .catch(err => logger.error('Cannot create job.', { err, obj }))
212 } 284 }
213 285
214 createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
215 const queue: Queue = this.queues[obj.type] 287 const queue: Queue = this.queues[obj.type]
216 if (queue === undefined) { 288 if (queue === undefined) {
217 logger.error('Unknown queue %s: cannot create job.', obj.type) 289 logger.error('Unknown queue %s: cannot create job.', obj.type)
218 return 290 return
219 } 291 }
220 292
221 const jobArgs: JobOptions = { 293 const jobArgs: JobsOptions = {
222 backoff: { delay: 60 * 1000, type: 'exponential' }, 294 backoff: { delay: 60 * 1000, type: 'exponential' },
223 attempts: JOB_ATTEMPTS[obj.type], 295 attempts: JOB_ATTEMPTS[obj.type],
224 timeout: JOB_TTL[obj.type],
225 priority: options.priority, 296 priority: options.priority,
226 delay: options.delay 297 delay: options.delay
227 } 298 }
228 299
229 return queue.add(obj.payload, jobArgs) 300 return queue.add('job', obj.payload, jobArgs)
230 } 301 }
231 302
232 async listForApi (options: { 303 async listForApi (options: {
@@ -244,7 +315,8 @@ class JobQueue {
244 const filteredJobTypes = this.filterJobTypes(jobType) 315 const filteredJobTypes = this.filterJobTypes(jobType)
245 316
246 for (const jobType of filteredJobTypes) { 317 for (const jobType of filteredJobTypes) {
247 const queue = this.queues[jobType] 318 const queue: Queue = this.queues[jobType]
319
248 if (queue === undefined) { 320 if (queue === undefined) {
249 logger.error('Unknown queue %s to list jobs.', jobType) 321 logger.error('Unknown queue %s to list jobs.', jobType)
250 continue 322 continue
@@ -297,18 +369,22 @@ class JobQueue {
297 369
298 async removeOldJobs () { 370 async removeOldJobs () {
299 for (const key of Object.keys(this.queues)) { 371 for (const key of Object.keys(this.queues)) {
300 const queue = this.queues[key] 372 const queue: Queue = this.queues[key]
301 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 373 await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
302 } 374 }
303 } 375 }
304 376
377 waitJob (job: Job) {
378 return job.waitUntilFinished(this.queueEvents[job.queueName])
379 }
380
305 private addRepeatableJobs () { 381 private addRepeatableJobs () {
306 this.queues['videos-views-stats'].add({}, { 382 this.queues['videos-views-stats'].add('job', {}, {
307 repeat: REPEAT_JOBS['videos-views-stats'] 383 repeat: REPEAT_JOBS['videos-views-stats']
308 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 384 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
309 385
310 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 386 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
311 this.queues['activitypub-cleaner'].add({}, { 387 this.queues['activitypub-cleaner'].add('job', {}, {
312 repeat: REPEAT_JOBS['activitypub-cleaner'] 388 repeat: REPEAT_JOBS['activitypub-cleaner']
313 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 389 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
314 } 390 }