From bd911b54b555b11df7e9849cf92d358bccfecf6e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Aug 2022 15:48:17 +0200 Subject: Use bullmq job dependency --- scripts/create-import-video-file-job.ts | 2 +- server/controllers/api/accounts.ts | 2 +- server/controllers/api/server/follows.ts | 4 +- server/controllers/api/server/redundancy.ts | 2 +- server/controllers/api/users/my-subscriptions.ts | 2 +- server/controllers/api/video-channel.ts | 4 +- server/controllers/api/videos/import.ts | 4 +- server/controllers/api/videos/index.ts | 2 +- server/controllers/api/videos/studio.ts | 2 +- server/controllers/api/videos/update.ts | 64 +++++++++++------ server/controllers/api/videos/upload.ts | 68 ++++++++---------- server/initializers/constants.ts | 10 ++- server/lib/activitypub/actors/get.ts | 4 +- server/lib/activitypub/follow.ts | 2 +- server/lib/activitypub/outbox.ts | 2 +- server/lib/activitypub/playlists/refresh.ts | 2 +- server/lib/activitypub/send/shared/send-utils.ts | 8 +-- server/lib/activitypub/videos/get.ts | 2 +- .../videos/shared/video-sync-attributes.ts | 2 +- server/lib/emailer.ts | 10 +-- .../lib/job-queue/handlers/activitypub-follow.ts | 2 +- .../handlers/activitypub-http-broadcast.ts | 2 +- .../job-queue/handlers/activitypub-http-fetcher.ts | 2 +- .../job-queue/handlers/activitypub-http-unicast.ts | 2 +- .../job-queue/handlers/activitypub-refresher.ts | 2 +- server/lib/job-queue/handlers/actor-keys.ts | 2 +- server/lib/job-queue/handlers/email.ts | 2 +- server/lib/job-queue/handlers/federate-video.ts | 28 ++++++++ .../lib/job-queue/handlers/manage-video-torrent.ts | 2 +- .../job-queue/handlers/move-to-object-storage.ts | 4 +- server/lib/job-queue/handlers/notify.ts | 27 ++++++++ server/lib/job-queue/handlers/video-file-import.ts | 7 +- server/lib/job-queue/handlers/video-import.ts | 15 ++-- server/lib/job-queue/handlers/video-redundancy.ts | 2 +- .../lib/job-queue/handlers/video-studio-edition.ts | 10 ++- server/lib/job-queue/job-queue.ts | 81 ++++++++++++++++++---- server/lib/live/live-manager.ts | 10 ++- server/lib/notifier/notifier.ts | 2 +- .../lib/schedulers/auto-follow-index-instances.ts | 2 +- server/lib/video-state.ts | 7 +- server/lib/video.ts | 40 ++++++----- shared/models/server/job.model.ts | 17 +++++ 42 files changed, 314 insertions(+), 152 deletions(-) create mode 100644 server/lib/job-queue/handlers/federate-video.ts create mode 100644 server/lib/job-queue/handlers/notify.ts diff --git a/scripts/create-import-video-file-job.ts b/scripts/create-import-video-file-job.ts index 97e9c7933..cf974f240 100644 --- a/scripts/create-import-video-file-job.ts +++ b/scripts/create-import-video-file-job.ts @@ -45,6 +45,6 @@ async function run () { } JobQueue.Instance.init(true) - await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput }) + await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput }) console.log('Import job for video %s created.', video.uuid) } diff --git a/server/controllers/api/accounts.ts b/server/controllers/api/accounts.ts index 8d9f92d93..66cdaab82 100644 --- a/server/controllers/api/accounts.ts +++ b/server/controllers/api/accounts.ts @@ -119,7 +119,7 @@ function getAccount (req: express.Request, res: express.Response) { const account = res.locals.account if (account.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) } return res.json(account.toFormattedJSON()) diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 60d36ed59..87828813a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -138,7 +138,7 @@ async function addFollow (req: express.Request, res: express.Response) { followerActorId: follower.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) } for (const handle of handles) { @@ -150,7 +150,7 @@ async function addFollow (req: express.Request, res: express.Response) { followerActorId: follower.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) } return res.status(HttpStatusCode.NO_CONTENT_204).end() diff --git a/server/controllers/api/server/redundancy.ts b/server/controllers/api/server/redundancy.ts index 9f43d3e4e..94e187cd4 100644 --- a/server/controllers/api/server/redundancy.ts +++ b/server/controllers/api/server/redundancy.ts @@ -85,7 +85,7 @@ async function addVideoRedundancy (req: express.Request, res: express.Response) videoId: res.locals.onlyVideo.id } - await JobQueue.Instance.createJobWithPromise({ + await JobQueue.Instance.createJob({ type: 'video-redundancy', payload }) diff --git a/server/controllers/api/users/my-subscriptions.ts b/server/controllers/api/users/my-subscriptions.ts index fb1f68635..a750f9bd1 100644 --- a/server/controllers/api/users/my-subscriptions.ts +++ b/server/controllers/api/users/my-subscriptions.ts @@ -122,7 +122,7 @@ function addUserSubscription (req: express.Request, res: express.Response) { followerActorId: user.Account.Actor.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) return res.status(HttpStatusCode.NO_CONTENT_204).end() } diff --git a/server/controllers/api/video-channel.ts b/server/controllers/api/video-channel.ts index 411ec8630..6b33e894d 100644 --- a/server/controllers/api/video-channel.ts +++ b/server/controllers/api/video-channel.ts @@ -245,7 +245,7 @@ async function addVideoChannel (req: express.Request, res: express.Response) { }) const payload = { actorId: videoChannelCreated.actorId } - await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload }) + await JobQueue.Instance.createJob({ type: 'actor-keys', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON())) logger.info('Video channel %s created.', videoChannelCreated.Actor.url) @@ -335,7 +335,7 @@ async function getVideoChannel (req: express.Request, res: express.Response) { const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id }) if (videoChannel.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) } return res.json(videoChannel.toFormattedJSON()) diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts index b12953630..5a2e1006a 100644 --- a/server/controllers/api/videos/import.ts +++ b/server/controllers/api/videos/import.ts @@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to videoImportId: videoImport.id, magnetUri } - await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) + await JobQueue.Instance.createJob({ type: 'video-import', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) @@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response) videoImportId: videoImport.id, fileExt } - await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) + await JobQueue.Instance.createJob({ type: 'video-import', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index eca72c397..b301515df 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) { const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) if (video.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) } return res.json(video.toFormattedDetailsJSON()) diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts index bff344f3f..6667532bf 100644 --- a/server/controllers/api/videos/studio.ts +++ b/server/controllers/api/videos/studio.ts @@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response) tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) } - JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) + JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload }) return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 1545a2232..ab1a23d9a 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts @@ -1,7 +1,7 @@ import express from 'express' import { Transaction } from 'sequelize/types' import { changeVideoChannelShare } from '@server/lib/activitypub/share' -import { JobQueue } from '@server/lib/job-queue' +import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' import { openapiOperationDoc } from '@server/middlewares/doc' import { FilteredModelAttributes } from '@server/types' @@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { MIMETYPES } from '../../../initializers/constants' import { sequelizeTypescript } from '../../../initializers/database' -import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' -import { Notifier } from '../../../lib/notifier' import { Hooks } from '../../../lib/plugins/hooks' import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' @@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) { return { videoInstanceUpdated, isNewVideo } }) - const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) + Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res }) - await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) - - if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) - - Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res }) + await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo }) } catch (err) { // Force fields we want to update // If the transaction is retried, sequelize will think the object has not changed @@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide } } -async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { - if (video.isLive || !videoInfoToUpdate.name) return video +async function addVideoJobsAfterUpdate (options: { + video: MVideoFullLight + videoInfoToUpdate: VideoUpdate + wasConfidentialVideo: boolean + isNewVideo: boolean +}) { + const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options + const jobs: CreateJobArgument[] = [] + + if (!video.isLive && videoInfoToUpdate.name) { - for (const file of (video.VideoFiles || [])) { - const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } + for (const file of (video.VideoFiles || [])) { + const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) - } + jobs.push({ type: 'manage-video-torrent', payload }) + } - const hls = video.getHLSPlaylist() + const hls = video.getHLSPlaylist() - for (const file of (hls?.VideoFiles || [])) { - const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } + for (const file of (hls?.VideoFiles || [])) { + const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) + jobs.push({ type: 'manage-video-torrent', payload }) + } + } + + jobs.push({ + type: 'federate-video', + payload: { + videoUUID: video.uuid, + isNewVideo + } + }) + + if (wasConfidentialVideo) { + jobs.push({ + type: 'notify', + payload: { + action: 'new-video', + videoUUID: video.uuid + } + }) } - // Refresh video since files have changed - return VideoModel.loadFull(video.id) + return JobQueue.Instance.createSequentialJobFlow(...jobs) } diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 4a9d7b619..cc171eece 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts @@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Redis } from '@server/lib/redis' import { uploadx } from '@server/lib/uploadx' import { - addMoveToObjectStorageJob, - addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, + buildMoveToObjectStorageJob, + buildOptimizeOrMergeAudioJob, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' @@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { openapiOperationDoc } from '@server/middlewares/doc' import { VideoSourceModel } from '@server/models/video/video-source' -import { MVideoFile, MVideoFullLight } from '@server/types/models' +import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' import { getLowercaseExtension } from '@shared/core-utils' import { isAudioFile, uuidToShort } from '@shared/extra-utils' -import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' +import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models' import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' -import { retryTransactionWrapper } from '../../../helpers/database-utils' import { createReqFiles } from '../../../helpers/express-utils' import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { MIMETYPES } from '../../../initializers/constants' import { sequelizeTypescript } from '../../../initializers/database' -import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' -import { Notifier } from '../../../lib/notifier' import { Hooks } from '../../../lib/plugins/hooks' import { generateVideoMiniature } from '../../../lib/thumbnail' import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' @@ -216,22 +213,8 @@ async function addVideo (options: { // Channel has a new content, set as updated await videoCreated.VideoChannel.setAsUpdated() - createTorrentFederate(videoCreated, videoFile) - .catch(err => { - logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }) - - return videoCreated - }).then(refreshedVideo => { - if (!refreshedVideo) return - - if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { - return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined }) - } - - if (refreshedVideo.state === VideoState.TO_TRANSCODE) { - return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user }) - } - }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) + addVideoJobsAfterUpload(videoCreated, videoFile, user) + .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) @@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { return videoFile } -async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { - const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } - - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) - - const refreshedVideo = await VideoModel.loadFull(video.id) - if (!refreshedVideo) return - - // Only federate and notify after the torrent creation - Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) +async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) { + return JobQueue.Instance.createSequentialJobFlow( + { + type: 'manage-video-torrent' as 'manage-video-torrent', + payload: { + videoId: video.id, + videoFileId: videoFile.id, + action: 'create' + } + }, + { + type: 'federate-video' as 'federate-video', + payload: { + videoUUID: video.uuid, + isNewVideo: true + } + }, - await retryTransactionWrapper(() => { - return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) - }) + video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE + ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined }) + : undefined, - return refreshedVideo + video.state === VideoState.TO_TRANSCODE + ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) + : undefined + ) } async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index db43c59be..a53c22662 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -156,7 +156,9 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { 'video-live-ending': 1, 'video-studio-edition': 1, 'manage-video-torrent': 1, - 'move-to-object-storage': 3 + 'move-to-object-storage': 3, + 'notify': 1, + 'federate-video': 1 } // Excluded keys are jobs that can be configured by admins const JOB_CONCURRENCY: { [id in Exclude]: number } = { @@ -175,7 +177,9 @@ const JOB_CONCURRENCY: { [id in Exclude) { type: 'activity' as 'activity' } - return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) + return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload }) } export { diff --git a/server/lib/activitypub/playlists/refresh.ts b/server/lib/activitypub/playlists/refresh.ts index 493e8c7ec..33260ea02 100644 --- a/server/lib/activitypub/playlists/refresh.ts +++ b/server/lib/activitypub/playlists/refresh.ts @@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared' function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) { if (!playlist.isOutdated()) return - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } }) } async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise { diff --git a/server/lib/activitypub/send/shared/send-utils.ts b/server/lib/activitypub/send/shared/send-utils.ts index fcec63991..2bc1ef8f5 100644 --- a/server/lib/activitypub/send/shared/send-utils.ts +++ b/server/lib/activitypub/send/shared/send-utils.ts @@ -120,7 +120,7 @@ async function forwardActivity ( body: activity, contextType: null } - return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })) + return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload })) } // --------------------------------------------------------------------------- @@ -205,7 +205,7 @@ function broadcastTo (options: { contextType } - JobQueue.Instance.createJob({ + JobQueue.Instance.createJobAsync({ type: parallelizable ? 'activitypub-http-broadcast-parallel' : 'activitypub-http-broadcast', @@ -222,7 +222,7 @@ function broadcastTo (options: { contextType } - JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) } } @@ -243,7 +243,7 @@ function unicastTo (options: { contextType } - JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload }) } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/videos/get.ts b/server/lib/activitypub/videos/get.ts index b74df132c..14ba55034 100644 --- a/server/lib/activitypub/videos/get.ts +++ b/server/lib/activitypub/videos/get.ts @@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr return refreshVideoIfNeeded(refreshOptions) } - await JobQueue.Instance.createJobWithPromise({ + await JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8cf0c87a6..8ed1b6447 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts @@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi } function createJob (payload: ActivitypubHttpFetcherPayload) { - return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload }) + return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) } function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { diff --git a/server/lib/emailer.ts b/server/lib/emailer.ts index bd1089530..9e546de7f 100644 --- a/server/lib/emailer.ts +++ b/server/lib/emailer.ts @@ -66,7 +66,7 @@ class Emailer { } } - return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) + return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) } addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) { @@ -80,7 +80,7 @@ class Emailer { } } - return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) + return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) } addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) { @@ -94,7 +94,7 @@ class Emailer { } } - return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) + return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) } addUserBlockJob (user: MUser, blocked: boolean, reason?: string) { @@ -108,7 +108,7 @@ class Emailer { text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.` } - return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) + return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) } addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) { @@ -127,7 +127,7 @@ class Emailer { } } - return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload }) + return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload }) } async sendMail (options: EmailPayload) { diff --git a/server/lib/job-queue/handlers/activitypub-follow.ts b/server/lib/job-queue/handlers/activitypub-follow.ts index 944da5be1..a68c32ba0 100644 --- a/server/lib/job-queue/handlers/activitypub-follow.ts +++ b/server/lib/job-queue/handlers/activitypub-follow.ts @@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) { const payload = job.data as ActivitypubFollowPayload const host = payload.host - logger.info('Processing ActivityPub follow in job %d.', job.id) + logger.info('Processing ActivityPub follow in job %s.', job.id) let targetActor: MActorFull if (!host || host === WEBSERVER.HOST) { diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 354c608fb..13eff5211 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests' import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' async function processActivityPubHttpBroadcast (job: Job) { - logger.info('Processing ActivityPub broadcast in job %d.', job.id) + logger.info('Processing ActivityPub broadcast in job %s.', job.id) const payload = job.data as ActivitypubHttpBroadcastPayload diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index e0b841887..b6cb3c4a6 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share' import { addVideoComments } from '../../activitypub/video-comments' async function processActivityPubHttpFetcher (job: Job) { - logger.info('Processing ActivityPub fetcher in job %d.', job.id) + logger.info('Processing ActivityPub fetcher in job %s.', job.id) const payload = job.data as ActivitypubHttpFetcherPayload diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 837a597a5..9e4e84002 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests' import { ActorFollowHealthCache } from '../../actor-follow-health-cache' async function processActivityPubHttpUnicast (job: Job) { - logger.info('Processing ActivityPub unicast in job %d.', job.id) + logger.info('Processing ActivityPub unicast in job %s.', job.id) const payload = job.data as ActivitypubHttpUnicastPayload const uri = payload.uri diff --git a/server/lib/job-queue/handlers/activitypub-refresher.ts b/server/lib/job-queue/handlers/activitypub-refresher.ts index 600f858a0..307e771ff 100644 --- a/server/lib/job-queue/handlers/activitypub-refresher.ts +++ b/server/lib/job-queue/handlers/activitypub-refresher.ts @@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors' async function refreshAPObject (job: Job) { const payload = job.data as RefreshPayload - logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url) + logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url) if (payload.type === 'video') return refreshVideo(payload.url) if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url) diff --git a/server/lib/job-queue/handlers/actor-keys.ts b/server/lib/job-queue/handlers/actor-keys.ts index 4a5bad9fb..27a2d431b 100644 --- a/server/lib/job-queue/handlers/actor-keys.ts +++ b/server/lib/job-queue/handlers/actor-keys.ts @@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger' async function processActorKeys (job: Job) { const payload = job.data as ActorKeysPayload - logger.info('Processing actor keys in job %d.', job.id) + logger.info('Processing actor keys in job %s.', job.id) const actor = await ActorModel.load(payload.actorId) diff --git a/server/lib/job-queue/handlers/email.ts b/server/lib/job-queue/handlers/email.ts index b5b9475b1..567bcc076 100644 --- a/server/lib/job-queue/handlers/email.ts +++ b/server/lib/job-queue/handlers/email.ts @@ -5,7 +5,7 @@ import { Emailer } from '../../emailer' async function processEmail (job: Job) { const payload = job.data as EmailPayload - logger.info('Processing email in job %d.', job.id) + logger.info('Processing email in job %s.', job.id) return Emailer.Instance.sendMail(payload) } diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts new file mode 100644 index 000000000..6aac36741 --- /dev/null +++ b/server/lib/job-queue/handlers/federate-video.ts @@ -0,0 +1,28 @@ +import { Job } from 'bullmq' +import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { sequelizeTypescript } from '@server/initializers/database' +import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' +import { VideoModel } from '@server/models/video/video' +import { FederateVideoPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' + +function processFederateVideo (job: Job) { + const payload = job.data as FederateVideoPayload + + logger.info('Processing video federation in job %s.', job.id) + + return retryTransactionWrapper(() => { + return sequelizeTypescript.transaction(async t => { + const video = await VideoModel.loadFull(payload.videoUUID, t) + if (!video) return + + return federateVideoIfNeeded(video, payload.isNewVideo, t) + }) + }) +} + +// --------------------------------------------------------------------------- + +export { + processFederateVideo +} diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 4505ca79e..03aa414c9 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts @@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger' async function processManageVideoTorrent (job: Job) { const payload = job.data as ManageVideoTorrentPayload - logger.info('Processing torrent in job %d.', job.id) + logger.info('Processing torrent in job %s.', job.id) if (payload.action === 'create') return doCreateAction(payload) if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload) diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index d608fd865..25bdebeea 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage') export async function processMoveToObjectStorage (job: Job) { const payload = job.data as MoveObjectStoragePayload - logger.info('Moving video %s in job %d.', payload.videoUUID, job.id) + logger.info('Moving video %s in job %s.', payload.videoUUID, job.id) const video = await VideoModel.loadWithFiles(payload.videoUUID) // No video, maybe deleted? @@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) { const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove') if (pendingMove === 0) { - logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) + logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags) await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) } diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts new file mode 100644 index 000000000..83605396c --- /dev/null +++ b/server/lib/job-queue/handlers/notify.ts @@ -0,0 +1,27 @@ +import { Job } from 'bullmq' +import { Notifier } from '@server/lib/notifier' +import { VideoModel } from '@server/models/video/video' +import { NotifyPayload } from '@shared/models' +import { logger } from '../../../helpers/logger' + +async function processNotify (job: Job) { + const payload = job.data as NotifyPayload + logger.info('Processing %s notification in job %s.', payload.action, job.id) + + if (payload.action === 'new-video') return doNotifyNewVideo(payload) +} + +// --------------------------------------------------------------------------- + +export { + processNotify +} + +// --------------------------------------------------------------------------- + +async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) { + const refreshedVideo = await VideoModel.loadFull(payload.videoUUID) + if (!refreshedVideo) return + + Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) +} diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 40c44cf52..d950f6407 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { CONFIG } from '@server/initializers/config' import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { generateWebTorrentVideoFilename } from '@server/lib/paths' -import { addMoveToObjectStorageJob } from '@server/lib/video' +import { buildMoveToObjectStorageJob } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' @@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils' import { VideoFileImportPayload, VideoStorage } from '@shared/models' import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' import { logger } from '../../../helpers/logger' +import { JobQueue } from '../job-queue' async function processVideoFileImport (job: Job) { const payload = job.data as VideoFileImportPayload - logger.info('Processing video file import in job %d.', job.id) + logger.info('Processing video file import in job %s.', job.id) const video = await VideoModel.loadFull(payload.videoUUID) // No video, maybe deleted? @@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) { await updateVideoFile(video, payload.filePath) if (CONFIG.OBJECT_STORAGE.ENABLED) { - await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) + await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state })) } else { await federateVideoIfNeeded(video, false) } diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index e5cd35865..f4629159c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Hooks } from '@server/lib/plugins/hooks' import { ServerConfigManager } from '@server/lib/server-config-manager' import { isAbleToUploadVideo } from '@server/lib/user' -import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' +import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video' import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { ThumbnailModel } from '@server/models/video/thumbnail' @@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail' import { federateVideoIfNeeded } from '../../activitypub/videos' import { Notifier } from '../../notifier' import { generateVideoMiniature } from '../../thumbnail' +import { JobQueue } from '../job-queue' async function processVideoImport (job: Job) { const payload = job.data as VideoImportPayload @@ -65,7 +66,7 @@ export { // --------------------------------------------------------------------------- async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { - logger.info('Processing torrent video import in job %d.', job.id) + logger.info('Processing torrent video import in job %s.', job.id) const options = { type: payload.type, videoImportId: payload.videoImportId } @@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, } async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { - logger.info('Processing youtubeDL video import in job %d.', job.id) + logger.info('Processing youtubeDL video import in job %s.', job.id) const options = { type: payload.type, videoImportId: videoImport.id } @@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise, videoImport: MVid } if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { - return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) + await JobQueue.Instance.createJob( + await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) + ) } // Create transcoding jobs? if (video.state === VideoState.TO_TRANSCODE) { - await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) + await JobQueue.Instance.createJob( + await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) + ) } } catch (err) { diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts index 75ab2cd02..bac99fdb7 100644 --- a/server/lib/job-queue/handlers/video-redundancy.ts +++ b/server/lib/job-queue/handlers/video-redundancy.ts @@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger' async function processVideoRedundancy (job: Job) { const payload = job.data as VideoRedundancyPayload - logger.info('Processing video redundancy in job %d.', job.id) + logger.info('Processing video redundancy in job %s.', job.id) return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) } diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 078243538..23f9a34cc 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts @@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '@server/lib/user' -import { addOptimizeOrMergeAudioJob } from '@server/lib/video' +import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' import { VideoPathManager } from '@server/lib/video-path-manager' import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' @@ -36,6 +36,7 @@ import { VideoStudioTaskWatermarkPayload } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' +import { JobQueue } from '../job-queue' const lTagsBase = loggerTagsFactory('video-edition') @@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) { const payload = job.data as VideoStudioEditionPayload const lTags = lTagsBase(payload.videoUUID) - logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags) + logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags) const video = await VideoModel.loadFull(payload.videoUUID) @@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) { await federateVideoIfNeeded(video, false, undefined) const user = await UserModel.loadByVideoId(video.id) - await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) + + await JobQueue.Instance.createJob( + await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) + ) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 0cf5d53ce..50d732beb 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -1,4 +1,6 @@ import { + FlowJob, + FlowProducer, Job, JobsOptions, Queue, @@ -13,7 +15,7 @@ import { import { jobStates } from '@server/helpers/custom-validators/jobs' import { CONFIG } from '@server/initializers/config' import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' -import { timeoutPromise } from '@shared/core-utils' +import { pick, timeoutPromise } from '@shared/core-utils' import { ActivitypubFollowPayload, ActivitypubHttpBroadcastPayload, @@ -22,10 +24,12 @@ import { ActorKeysPayload, DeleteResumableUploadMetaFilePayload, EmailPayload, + FederateVideoPayload, JobState, JobType, ManageVideoTorrentPayload, MoveObjectStoragePayload, + NotifyPayload, RefreshPayload, VideoFileImportPayload, VideoImportPayload, @@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica import { refreshAPObject } from './handlers/activitypub-refresher' import { processActorKeys } from './handlers/actor-keys' import { processEmail } from './handlers/email' +import { processFederateVideo } from './handlers/federate-video' import { processManageVideoTorrent } from './handlers/manage-video-torrent' import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' +import { processNotify } from './handlers/notify' import { processVideoFileImport } from './handlers/video-file-import' import { processVideoImport } from './handlers/video-import' import { processVideoLiveEnding } from './handlers/video-live-ending' @@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' -type CreateJobArgument = +export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | @@ -73,7 +79,9 @@ type CreateJobArgument = { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } | { type: 'video-studio-edition', payload: VideoStudioEditionPayload } | { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } | - { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } + { type: 'notify', payload: NotifyPayload } | + { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } | + { type: 'federate-video', payload: FederateVideoPayload } export type CreateJobOptions = { delay?: number @@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise } = { 'video-redundancy': processVideoRedundancy, 'move-to-object-storage': processMoveToObjectStorage, 'manage-video-torrent': processManageVideoTorrent, - 'video-studio-edition': processVideoStudioEdition + 'notify': processNotify, + 'video-studio-edition': processVideoStudioEdition, + 'federate-video': processFederateVideo } const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise } = { @@ -123,7 +133,9 @@ const jobTypes: JobType[] = [ 'video-live-ending', 'move-to-object-storage', 'manage-video-torrent', - 'video-studio-edition' + 'video-studio-edition', + 'notify', + 'federate-video' ] const silentFailure = new Set([ 'activitypub-http-unicast' ]) @@ -137,6 +149,8 @@ class JobQueue { private queueSchedulers: { [id in JobType]?: QueueScheduler } = {} private queueEvents: { [id in JobType]?: QueueEvents } = {} + private flowProducer: FlowProducer + private initialized = false private jobRedisPrefix: string @@ -157,6 +171,11 @@ class JobQueue { this.buildQueueEvent(handlerName, produceOnly) } + this.flowProducer = new FlowProducer({ + connection: this.getRedisConnection(), + prefix: this.jobRedisPrefix + }) + this.addRepeatableJobs() } @@ -243,6 +262,8 @@ class JobQueue { } } + // --------------------------------------------------------------------------- + async terminate () { const promises = Object.keys(this.workers) .map(handlerName => { @@ -278,28 +299,56 @@ class JobQueue { } } - createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { - this.createJobWithPromise(obj, options) - .catch(err => logger.error('Cannot create job.', { err, obj })) + // --------------------------------------------------------------------------- + + createJobAsync (options: CreateJobArgument & CreateJobOptions): void { + this.createJob(options) + .catch(err => logger.error('Cannot create job.', { err, options })) } - async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { - const queue: Queue = this.queues[obj.type] + async createJob (options: CreateJobArgument & CreateJobOptions) { + const queue: Queue = this.queues[options.type] if (queue === undefined) { - logger.error('Unknown queue %s: cannot create job.', obj.type) + logger.error('Unknown queue %s: cannot create job.', options.type) return } - const jobArgs: JobsOptions = { + const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ])) + + return queue.add('job', options.payload, jobOptions) + } + + async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) { + let lastJob: FlowJob + + for (const job of jobs) { + if (!job) continue + + lastJob = { + name: 'job', + data: job.payload, + queueName: job.type, + opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), + children: lastJob + ? [ lastJob ] + : [] + } + } + + return this.flowProducer.add(lastJob) + } + + private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { + return { backoff: { delay: 60 * 1000, type: 'exponential' }, - attempts: JOB_ATTEMPTS[obj.type], + attempts: JOB_ATTEMPTS[type], priority: options.priority, delay: options.delay } - - return queue.add('job', obj.payload, jobArgs) } + // --------------------------------------------------------------------------- + async listForApi (options: { state?: JobState start: number @@ -367,6 +416,8 @@ class JobQueue { return Promise.all(promises) } + // --------------------------------------------------------------------------- + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue: Queue = this.queues[key] diff --git a/server/lib/live/live-manager.ts b/server/lib/live/live-manager.ts index 1410889a2..aadd8e308 100644 --- a/server/lib/live/live-manager.ts +++ b/server/lib/live/live-manager.ts @@ -408,7 +408,7 @@ class LiveManager { await liveSession.save() } - JobQueue.Instance.createJob({ + JobQueue.Instance.createJobAsync({ type: 'video-live-ending', payload: { videoId: fullVideo.id, @@ -421,8 +421,12 @@ class LiveManager { streamingPlaylistId: fullVideo.getHLSPlaylist()?.id, publishedAt: fullVideo.publishedAt.toISOString() - } - }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY }) + }, + + delay: cleanupNow + ? 0 + : VIDEO_LIVE.CLEANUP_DELAY + }) fullVideo.state = live.permanentLive ? VideoState.WAITING_FOR_LIVE diff --git a/server/lib/notifier/notifier.ts b/server/lib/notifier/notifier.ts index d1c4c0215..66cfc31c4 100644 --- a/server/lib/notifier/notifier.ts +++ b/server/lib/notifier/notifier.ts @@ -242,7 +242,7 @@ class Notifier { for (const to of toEmails) { const payload = await object.createEmail(to) - JobQueue.Instance.createJob({ type: 'email', payload }) + JobQueue.Instance.createJobAsync({ type: 'email', payload }) } } diff --git a/server/lib/schedulers/auto-follow-index-instances.ts b/server/lib/schedulers/auto-follow-index-instances.ts index d9f9c2de3..956ece749 100644 --- a/server/lib/schedulers/auto-follow-index-instances.ts +++ b/server/lib/schedulers/auto-follow-index-instances.ts @@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler { isAutoFollow: true } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) } } diff --git a/server/lib/video-state.ts b/server/lib/video-state.ts index b5d8353b7..9ebbd7679 100644 --- a/server/lib/video-state.ts +++ b/server/lib/video-state.ts @@ -1,4 +1,5 @@ import { Transaction } from 'sequelize' +import { retryTransactionWrapper } from '@server/helpers/database-utils' import { logger } from '@server/helpers/logger' import { CONFIG } from '@server/initializers/config' import { sequelizeTypescript } from '@server/initializers/database' @@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models' import { VideoState } from '@shared/models' import { federateVideoIfNeeded } from './activitypub/videos' +import { JobQueue } from './job-queue' import { Notifier } from './notifier' -import { addMoveToObjectStorageJob } from './video' -import { retryTransactionWrapper } from '@server/helpers/database-utils' +import { buildMoveToObjectStorageJob } from './video' function buildNextVideoState (currentState?: VideoState) { if (currentState === VideoState.PUBLISHED) { @@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: { logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] }) try { - await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }) + await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo })) return true } catch (err) { diff --git a/server/lib/video.ts b/server/lib/video.ts index b843b11bc..f7d7aa186 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -1,5 +1,7 @@ import { UploadFiles } from 'express' +import memoizee from 'memoizee' import { Transaction } from 'sequelize/types' +import { CONFIG } from '@server/initializers/config' import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants' import { TagModel } from '@server/models/video/tag' import { VideoModel } from '@server/models/video/video' @@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' import { CreateJobOptions, JobQueue } from './job-queue/job-queue' import { updateVideoMiniatureFromExisting } from './thumbnail' -import { CONFIG } from '@server/initializers/config' -import memoizee from 'memoizee' function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes { return { @@ -86,7 +86,7 @@ async function setVideoTags (options: { // --------------------------------------------------------------------------- -async function addOptimizeOrMergeAudioJob (options: { +async function buildOptimizeOrMergeAudioJob (options: { video: MVideoUUID videoFile: MVideoFile user: MUserId @@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: { }) { const { video, videoFile, user, isNewVideo } = options - let dataInput: VideoTranscodingPayload + let payload: VideoTranscodingPayload if (videoFile.isAudio()) { - dataInput = { + payload = { type: 'merge-audio-to-webtorrent', resolution: DEFAULT_AUDIO_RESOLUTION, videoUUID: video.uuid, @@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: { isNewVideo } } else { - dataInput = { + payload = { type: 'optimize-to-webtorrent', videoUUID: video.uuid, isNewVideo } } - const jobOptions = { - priority: await getTranscodingJobPriority(user) - } + await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - return addTranscodingJob(dataInput, jobOptions) + return { + type: 'video-transcoding' as 'video-transcoding', + priority: await getTranscodingJobPriority(user), + payload + } } async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') - return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options) + return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) } async function getTranscodingJobPriority (user: MUserId) { @@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) { // --------------------------------------------------------------------------- -async function addMoveToObjectStorageJob (options: { +async function buildMoveToObjectStorageJob (options: { video: MVideoUUID previousVideoState: VideoState isNewVideo?: boolean // Default true @@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: { await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove') - const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState } - return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput }) + return { + type: 'move-to-object-storage' as 'move-to-object-storage', + payload: { + videoUUID: video.uuid, + isNewVideo, + previousVideoState + } + } } // --------------------------------------------------------------------------- @@ -173,9 +181,9 @@ export { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags, - addOptimizeOrMergeAudioJob, + buildOptimizeOrMergeAudioJob, addTranscodingJob, - addMoveToObjectStorageJob, + buildMoveToObjectStorageJob, getTranscodingJobPriority, getCachedVideoDuration } diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index a924183f2..8c8f64de9 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -25,6 +25,8 @@ export type JobType = | 'manage-video-torrent' | 'move-to-object-storage' | 'video-studio-edition' + | 'notify' + | 'federate-video' export interface Job { id: number | string @@ -214,3 +216,18 @@ export interface VideoStudioEditionPayload { videoUUID: string tasks: VideoStudioTaskPayload[] } + +// --------------------------------------------------------------------------- + +export type NotifyPayload = + { + action: 'new-video' + videoUUID: string + } + +// --------------------------------------------------------------------------- + +export interface FederateVideoPayload { + videoUUID: string + isNewVideo: boolean +} -- cgit v1.2.3