From bd911b54b555b11df7e9849cf92d358bccfecf6e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Mon, 8 Aug 2022 15:48:17 +0200 Subject: Use bullmq job dependency --- server/controllers/api/accounts.ts | 2 +- server/controllers/api/server/follows.ts | 4 +- server/controllers/api/server/redundancy.ts | 2 +- server/controllers/api/users/my-subscriptions.ts | 2 +- server/controllers/api/video-channel.ts | 4 +- server/controllers/api/videos/import.ts | 4 +- server/controllers/api/videos/index.ts | 2 +- server/controllers/api/videos/studio.ts | 2 +- server/controllers/api/videos/update.ts | 64 ++++++++++++++-------- server/controllers/api/videos/upload.ts | 68 +++++++++++------------- 10 files changed, 82 insertions(+), 72 deletions(-) (limited to 'server/controllers') diff --git a/server/controllers/api/accounts.ts b/server/controllers/api/accounts.ts index 8d9f92d93..66cdaab82 100644 --- a/server/controllers/api/accounts.ts +++ b/server/controllers/api/accounts.ts @@ -119,7 +119,7 @@ function getAccount (req: express.Request, res: express.Response) { const account = res.locals.account if (account.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } }) } return res.json(account.toFormattedJSON()) diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index 60d36ed59..87828813a 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -138,7 +138,7 @@ async function addFollow (req: express.Request, res: express.Response) { followerActorId: follower.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) } for (const handle of handles) { @@ -150,7 +150,7 @@ async function addFollow (req: express.Request, res: express.Response) { followerActorId: follower.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) } return res.status(HttpStatusCode.NO_CONTENT_204).end() diff --git a/server/controllers/api/server/redundancy.ts b/server/controllers/api/server/redundancy.ts index 9f43d3e4e..94e187cd4 100644 --- a/server/controllers/api/server/redundancy.ts +++ b/server/controllers/api/server/redundancy.ts @@ -85,7 +85,7 @@ async function addVideoRedundancy (req: express.Request, res: express.Response) videoId: res.locals.onlyVideo.id } - await JobQueue.Instance.createJobWithPromise({ + await JobQueue.Instance.createJob({ type: 'video-redundancy', payload }) diff --git a/server/controllers/api/users/my-subscriptions.ts b/server/controllers/api/users/my-subscriptions.ts index fb1f68635..a750f9bd1 100644 --- a/server/controllers/api/users/my-subscriptions.ts +++ b/server/controllers/api/users/my-subscriptions.ts @@ -122,7 +122,7 @@ function addUserSubscription (req: express.Request, res: express.Response) { followerActorId: user.Account.Actor.id } - JobQueue.Instance.createJob({ type: 'activitypub-follow', payload }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload }) return res.status(HttpStatusCode.NO_CONTENT_204).end() } diff --git a/server/controllers/api/video-channel.ts b/server/controllers/api/video-channel.ts index 411ec8630..6b33e894d 100644 --- a/server/controllers/api/video-channel.ts +++ b/server/controllers/api/video-channel.ts @@ -245,7 +245,7 @@ async function addVideoChannel (req: express.Request, res: express.Response) { }) const payload = { actorId: videoChannelCreated.actorId } - await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload }) + await JobQueue.Instance.createJob({ type: 'actor-keys', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON())) logger.info('Video channel %s created.', videoChannelCreated.Actor.url) @@ -335,7 +335,7 @@ async function getVideoChannel (req: express.Request, res: express.Response) { const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id }) if (videoChannel.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } }) } return res.json(videoChannel.toFormattedJSON()) diff --git a/server/controllers/api/videos/import.ts b/server/controllers/api/videos/import.ts index b12953630..5a2e1006a 100644 --- a/server/controllers/api/videos/import.ts +++ b/server/controllers/api/videos/import.ts @@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to videoImportId: videoImport.id, magnetUri } - await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) + await JobQueue.Instance.createJob({ type: 'video-import', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) @@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response) videoImportId: videoImport.id, fileExt } - await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload }) + await JobQueue.Instance.createJob({ type: 'video-import', payload }) auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON())) diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index eca72c397..b301515df 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) { const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId }) if (video.isOutdated()) { - JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) + JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } }) } return res.json(video.toFormattedDetailsJSON()) diff --git a/server/controllers/api/videos/studio.ts b/server/controllers/api/videos/studio.ts index bff344f3f..6667532bf 100644 --- a/server/controllers/api/videos/studio.ts +++ b/server/controllers/api/videos/studio.ts @@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response) tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files)) } - JobQueue.Instance.createJob({ type: 'video-studio-edition', payload }) + JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload }) return res.sendStatus(HttpStatusCode.NO_CONTENT_204) } diff --git a/server/controllers/api/videos/update.ts b/server/controllers/api/videos/update.ts index 1545a2232..ab1a23d9a 100644 --- a/server/controllers/api/videos/update.ts +++ b/server/controllers/api/videos/update.ts @@ -1,7 +1,7 @@ import express from 'express' import { Transaction } from 'sequelize/types' import { changeVideoChannelShare } from '@server/lib/activitypub/share' -import { JobQueue } from '@server/lib/job-queue' +import { CreateJobArgument, JobQueue } from '@server/lib/job-queue' import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' import { openapiOperationDoc } from '@server/middlewares/doc' import { FilteredModelAttributes } from '@server/types' @@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { MIMETYPES } from '../../../initializers/constants' import { sequelizeTypescript } from '../../../initializers/database' -import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' -import { Notifier } from '../../../lib/notifier' import { Hooks } from '../../../lib/plugins/hooks' import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares' @@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) { return { videoInstanceUpdated, isNewVideo } }) - const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate) + Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res }) - await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t)) - - if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) - - Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res }) + await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo }) } catch (err) { // Force fields we want to update // If the transaction is retried, sequelize will think the object has not changed @@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide } } -async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) { - if (video.isLive || !videoInfoToUpdate.name) return video +async function addVideoJobsAfterUpdate (options: { + video: MVideoFullLight + videoInfoToUpdate: VideoUpdate + wasConfidentialVideo: boolean + isNewVideo: boolean +}) { + const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options + const jobs: CreateJobArgument[] = [] + + if (!video.isLive && videoInfoToUpdate.name) { - for (const file of (video.VideoFiles || [])) { - const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } + for (const file of (video.VideoFiles || [])) { + const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id } - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) - } + jobs.push({ type: 'manage-video-torrent', payload }) + } - const hls = video.getHLSPlaylist() + const hls = video.getHLSPlaylist() - for (const file of (hls?.VideoFiles || [])) { - const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } + for (const file of (hls?.VideoFiles || [])) { + const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id } - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) + jobs.push({ type: 'manage-video-torrent', payload }) + } + } + + jobs.push({ + type: 'federate-video', + payload: { + videoUUID: video.uuid, + isNewVideo + } + }) + + if (wasConfidentialVideo) { + jobs.push({ + type: 'notify', + payload: { + action: 'new-video', + videoUUID: video.uuid + } + }) } - // Refresh video since files have changed - return VideoModel.loadFull(video.id) + return JobQueue.Instance.createSequentialJobFlow(...jobs) } diff --git a/server/controllers/api/videos/upload.ts b/server/controllers/api/videos/upload.ts index 4a9d7b619..cc171eece 100644 --- a/server/controllers/api/videos/upload.ts +++ b/server/controllers/api/videos/upload.ts @@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { Redis } from '@server/lib/redis' import { uploadx } from '@server/lib/uploadx' import { - addMoveToObjectStorageJob, - addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, + buildMoveToObjectStorageJob, + buildOptimizeOrMergeAudioJob, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' @@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager' import { buildNextVideoState } from '@server/lib/video-state' import { openapiOperationDoc } from '@server/middlewares/doc' import { VideoSourceModel } from '@server/models/video/video-source' -import { MVideoFile, MVideoFullLight } from '@server/types/models' +import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' import { getLowercaseExtension } from '@shared/core-utils' import { isAudioFile, uuidToShort } from '@shared/extra-utils' -import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models' +import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models' import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' -import { retryTransactionWrapper } from '../../../helpers/database-utils' import { createReqFiles } from '../../../helpers/express-utils' import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg' import { logger, loggerTagsFactory } from '../../../helpers/logger' import { MIMETYPES } from '../../../initializers/constants' import { sequelizeTypescript } from '../../../initializers/database' -import { federateVideoIfNeeded } from '../../../lib/activitypub/videos' -import { Notifier } from '../../../lib/notifier' import { Hooks } from '../../../lib/plugins/hooks' import { generateVideoMiniature } from '../../../lib/thumbnail' import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist' @@ -216,22 +213,8 @@ async function addVideo (options: { // Channel has a new content, set as updated await videoCreated.VideoChannel.setAsUpdated() - createTorrentFederate(videoCreated, videoFile) - .catch(err => { - logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }) - - return videoCreated - }).then(refreshedVideo => { - if (!refreshedVideo) return - - if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { - return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined }) - } - - if (refreshedVideo.state === VideoState.TO_TRANSCODE) { - return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user }) - } - }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) + addVideoJobsAfterUpload(videoCreated, videoFile, user) + .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })) Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res }) @@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) { return videoFile } -async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) { - const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' } - - const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload }) - await JobQueue.Instance.waitJob(job) - - const refreshedVideo = await VideoModel.loadFull(video.id) - if (!refreshedVideo) return - - // Only federate and notify after the torrent creation - Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo) +async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) { + return JobQueue.Instance.createSequentialJobFlow( + { + type: 'manage-video-torrent' as 'manage-video-torrent', + payload: { + videoId: video.id, + videoFileId: videoFile.id, + action: 'create' + } + }, + { + type: 'federate-video' as 'federate-video', + payload: { + videoUUID: video.uuid, + isNewVideo: true + } + }, - await retryTransactionWrapper(() => { - return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t)) - }) + video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE + ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined }) + : undefined, - return refreshedVideo + video.state === VideoState.TO_TRANSCODE + ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) + : undefined + ) } async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) { -- cgit v1.2.3