diff options
Diffstat (limited to 'server')
25 files changed, 166 insertions, 89 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index c61b7362f..6a53e3083 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | import { Job as BullJob } from 'bullmq' | ||
1 | import express from 'express' | 2 | import express from 'express' |
2 | import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' | 3 | import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@shared/models' |
3 | import { isArray } from '../../helpers/custom-validators/misc' | 4 | import { isArray } from '../../helpers/custom-validators/misc' |
@@ -25,7 +26,7 @@ jobsRouter.post('/pause', | |||
25 | jobsRouter.post('/resume', | 26 | jobsRouter.post('/resume', |
26 | authenticate, | 27 | authenticate, |
27 | ensureUserHasRight(UserRight.MANAGE_JOBS), | 28 | ensureUserHasRight(UserRight.MANAGE_JOBS), |
28 | asyncMiddleware(resumeJobQueue) | 29 | resumeJobQueue |
29 | ) | 30 | ) |
30 | 31 | ||
31 | jobsRouter.get('/:state?', | 32 | jobsRouter.get('/:state?', |
@@ -54,8 +55,8 @@ async function pauseJobQueue (req: express.Request, res: express.Response) { | |||
54 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 55 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
55 | } | 56 | } |
56 | 57 | ||
57 | async function resumeJobQueue (req: express.Request, res: express.Response) { | 58 | function resumeJobQueue (req: express.Request, res: express.Response) { |
58 | await JobQueue.Instance.resume() | 59 | JobQueue.Instance.resume() |
59 | 60 | ||
60 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 61 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
61 | } | 62 | } |
@@ -82,7 +83,7 @@ async function listJobs (req: express.Request, res: express.Response) { | |||
82 | return res.json(result) | 83 | return res.json(result) |
83 | } | 84 | } |
84 | 85 | ||
85 | async function formatJob (job: any, state?: JobState): Promise<Job> { | 86 | async function formatJob (job: BullJob, state?: JobState): Promise<Job> { |
86 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 | 87 | const error = isArray(job.stacktrace) && job.stacktrace.length !== 0 |
87 | ? job.stacktrace[0] | 88 | ? job.stacktrace[0] |
88 | : null | 89 | : null |
@@ -90,9 +91,9 @@ async function formatJob (job: any, state?: JobState): Promise<Job> { | |||
90 | return { | 91 | return { |
91 | id: job.id, | 92 | id: job.id, |
92 | state: state || await job.getState(), | 93 | state: state || await job.getState(), |
93 | type: job.queue.name as JobType, | 94 | type: job.queueName as JobType, |
94 | data: job.data, | 95 | data: job.data, |
95 | progress: await job.progress(), | 96 | progress: job.progress as number, |
96 | priority: job.opts.priority, | 97 | priority: job.opts.priority, |
97 | error, | 98 | error, |
98 | createdAt: new Date(job.timestamp), | 99 | createdAt: new Date(job.timestamp), |
diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 65a7321fd..1545a2232 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts | |||
@@ -199,7 +199,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo | |||
199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } | 199 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } |
200 | 200 | ||
201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 201 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
202 | await job.finished() | 202 | await JobQueue.Instance.waitJob(job) |
203 | } | 203 | } |
204 | 204 | ||
205 | const hls = video.getHLSPlaylist() | 205 | const hls = video.getHLSPlaylist() |
@@ -208,7 +208,7 @@ async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfo | |||
208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } | 208 | const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } |
209 | 209 | ||
210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 210 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
211 | await job.finished() | 211 | await JobQueue.Instance.waitJob(job) |
212 | } | 212 | } |
213 | 213 | ||
214 | // Refresh video since files have changed | 214 | // Refresh video since files have changed |
diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 3ce66c9ca..4a9d7b619 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts | |||
@@ -17,6 +17,7 @@ import { | |||
17 | import { VideoPathManager } from '@server/lib/video-path-manager' | 17 | import { VideoPathManager } from '@server/lib/video-path-manager' |
18 | import { buildNextVideoState } from '@server/lib/video-state' | 18 | import { buildNextVideoState } from '@server/lib/video-state' |
19 | import { openapiOperationDoc } from '@server/middlewares/doc' | 19 | import { openapiOperationDoc } from '@server/middlewares/doc' |
20 | import { VideoSourceModel } from '@server/models/video/video-source' | ||
20 | import { MVideoFile, MVideoFullLight } from '@server/types/models' | 21 | import { MVideoFile, MVideoFullLight } from '@server/types/models' |
21 | import { getLowercaseExtension } from '@shared/core-utils' | 22 | import { getLowercaseExtension } from '@shared/core-utils' |
22 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' | 23 | import { isAudioFile, uuidToShort } from '@shared/extra-utils' |
@@ -44,7 +45,6 @@ import { | |||
44 | import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' | 45 | import { ScheduleVideoUpdateModel } from '../../../models/video/schedule-video-update' |
45 | import { VideoModel } from '../../../models/video/video' | 46 | import { VideoModel } from '../../../models/video/video' |
46 | import { VideoFileModel } from '../../../models/video/video-file' | 47 | import { VideoFileModel } from '../../../models/video/video-file' |
47 | import { VideoSourceModel } from '@server/models/video/video-source' | ||
48 | 48 | ||
49 | const lTags = loggerTagsFactory('api', 'video') | 49 | const lTags = loggerTagsFactory('api', 'video') |
50 | const auditLogger = auditLoggerFactory('videos') | 50 | const auditLogger = auditLoggerFactory('videos') |
@@ -270,7 +270,7 @@ async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoF | |||
270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } | 270 | const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } |
271 | 271 | ||
272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) | 272 | const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) |
273 | await job.finished() | 273 | await JobQueue.Instance.waitJob(job) |
274 | 274 | ||
275 | const refreshedVideo = await VideoModel.loadFull(video.id) | 275 | const refreshedVideo = await VideoModel.loadFull(video.id) |
276 | if (!refreshedVideo) return | 276 | if (!refreshedVideo) return |
diff --git a/server/helpers/custom-validators/jobs.ts b/server/helpers/custom-validators/jobs.ts index f6777ecd5..c168b3e91 100644 --- a/server/helpers/custom-validators/jobs.ts +++ b/server/helpers/custom-validators/jobs.ts | |||
@@ -2,7 +2,7 @@ import { JobState } from '../../../shared/models' | |||
2 | import { exists } from './misc' | 2 | import { exists } from './misc' |
3 | import { jobTypes } from '@server/lib/job-queue/job-queue' | 3 | import { jobTypes } from '@server/lib/job-queue/job-queue' |
4 | 4 | ||
5 | const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused' ] | 5 | const jobStates: JobState[] = [ 'active', 'completed', 'failed', 'waiting', 'delayed', 'paused', 'waiting-children' ] |
6 | 6 | ||
7 | function isValidJobState (value: JobState) { | 7 | function isValidJobState (value: JobState) { |
8 | return exists(value) && jobStates.includes(value) | 8 | return exists(value) && jobStates.includes(value) |
diff --git a/server/helpers/ffmpeg/ffmpeg-commons.ts b/server/helpers/ffmpeg/ffmpeg-commons.ts index ee338889c..b01989899 100644 --- a/server/helpers/ffmpeg/ffmpeg-commons.ts +++ b/server/helpers/ffmpeg/ffmpeg-commons.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg' | 2 | import ffmpeg, { FfmpegCommand } from 'fluent-ffmpeg' |
3 | import { execPromise } from '@server/helpers/core-utils' | 3 | import { execPromise } from '@server/helpers/core-utils' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
@@ -81,7 +81,7 @@ async function runCommand (options: { | |||
81 | command.on('progress', progress => { | 81 | command.on('progress', progress => { |
82 | if (!progress.percent) return | 82 | if (!progress.percent) return |
83 | 83 | ||
84 | job.progress(Math.round(progress.percent)) | 84 | job.updateProgress(Math.round(progress.percent)) |
85 | .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() })) | 85 | .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err, ...lTags() })) |
86 | }) | 86 | }) |
87 | } | 87 | } |
diff --git a/server/helpers/ffmpeg/ffmpeg-vod.ts b/server/helpers/ffmpeg/ffmpeg-vod.ts index f84157e0f..7a81a1313 100644 --- a/server/helpers/ffmpeg/ffmpeg-vod.ts +++ b/server/helpers/ffmpeg/ffmpeg-vod.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { FfmpegCommand } from 'fluent-ffmpeg' | 2 | import { FfmpegCommand } from 'fluent-ffmpeg' |
3 | import { readFile, writeFile } from 'fs-extra' | 3 | import { readFile, writeFile } from 'fs-extra' |
4 | import { dirname } from 'path' | 4 | import { dirname } from 'path' |
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 8165a289d..db43c59be 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { CronRepeatOptions, EveryRepeatOptions } from 'bull' | 1 | import { RepeatOptions } from 'bullmq' |
2 | import { randomBytes } from 'crypto' | 2 | import { randomBytes } from 'crypto' |
3 | import { invert } from 'lodash' | 3 | import { invert } from 'lodash' |
4 | import { join } from 'path' | 4 | import { join } from 'path' |
@@ -197,7 +197,7 @@ const JOB_TTL: { [id in JobType]: number } = { | |||
197 | 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours | 197 | 'manage-video-torrent': 1000 * 3600 * 3, // 3 hours |
198 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours | 198 | 'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours |
199 | } | 199 | } |
200 | const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = { | 200 | const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = { |
201 | 'videos-views-stats': { | 201 | 'videos-views-stats': { |
202 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour | 202 | cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour |
203 | }, | 203 | }, |
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index 3d7dc6fb9..84c0a2de2 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { map } from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import { Job } from 'bull' | 2 | import { Job } from 'bullmq' |
3 | import { | 3 | import { |
4 | isAnnounceActivityValid, | 4 | isAnnounceActivityValid, |
5 | isDislikeActivityValid, | 5 | isDislikeActivityValid, |
diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 2ee98171c..944da5be1 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' | 2 | import { getLocalActorFollowActivityPubUrl } from '@server/lib/activitypub/url' |
3 | import { ActivitypubFollowPayload } from '@shared/models' | 3 | import { ActivitypubFollowPayload } from '@shared/models' |
4 | import { sanitizeHost } from '../../../helpers/core-utils' | 4 | import { sanitizeHost } from '../../../helpers/core-utils' |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 709e8501f..354c608fb 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -1,5 +1,5 @@ | |||
1 | import { map } from 'bluebird' | 1 | import { map } from 'bluebird' |
2 | import { Job } from 'bull' | 2 | import { Job } from 'bullmq' |
3 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' | 3 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' |
4 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' | 4 | import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' |
5 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' | 5 | import { ActivitypubHttpBroadcastPayload } from '@shared/models' |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index de533de6c..e0b841887 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' | 2 | import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { VideoModel } from '../../../models/video/video' | 4 | import { VideoModel } from '../../../models/video/video' |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 99bcd3e8d..837a597a5 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' | 2 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from '@server/lib/activitypub/send' |
3 | import { ActivitypubHttpUnicastPayload } from '@shared/models' | 3 | import { ActivitypubHttpUnicastPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 4 | import { logger } from '../../../helpers/logger' |
diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 92ceed180..600f858a0 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' | 2 | import { refreshVideoPlaylistIfNeeded } from '@server/lib/activitypub/playlists' |
3 | import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' | 3 | import { refreshVideoIfNeeded } from '@server/lib/activitypub/videos' |
4 | import { loadVideoByUrl } from '@server/lib/model-loaders' | 4 | import { loadVideoByUrl } from '@server/lib/model-loaders' |
diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 9d5a65376..4a5bad9fb 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' | 2 | import { generateAndSaveActorKeys } from '@server/lib/activitypub/actors' |
3 | import { ActorModel } from '@server/models/actor/actor' | 3 | import { ActorModel } from '@server/models/actor/actor' |
4 | import { ActorKeysPayload } from '@shared/models' | 4 | import { ActorKeysPayload } from '@shared/models' |
diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index 6fc1caa84..b5b9475b1 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { EmailPayload } from '@shared/models' | 2 | import { EmailPayload } from '@shared/models' |
3 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
4 | import { Emailer } from '../../emailer' | 4 | import { Emailer } from '../../emailer' |
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index dfd4e6140..4505ca79e 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' | 2 | import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' |
3 | import { VideoModel } from '@server/models/video/video' | 3 | import { VideoModel } from '@server/models/video/video' |
4 | import { VideoFileModel } from '@server/models/video/video-file' | 4 | import { VideoFileModel } from '@server/models/video/video-file' |
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 49064052c..d608fd865 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { remove } from 'fs-extra' | 2 | import { remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 71c5444af..40c44cf52 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { copy, stat } from 'fs-extra' | 2 | import { copy, stat } from 'fs-extra' |
3 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | 3 | import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' |
4 | import { CONFIG } from '@server/initializers/config' | 4 | import { CONFIG } from '@server/initializers/config' |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4cde26aef..e5cd35865 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { move, remove, stat } from 'fs-extra' | 2 | import { move, remove, stat } from 'fs-extra' |
3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | 3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' |
4 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' | 4 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' |
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 78d0b2192..79002258c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { readdir, remove } from 'fs-extra' | 2 | import { readdir, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' | 4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamDuration } from '@server/helpers/ffmpeg' |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 9cb7a6589..75ab2cd02 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | 2 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' |
3 | import { VideoRedundancyPayload } from '@shared/models' | 3 | import { VideoRedundancyPayload } from '@shared/models' |
4 | import { logger } from '../../../helpers/logger' | 4 | import { logger } from '../../../helpers/logger' |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 735150d57..078243538 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { move, remove } from 'fs-extra' | 2 | import { move, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' | 4 | import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 4e5e97919..8dbae8c42 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { Hooks } from '@server/lib/plugins/hooks' |
4 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0ae325f4d..0cf5d53ce 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -1,7 +1,19 @@ | |||
1 | import Bull, { Job, JobOptions, Queue } from 'bull' | 1 | import { |
2 | Job, | ||
3 | JobsOptions, | ||
4 | Queue, | ||
5 | QueueEvents, | ||
6 | QueueEventsOptions, | ||
7 | QueueOptions, | ||
8 | QueueScheduler, | ||
9 | QueueSchedulerOptions, | ||
10 | Worker, | ||
11 | WorkerOptions | ||
12 | } from 'bullmq' | ||
2 | import { jobStates } from '@server/helpers/custom-validators/jobs' | 13 | import { jobStates } from '@server/helpers/custom-validators/jobs' |
3 | import { CONFIG } from '@server/initializers/config' | 14 | import { CONFIG } from '@server/initializers/config' |
4 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 15 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
16 | import { timeoutPromise } from '@shared/core-utils' | ||
5 | import { | 17 | import { |
6 | ActivitypubFollowPayload, | 18 | ActivitypubFollowPayload, |
7 | ActivitypubHttpBroadcastPayload, | 19 | ActivitypubHttpBroadcastPayload, |
@@ -120,7 +132,11 @@ class JobQueue { | |||
120 | 132 | ||
121 | private static instance: JobQueue | 133 | private static instance: JobQueue |
122 | 134 | ||
135 | private workers: { [id in JobType]?: Worker } = {} | ||
123 | private queues: { [id in JobType]?: Queue } = {} | 136 | private queues: { [id in JobType]?: Queue } = {} |
137 | private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} | ||
138 | private queueEvents: { [id in JobType]?: QueueEvents } = {} | ||
139 | |||
124 | private initialized = false | 140 | private initialized = false |
125 | private jobRedisPrefix: string | 141 | private jobRedisPrefix: string |
126 | 142 | ||
@@ -134,75 +150,131 @@ class JobQueue { | |||
134 | 150 | ||
135 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST | 151 | this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST |
136 | 152 | ||
137 | const queueOptions: Bull.QueueOptions = { | 153 | for (const handlerName of (Object.keys(handlers) as JobType[])) { |
154 | this.buildWorker(handlerName, produceOnly) | ||
155 | this.buildQueue(handlerName) | ||
156 | this.buildQueueScheduler(handlerName, produceOnly) | ||
157 | this.buildQueueEvent(handlerName, produceOnly) | ||
158 | } | ||
159 | |||
160 | this.addRepeatableJobs() | ||
161 | } | ||
162 | |||
163 | private buildWorker (handlerName: JobType, produceOnly: boolean) { | ||
164 | const workerOptions: WorkerOptions = { | ||
165 | autorun: !produceOnly, | ||
166 | concurrency: this.getJobConcurrency(handlerName), | ||
138 | prefix: this.jobRedisPrefix, | 167 | prefix: this.jobRedisPrefix, |
139 | redis: { | 168 | connection: this.getRedisConnection() |
140 | password: CONFIG.REDIS.AUTH, | ||
141 | db: CONFIG.REDIS.DB, | ||
142 | host: CONFIG.REDIS.HOSTNAME, | ||
143 | port: CONFIG.REDIS.PORT, | ||
144 | path: CONFIG.REDIS.SOCKET | ||
145 | }, | ||
146 | settings: { | ||
147 | maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts | ||
148 | } | ||
149 | } | 169 | } |
150 | 170 | ||
151 | for (const handlerName of (Object.keys(handlers) as JobType[])) { | 171 | const handler = function (job: Job) { |
152 | const queue = new Bull(handlerName, queueOptions) | 172 | const timeout = JOB_TTL[handlerName] |
173 | const p = handlers[handlerName](job) | ||
153 | 174 | ||
154 | if (produceOnly) { | 175 | if (!timeout) return p |
155 | queue.pause(true) | ||
156 | .catch(err => logger.error('Cannot pause queue %s in produced only job queue', handlerName, { err })) | ||
157 | } | ||
158 | 176 | ||
159 | const handler = handlers[handlerName] | 177 | return timeoutPromise(p, timeout) |
178 | } | ||
160 | 179 | ||
161 | queue.process(this.getJobConcurrency(handlerName), async (jobArg: Job<any>) => { | 180 | const processor = async (jobArg: Job<any>) => { |
162 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) | 181 | const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName }) |
163 | 182 | ||
164 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') | 183 | return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result') |
165 | }).catch(err => logger.error('Error in job queue processor %s.', handlerName, { err })) | 184 | } |
166 | 185 | ||
167 | queue.on('failed', (job, err) => { | 186 | const worker = new Worker(handlerName, processor, workerOptions) |
168 | const logLevel = silentFailure.has(handlerName) | ||
169 | ? 'debug' | ||
170 | : 'error' | ||
171 | 187 | ||
172 | logger.log(logLevel, 'Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err }) | 188 | worker.on('failed', (job, err) => { |
189 | const logLevel = silentFailure.has(handlerName) | ||
190 | ? 'debug' | ||
191 | : 'error' | ||
173 | 192 | ||
174 | if (errorHandlers[job.name]) { | 193 | logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err }) |
175 | errorHandlers[job.name](job, err) | ||
176 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) | ||
177 | } | ||
178 | }) | ||
179 | 194 | ||
180 | queue.on('error', err => { | 195 | if (errorHandlers[job.name]) { |
181 | logger.error('Error in job queue %s.', handlerName, { err }) | 196 | errorHandlers[job.name](job, err) |
182 | }) | 197 | .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err })) |
198 | } | ||
199 | }) | ||
183 | 200 | ||
184 | this.queues[handlerName] = queue | 201 | worker.on('error', err => { |
202 | logger.error('Error in job queue %s.', handlerName, { err }) | ||
203 | }) | ||
204 | |||
205 | this.workers[handlerName] = worker | ||
206 | } | ||
207 | |||
208 | private buildQueue (handlerName: JobType) { | ||
209 | const queueOptions: QueueOptions = { | ||
210 | connection: this.getRedisConnection(), | ||
211 | prefix: this.jobRedisPrefix | ||
185 | } | 212 | } |
186 | 213 | ||
187 | this.addRepeatableJobs() | 214 | this.queues[handlerName] = new Queue(handlerName, queueOptions) |
215 | } | ||
216 | |||
217 | private buildQueueScheduler (handlerName: JobType, produceOnly: boolean) { | ||
218 | const queueSchedulerOptions: QueueSchedulerOptions = { | ||
219 | autorun: !produceOnly, | ||
220 | connection: this.getRedisConnection(), | ||
221 | prefix: this.jobRedisPrefix, | ||
222 | maxStalledCount: 10 | ||
223 | } | ||
224 | this.queueSchedulers[handlerName] = new QueueScheduler(handlerName, queueSchedulerOptions) | ||
188 | } | 225 | } |
189 | 226 | ||
190 | terminate () { | 227 | private buildQueueEvent (handlerName: JobType, produceOnly: boolean) { |
191 | for (const queueName of Object.keys(this.queues)) { | 228 | const queueEventsOptions: QueueEventsOptions = { |
192 | const queue = this.queues[queueName] | 229 | autorun: !produceOnly, |
193 | queue.close() | 230 | connection: this.getRedisConnection(), |
231 | prefix: this.jobRedisPrefix | ||
194 | } | 232 | } |
233 | this.queueEvents[handlerName] = new QueueEvents(handlerName, queueEventsOptions) | ||
234 | } | ||
235 | |||
236 | private getRedisConnection () { | ||
237 | return { | ||
238 | password: CONFIG.REDIS.AUTH, | ||
239 | db: CONFIG.REDIS.DB, | ||
240 | host: CONFIG.REDIS.HOSTNAME, | ||
241 | port: CONFIG.REDIS.PORT, | ||
242 | path: CONFIG.REDIS.SOCKET | ||
243 | } | ||
244 | } | ||
245 | |||
246 | async terminate () { | ||
247 | const promises = Object.keys(this.workers) | ||
248 | .map(handlerName => { | ||
249 | const worker: Worker = this.workers[handlerName] | ||
250 | const queue: Queue = this.queues[handlerName] | ||
251 | const queueScheduler: QueueScheduler = this.queueSchedulers[handlerName] | ||
252 | const queueEvent: QueueEvents = this.queueEvents[handlerName] | ||
253 | |||
254 | return Promise.all([ | ||
255 | worker.close(false), | ||
256 | queue.close(), | ||
257 | queueScheduler.close(), | ||
258 | queueEvent.close() | ||
259 | ]) | ||
260 | }) | ||
261 | |||
262 | return Promise.all(promises) | ||
195 | } | 263 | } |
196 | 264 | ||
197 | async pause () { | 265 | async pause () { |
198 | for (const handler of Object.keys(this.queues)) { | 266 | for (const handler of Object.keys(this.workers)) { |
199 | await this.queues[handler].pause(true) | 267 | const worker: Worker = this.workers[handler] |
268 | |||
269 | await worker.pause() | ||
200 | } | 270 | } |
201 | } | 271 | } |
202 | 272 | ||
203 | async resume () { | 273 | resume () { |
204 | for (const handler of Object.keys(this.queues)) { | 274 | for (const handler of Object.keys(this.workers)) { |
205 | await this.queues[handler].resume(true) | 275 | const worker: Worker = this.workers[handler] |
276 | |||
277 | worker.resume() | ||
206 | } | 278 | } |
207 | } | 279 | } |
208 | 280 | ||
@@ -211,22 +283,21 @@ class JobQueue { | |||
211 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 283 | .catch(err => logger.error('Cannot create job.', { err, obj })) |
212 | } | 284 | } |
213 | 285 | ||
214 | createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { | 286 | async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { |
215 | const queue: Queue = this.queues[obj.type] | 287 | const queue: Queue = this.queues[obj.type] |
216 | if (queue === undefined) { | 288 | if (queue === undefined) { |
217 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 289 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
218 | return | 290 | return |
219 | } | 291 | } |
220 | 292 | ||
221 | const jobArgs: JobOptions = { | 293 | const jobArgs: JobsOptions = { |
222 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 294 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
223 | attempts: JOB_ATTEMPTS[obj.type], | 295 | attempts: JOB_ATTEMPTS[obj.type], |
224 | timeout: JOB_TTL[obj.type], | ||
225 | priority: options.priority, | 296 | priority: options.priority, |
226 | delay: options.delay | 297 | delay: options.delay |
227 | } | 298 | } |
228 | 299 | ||
229 | return queue.add(obj.payload, jobArgs) | 300 | return queue.add('job', obj.payload, jobArgs) |
230 | } | 301 | } |
231 | 302 | ||
232 | async listForApi (options: { | 303 | async listForApi (options: { |
@@ -244,7 +315,8 @@ class JobQueue { | |||
244 | const filteredJobTypes = this.filterJobTypes(jobType) | 315 | const filteredJobTypes = this.filterJobTypes(jobType) |
245 | 316 | ||
246 | for (const jobType of filteredJobTypes) { | 317 | for (const jobType of filteredJobTypes) { |
247 | const queue = this.queues[jobType] | 318 | const queue: Queue = this.queues[jobType] |
319 | |||
248 | if (queue === undefined) { | 320 | if (queue === undefined) { |
249 | logger.error('Unknown queue %s to list jobs.', jobType) | 321 | logger.error('Unknown queue %s to list jobs.', jobType) |
250 | continue | 322 | continue |
@@ -297,18 +369,22 @@ class JobQueue { | |||
297 | 369 | ||
298 | async removeOldJobs () { | 370 | async removeOldJobs () { |
299 | for (const key of Object.keys(this.queues)) { | 371 | for (const key of Object.keys(this.queues)) { |
300 | const queue = this.queues[key] | 372 | const queue: Queue = this.queues[key] |
301 | await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') | 373 | await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') |
302 | } | 374 | } |
303 | } | 375 | } |
304 | 376 | ||
377 | waitJob (job: Job) { | ||
378 | return job.waitUntilFinished(this.queueEvents[job.queueName]) | ||
379 | } | ||
380 | |||
305 | private addRepeatableJobs () { | 381 | private addRepeatableJobs () { |
306 | this.queues['videos-views-stats'].add({}, { | 382 | this.queues['videos-views-stats'].add('job', {}, { |
307 | repeat: REPEAT_JOBS['videos-views-stats'] | 383 | repeat: REPEAT_JOBS['videos-views-stats'] |
308 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 384 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
309 | 385 | ||
310 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 386 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
311 | this.queues['activitypub-cleaner'].add({}, { | 387 | this.queues['activitypub-cleaner'].add('job', {}, { |
312 | repeat: REPEAT_JOBS['activitypub-cleaner'] | 388 | repeat: REPEAT_JOBS['activitypub-cleaner'] |
313 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 389 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
314 | } | 390 | } |
diff --git a/server/lib/transcoding/transcoding.ts b/server/lib/transcoding/transcoding.ts index 070c7ebda..07eee4122 100644 --- a/server/lib/transcoding/transcoding.ts +++ b/server/lib/transcoding/transcoding.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { Job } from 'bull' | 1 | import { Job } from 'bullmq' |
2 | import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' | 2 | import { copyFile, ensureDir, move, remove, stat } from 'fs-extra' |
3 | import { basename, extname as extnameUtil, join } from 'path' | 3 | import { basename, extname as extnameUtil, join } from 'path' |
4 | import { toEven } from '@server/helpers/core-utils' | 4 | import { toEven } from '@server/helpers/core-utils' |