aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-08-08 10:42:08 +0200
committerChocobozzz <me@florianbigard.com>2022-08-09 09:18:07 +0200
commit5a921e7b74910414626bfc9672b857e987e3ebed (patch)
treef627e2ccc11c55bcba9e630951e72c5f94864c12 /server
parent5e2afe4290103bf0d54ae7b3e62781f2a00487c9 (diff)
downloadPeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.gz
PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.tar.zst
PeerTube-5a921e7b74910414626bfc9672b857e987e3ebed.zip
Move to bullmq
Diffstat (limited to 'server')
-rw-r--r--server/controllers/api/jobs.ts13
-rw-r--r--server/controllers/api/videos/update.ts4
-rw-r--r--server/controllers/api/videos/upload.ts4
-rw-r--r--server/helpers/custom-validators/jobs.ts2
-rw-r--r--server/helpers/ffmpeg/ffmpeg-commons.ts4
-rw-r--r--server/helpers/ffmpeg/ffmpeg-vod.ts2
-rw-r--r--server/initializers/constants.ts4
-rw-r--r--server/lib/job-queue/handlers/activitypub-cleaner.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-follow.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts2
-rw-r--r--server/lib/job-queue/handlers/activitypub-refresher.ts2
-rw-r--r--server/lib/job-queue/handlers/actor-keys.ts2
-rw-r--r--server/lib/job-queue/handlers/email.ts2
-rw-r--r--server/lib/job-queue/handlers/manage-video-torrent.ts2
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts2
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts2
-rw-r--r--server/lib/job-queue/handlers/video-redundancy.ts2
-rw-r--r--server/lib/job-queue/handlers/video-studio-edition.ts2
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts2
-rw-r--r--server/lib/job-queue/job-queue.ts188
-rw-r--r--server/lib/transcoding/transcoding.ts2
25 files changed, 166 insertions, 89 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index c61b7362f..6a53e3083 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -1,3 +1,4 @@
1import { Job as BullJob } from 'bullmq'
1import express from 'express' 2import express from 'express'
2import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' 3import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models'
3import { isArray } from '../../helpers/custom-validators/misc' 4import { isArray } from '../../helpers/custom-validators/misc'
@@ -25,7 +26,7 @@ jobsRouter.post('/pause',
25jobsRouter.post('/resume', 26jobsRouter.post('/resume',
26 authenticate, 27 authenticate,
27 ensureUserHasRight(UserRight.MANAGE_JOBS), 28 ensureUserHasRight(UserRight.MANAGE_JOBS),
28 asyncMiddleware(resumeJobQueue) 29 resumeJobQueue
29) 30)
30 31
31jobsRouter.get('/:state?', 32jobsRouter.get('/:state?',
@@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) {
54 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 55 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
55} 56}
56 57
57async function resumeJobQueue (req: express.Request, res: express.Response) { 58function resumeJobQueue (req: express.Request, res: express.Response) {
58 await JobQueue.Instance.resume() 59 JobQueue.Instance.resume()
59 60
60 return res.sendStatus(HttpStatusCode.NO_CONTENT_204) 61 return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
61} 62}
@@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) {
82 return res.json(result) 83 return res.json(result)
83} 84}
84 85
85async function formatJob (job: any, state?: JobState): Promise<Job> { 86async function formatJob (job: BullJob, state?: JobState): Promise<Job> {
86 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 87 const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
87 ? job.stacktrace[0] 88 ? job.stacktrace[0]
88 : null 89 : null
@@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise<Job> {
90 return { 91 return {
91 id: job.id, 92 id: job.id,
92 state: state || await job.getState(), 93 state: state || await job.getState(),
93 type: job.queue.name as JobType, 94 type: job.queueName as JobType,
94 data: job.data, 95 data: job.data,
95 progress: await job.progress(), 96 progress: job.progress as number,
96 priority: job.opts.priority, 97 priority: job.opts.priority,
97 error, 98 error,
98 createdAt: new Date(job.timestamp), 99 createdAt: new Date(job.timestamp),
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts
index 65a7321fd..1545a2232 100644
--- a/server/controllers/api/videos/update.ts
+++ b/server/controllers/api/videos/update.ts
@@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
199 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } 199 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
200 200
201 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 201 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
202 await job.finished() 202 await JobQueue.Instance.waitJob(job)
203 } 203 }
204 204
205 const hls = video.getHLSPlaylist() 205 const hls = video.getHLSPlaylist()
@@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo
208 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } 208 const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
209 209
210 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 210 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
211 await job.finished() 211 await JobQueue.Instance.waitJob(job)
212 } 212 }
213 213
214 // Refresh video since files have changed 214 // Refresh video since files have changed
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts
index 3ce66c9ca..4a9d7b619 100644
--- a/server/controllers/api/videos/upload.ts
+++ b/server/controllers/api/videos/upload.ts
@@ -17,6 +17,7 @@ import {
17import { VideoPathManager } from '@server/lib/video-path-manager' 17import { VideoPathManager } from '@server/lib/video-path-manager'
18import { buildNextVideoState } from '@server/lib/video-state' 18import { buildNextVideoState } from '@server/lib/video-state'
19import { openapiOperationDoc } from '@server/middlewares/doc' 19import { openapiOperationDoc } from '@server/middlewares/doc'
20import { VideoSourceModel } from '@server/models/video/video-source'
20import { MVideoFile, MVideoFullLight } from '@server/types/models' 21import { MVideoFile, MVideoFullLight } from '@server/types/models'
21import { getLowercaseExtension } from '@shared/core-utils' 22import { getLowercaseExtension } from '@shared/core-utils'
22import { isAudioFile, uuidToShort } from '@shared/extra-utils' 23import { isAudioFile, uuidToShort } from '@shared/extra-utils'
@@ -44,7 +45,6 @@ import {
44import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' 45import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update'
45import { VideoModel } from '../../../models/video/video' 46import { VideoModel } from '../../../models/video/video'
46import { VideoFileModel } from '../../../models/video/video-file' 47import { VideoFileModel } from '../../../models/video/video-file'
47import { VideoSourceModel } from '@server/models/video/video-source'
48 48
49const lTags = loggerTagsFactory('api', 'video') 49const lTags = loggerTagsFactory('api', 'video')
50const auditLogger = auditLoggerFactory('videos') 50const auditLogger = auditLoggerFactory('videos')
@@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF
270 const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } 270 const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
271 271
272 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) 272 const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
273 await job.finished() 273 await JobQueue.Instance.waitJob(job)
274 274
275 const refreshedVideo = await VideoModel.loadFull(video.id) 275 const refreshedVideo = await VideoModel.loadFull(video.id)
276 if (!refreshedVideo) return 276 if (!refreshedVideo) return
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts
index f6777ecd5..c168b3e91 100644
--- a/server/helpers/custom-validators/jobs.ts
+++ b/server/helpers/custom-validators/jobs.ts
@@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models'
2import { exists } from './misc' 2import { exists } from './misc'
3import { jobTypes } from '@server/lib/job-queue/job-queue' 3import { jobTypes } from '@server/lib/job-queue/job-queue'
4 4
5const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ] 5const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused', 'waiting-children' ]
6 6
7function isValidJobState (value: JobState) { 7function isValidJobState (value: JobState) {
8 return exists(value) && jobStates.includes(value) 8 return exists(value) && jobStates.includes(value)
diff --git a/server/helpers/ffmpeg/ffmpeg-commons.ts b/server/helpers/ffmpeg/ffmpeg-commons.ts
index ee338889c..b01989899 100644
--- a/server/helpers/ffmpeg/ffmpeg-commons.ts
+++ b/server/helpers/ffmpeg/ffmpeg-commons.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg' 2import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg'
3import { execPromise } from '@server/helpers/core-utils' 3import { execPromise } from '@server/helpers/core-utils'
4import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
@@ -81,7 +81,7 @@ async function runCommand (options: {
81 command.on('progress', progress => { 81 command.on('progress', progress => {
82 if (!progress.percent) return 82 if (!progress.percent) return
83 83
84 job.progress(Math.round(progress.percent)) 84 job.updateProgress(Math.round(progress.percent))
85 .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() })) 85 .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() }))
86 }) 86 })
87 } 87 }
diff --git a/server/helpers/ffmpeg/ffmpeg-vod.ts b/server/helpers/ffmpeg/ffmpeg-vod.ts
index f84157e0f..7a81a1313 100644
--- a/server/helpers/ffmpeg/ffmpeg-vod.ts
+++ b/server/helpers/ffmpeg/ffmpeg-vod.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { FfmpegCommand } from 'fluent-ffmpeg' 2import { FfmpegCommand } from 'fluent-ffmpeg'
3import { readFile, writeFile } from 'fs-extra' 3import { readFile, writeFile } from 'fs-extra'
4import { dirname } from 'path' 4import { dirname } from 'path'
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index 8165a289d..db43c59be 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -1,4 +1,4 @@
1import { CronRepeatOptions, EveryRepeatOptions } from 'bull' 1import { RepeatOptions } from 'bullmq'
2import { randomBytes } from 'crypto' 2import { randomBytes } from 'crypto'
3import { invert } from 'lodash' 3import { invert } from 'lodash'
4import { join } from 'path' 4import { join } from 'path'
@@ -197,7 +197,7 @@ const JOB_TTL: { [id in JobType]: number } = {
197 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours 197 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
198 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours 198 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
199} 199}
200const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { 200const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
201 'videos-views-stats': { 201 'videos-views-stats': {
202 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour 202 cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
203 }, 203 },
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
index 3d7dc6fb9..84c0a2de2 100644
--- a/server/lib/job-queue/handlers/activitypub-cleaner.ts
+++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts
@@ -1,5 +1,5 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bullmq'
3import { 3import {
4 isAnnounceActivityValid, 4 isAnnounceActivityValid,
5 isDislikeActivityValid, 5 isDislikeActivityValid,
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts
index 2ee98171c..944da5be1 100644
--- a/server/lib/job-queue/handlers/activitypub-follow.ts
+++ b/server/lib/job-queue/handlers/activitypub-follow.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' 2import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url'
3import { ActivitypubFollowPayload } from '@shared/models' 3import { ActivitypubFollowPayload } from '@shared/models'
4import { sanitizeHost } from '../../../helpers/core-utils' 4import { sanitizeHost } from '../../../helpers/core-utils'
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index 709e8501f..354c608fb 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -1,5 +1,5 @@
1import { map } from 'bluebird' 1import { map } from 'bluebird'
2import { Job } from 'bull' 2import { Job } from 'bullmq'
3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 3import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' 4import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
5import { ActivitypubHttpBroadcastPayload } from '@shared/models' 5import { ActivitypubHttpBroadcastPayload } from '@shared/models'
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index de533de6c..e0b841887 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' 2import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video' 4import { VideoModel } from '../../../models/video/video'
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index 99bcd3e8d..837a597a5 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' 2import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send'
3import { ActivitypubHttpUnicastPayload } from '@shared/models' 3import { ActivitypubHttpUnicastPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts
index 92ceed180..600f858a0 100644
--- a/server/lib/job-queue/handlers/activitypub-refresher.ts
+++ b/server/lib/job-queue/handlers/activitypub-refresher.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' 2import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists'
3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' 3import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos'
4import { loadVideoByUrl } from '@server/lib/model-loaders' 4import { loadVideoByUrl } from '@server/lib/model-loaders'
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts
index 9d5a65376..4a5bad9fb 100644
--- a/server/lib/job-queue/handlers/actor-keys.ts
+++ b/server/lib/job-queue/handlers/actor-keys.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' 2import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors'
3import { ActorModel } from '@server/models/actor/actor' 3import { ActorModel } from '@server/models/actor/actor'
4import { ActorKeysPayload } from '@shared/models' 4import { ActorKeysPayload } from '@shared/models'
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts
index 6fc1caa84..b5b9475b1 100644
--- a/server/lib/job-queue/handlers/email.ts
+++ b/server/lib/job-queue/handlers/email.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { EmailPayload } from '@shared/models' 2import { EmailPayload } from '@shared/models'
3import { logger } from '../../../helpers/logger' 3import { logger } from '../../../helpers/logger'
4import { Emailer } from '../../emailer' 4import { Emailer } from '../../emailer'
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
index dfd4e6140..4505ca79e 100644
--- a/server/lib/job-queue/handlers/manage-video-torrent.ts
+++ b/server/lib/job-queue/handlers/manage-video-torrent.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' 2import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
3import { VideoModel } from '@server/models/video/video' 3import { VideoModel } from '@server/models/video/video'
4import { VideoFileModel } from '@server/models/video/video-file' 4import { VideoFileModel } from '@server/models/video/video-file'
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 49064052c..d608fd865 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { remove } from 'fs-extra' 2import { remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { logger, loggerTagsFactory } from '@server/helpers/logger' 4import { logger, loggerTagsFactory } from '@server/helpers/logger'
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 71c5444af..40c44cf52 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { copy, stat } from 'fs-extra' 2import { copy, stat } from 'fs-extra'
3import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' 3import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
4import { CONFIG } from '@server/initializers/config' 4import { CONFIG } from '@server/initializers/config'
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 4cde26aef..e5cd35865 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { retryTransactionWrapper } from '@server/helpers/database-utils' 3import { retryTransactionWrapper } from '@server/helpers/database-utils'
4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' 4import { YoutubeDLWrapper } from '@server/helpers/youtube-dl'
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 78d0b2192..79002258c 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { readdir, remove } from 'fs-extra' 2import { readdir, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' 4import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg'
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts
index 9cb7a6589..75ab2cd02 100644
--- a/server/lib/job-queue/handlers/video-redundancy.ts
+++ b/server/lib/job-queue/handlers/video-redundancy.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' 2import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler'
3import { VideoRedundancyPayload } from '@shared/models' 3import { VideoRedundancyPayload } from '@shared/models'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts
index 735150d57..078243538 100644
--- a/server/lib/job-queue/handlers/video-studio-edition.ts
+++ b/server/lib/job-queue/handlers/video-studio-edition.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { move, remove } from 'fs-extra' 2import { move, remove } from 'fs-extra'
3import { join } from 'path' 3import { join } from 'path'
4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' 4import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg'
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 4e5e97919..8dbae8c42 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' 2import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
3import { Hooks } from '@server/lib/plugins/hooks' 3import { Hooks } from '@server/lib/plugins/hooks'
4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' 4import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 0ae325f4d..0cf5d53ce 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -1,7 +1,19 @@
1import Bull, { Job, JobOptions, Queue } from 'bull' 1import {
2 Job,
3 JobsOptions,
4 Queue,
5 QueueEvents,
6 QueueEventsOptions,
7 QueueOptions,
8 QueueScheduler,
9 QueueSchedulerOptions,
10 Worker,
11 WorkerOptions
12} from 'bullmq'
2import { jobStates } from '@server/helpers/custom-validators/jobs' 13import { jobStates } from '@server/helpers/custom-validators/jobs'
3import { CONFIG } from '@server/initializers/config' 14import { CONFIG } from '@server/initializers/config'
4import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' 15import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
16import { timeoutPromise } from '@shared/core-utils'
5import { 17import {
6 ActivitypubFollowPayload, 18 ActivitypubFollowPayload,
7 ActivitypubHttpBroadcastPayload, 19 ActivitypubHttpBroadcastPayload,
@@ -120,7 +132,11 @@ class JobQueue {
120 132
121 private static instance: JobQueue 133 private static instance: JobQueue
122 134
135 private workers: { [id in JobType]?: Worker } = {}
123 private queues: { [id in JobType]?: Queue } = {} 136 private queues: { [id in JobType]?: Queue } = {}
137 private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
138 private queueEvents: { [id in JobType]?: QueueEvents } = {}
139
124 private initialized = false 140 private initialized = false
125 private jobRedisPrefix: string 141 private jobRedisPrefix: string
126 142
@@ -134,75 +150,131 @@ class JobQueue {
134 150
135 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST 151 this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
136 152
137 const queueOptions: Bull.QueueOptions = { 153 for (const handlerName of (Object.keys(handlers) as JobType[])) {
154 this.buildWorker(handlerName, produceOnly)
155 this.buildQueue(handlerName)
156 this.buildQueueScheduler(handlerName, produceOnly)
157 this.buildQueueEvent(handlerName, produceOnly)
158 }
159
160 this.addRepeatableJobs()
161 }
162
163 private buildWorker (handlerName: JobType, produceOnly: boolean) {
164 const workerOptions: WorkerOptions = {
165 autorun: !produceOnly,
166 concurrency: this.getJobConcurrency(handlerName),
138 prefix: this.jobRedisPrefix, 167 prefix: this.jobRedisPrefix,
139 redis: { 168 connection: this.getRedisConnection()
140 password: CONFIG.REDIS.AUTH,
141 db: CONFIG.REDIS.DB,
142 host: CONFIG.REDIS.HOSTNAME,
143 port: CONFIG.REDIS.PORT,
144 path: CONFIG.REDIS.SOCKET
145 },
146 settings: {
147 maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
148 }
149 } 169 }
150 170
151 for (const handlerName of (Object.keys(handlers) as JobType[])) { 171 const handler = function (job: Job) {
152 const queue = new Bull(handlerName, queueOptions) 172 const timeout = JOB_TTL[handlerName]
173 const p = handlers[handlerName](job)
153 174
154 if (produceOnly) { 175 if (!timeout) return p
155 queue.pause(true)
156 .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err }))
157 }
158 176
159 const handler = handlers[handlerName] 177 return timeoutPromise(p, timeout)
178 }
160 179
161 queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { 180 const processor = async (jobArg: Job<any>) => {
162 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) 181 const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
163 182
164 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') 183 return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
165 }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) 184 }
166 185
167 queue.on('failed', (job, err) => { 186 const worker = new Worker(handlerName, processor, workerOptions)
168 const logLevel = silentFailure.has(handlerName)
169 ? 'debug'
170 : 'error'
171 187
172 logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) 188 worker.on('failed', (job, err) => {
189 const logLevel = silentFailure.has(handlerName)
190 ? 'debug'
191 : 'error'
173 192
174 if (errorHandlers[job.name]) { 193 logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
175 errorHandlers[job.name](job, err)
176 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
177 }
178 })
179 194
180 queue.on('error', err => { 195 if (errorHandlers[job.name]) {
181 logger.error('Error in job queue %s.', handlerName, { err }) 196 errorHandlers[job.name](job, err)
182 }) 197 .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
198 }
199 })
183 200
184 this.queues[handlerName] = queue 201 worker.on('error', err => {
202 logger.error('Error in job queue %s.', handlerName, { err })
203 })
204
205 this.workers[handlerName] = worker
206 }
207
208 private buildQueue (handlerName: JobType) {
209 const queueOptions: QueueOptions = {
210 connection: this.getRedisConnection(),
211 prefix: this.jobRedisPrefix
185 } 212 }
186 213
187 this.addRepeatableJobs() 214 this.queues[handlerName] = new Queue(handlerName, queueOptions)
215 }
216
217 private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) {
218 const queueSchedulerOptions: QueueSchedulerOptions = {
219 autorun: !produceOnly,
220 connection: this.getRedisConnection(),
221 prefix: this.jobRedisPrefix,
222 maxStalledCount: 10
223 }
224 this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions)
188 } 225 }
189 226
190 terminate () { 227 private buildQueueEvent (handlerName: JobType, produceOnly: boolean) {
191 for (const queueName of Object.keys(this.queues)) { 228 const queueEventsOptions: QueueEventsOptions = {
192 const queue = this.queues[queueName] 229 autorun: !produceOnly,
193 queue.close() 230 connection: this.getRedisConnection(),
231 prefix: this.jobRedisPrefix
194 } 232 }
233 this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions)
234 }
235
236 private getRedisConnection () {
237 return {
238 password: CONFIG.REDIS.AUTH,
239 db: CONFIG.REDIS.DB,
240 host: CONFIG.REDIS.HOSTNAME,
241 port: CONFIG.REDIS.PORT,
242 path: CONFIG.REDIS.SOCKET
243 }
244 }
245
246 async terminate () {
247 const promises = Object.keys(this.workers)
248 .map(handlerName => {
249 const worker: Worker = this.workers[handlerName]
250 const queue: Queue = this.queues[handlerName]
251 const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName]
252 const queueEvent: QueueEvents = this.queueEvents[handlerName]
253
254 return Promise.all([
255 worker.close(false),
256 queue.close(),
257 queueScheduler.close(),
258 queueEvent.close()
259 ])
260 })
261
262 return Promise.all(promises)
195 } 263 }
196 264
197 async pause () { 265 async pause () {
198 for (const handler of Object.keys(this.queues)) { 266 for (const handler of Object.keys(this.workers)) {
199 await this.queues[handler].pause(true) 267 const worker: Worker = this.workers[handler]
268
269 await worker.pause()
200 } 270 }
201 } 271 }
202 272
203 async resume () { 273 resume () {
204 for (const handler of Object.keys(this.queues)) { 274 for (const handler of Object.keys(this.workers)) {
205 await this.queues[handler].resume(true) 275 const worker: Worker = this.workers[handler]
276
277 worker.resume()
206 } 278 }
207 } 279 }
208 280
@@ -211,22 +283,21 @@ class JobQueue {
211 .catch(err => logger.error('Cannot create job.', { err, obj })) 283 .catch(err => logger.error('Cannot create job.', { err, obj }))
212 } 284 }
213 285
214 createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { 286 async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
215 const queue: Queue = this.queues[obj.type] 287 const queue: Queue = this.queues[obj.type]
216 if (queue === undefined) { 288 if (queue === undefined) {
217 logger.error('Unknown queue %s: cannot create job.', obj.type) 289 logger.error('Unknown queue %s: cannot create job.', obj.type)
218 return 290 return
219 } 291 }
220 292
221 const jobArgs: JobOptions = { 293 const jobArgs: JobsOptions = {
222 backoff: { delay: 60 * 1000, type: 'exponential' }, 294 backoff: { delay: 60 * 1000, type: 'exponential' },
223 attempts: JOB_ATTEMPTS[obj.type], 295 attempts: JOB_ATTEMPTS[obj.type],
224 timeout: JOB_TTL[obj.type],
225 priority: options.priority, 296 priority: options.priority,
226 delay: options.delay 297 delay: options.delay
227 } 298 }
228 299
229 return queue.add(obj.payload, jobArgs) 300 return queue.add('job', obj.payload, jobArgs)
230 } 301 }
231 302
232 async listForApi (options: { 303 async listForApi (options: {
@@ -244,7 +315,8 @@ class JobQueue {
244 const filteredJobTypes = this.filterJobTypes(jobType) 315 const filteredJobTypes = this.filterJobTypes(jobType)
245 316
246 for (const jobType of filteredJobTypes) { 317 for (const jobType of filteredJobTypes) {
247 const queue = this.queues[jobType] 318 const queue: Queue = this.queues[jobType]
319
248 if (queue === undefined) { 320 if (queue === undefined) {
249 logger.error('Unknown queue %s to list jobs.', jobType) 321 logger.error('Unknown queue %s to list jobs.', jobType)
250 continue 322 continue
@@ -297,18 +369,22 @@ class JobQueue {
297 369
298 async removeOldJobs () { 370 async removeOldJobs () {
299 for (const key of Object.keys(this.queues)) { 371 for (const key of Object.keys(this.queues)) {
300 const queue = this.queues[key] 372 const queue: Queue = this.queues[key]
301 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 373 await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed')
302 } 374 }
303 } 375 }
304 376
377 waitJob (job: Job) {
378 return job.waitUntilFinished(this.queueEvents[job.queueName])
379 }
380
305 private addRepeatableJobs () { 381 private addRepeatableJobs () {
306 this.queues['videos-views-stats'].add({}, { 382 this.queues['videos-views-stats'].add('job', {}, {
307 repeat: REPEAT_JOBS['videos-views-stats'] 383 repeat: REPEAT_JOBS['videos-views-stats']
308 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 384 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
309 385
310 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 386 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
311 this.queues['activitypub-cleaner'].add({}, { 387 this.queues['activitypub-cleaner'].add('job', {}, {
312 repeat: REPEAT_JOBS['activitypub-cleaner'] 388 repeat: REPEAT_JOBS['activitypub-cleaner']
313 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 389 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
314 } 390 }
diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts
index 070c7ebda..07eee4122 100644
--- a/server/lib/transcoding/transcoding.ts
+++ b/server/lib/transcoding/transcoding.ts
@@ -1,4 +1,4 @@
1import { Job } from 'bull' 1import { Job } from 'bullmq'
2import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' 2import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
3import { basename, extname as extnameUtil, join } from 'path' 3import { basename, extname as extnameUtil, join } from 'path'
4import { toEven } from '@server/helpers/core-utils' 4import { toEven } from '@server/helpers/core-utils'