]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Use bullmq job dependency
authorChocobozzz <me@florianbigard.com>
Mon, 8 Aug 2022 13:48:17 +0000 (15:48 +0200)
committerChocobozzz <me@florianbigard.com>
Tue, 9 Aug 2022 07:18:07 +0000 (09:18 +0200)
42 files changed:
scripts/create-import-video-file-job.ts
server/controllers/api/accounts.ts
server/controllers/api/server/follows.ts
server/controllers/api/server/redundancy.ts
server/controllers/api/users/my-subscriptions.ts
server/controllers/api/video-channel.ts
server/controllers/api/videos/import.ts
server/controllers/api/videos/index.ts
server/controllers/api/videos/studio.ts
server/controllers/api/videos/update.ts
server/controllers/api/videos/upload.ts
server/initializers/constants.ts
server/lib/activitypub/actors/get.ts
server/lib/activitypub/follow.ts
server/lib/activitypub/outbox.ts
server/lib/activitypub/playlists/refresh.ts
server/lib/activitypub/send/shared/send-utils.ts
server/lib/activitypub/videos/get.ts
server/lib/activitypub/videos/shared/video-sync-attributes.ts
server/lib/emailer.ts
server/lib/job-queue/handlers/activitypub-follow.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-fetcher.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/job-queue/handlers/activitypub-refresher.ts
server/lib/job-queue/handlers/actor-keys.ts
server/lib/job-queue/handlers/email.ts
server/lib/job-queue/handlers/federate-video.ts [new file with mode: 0644]
server/lib/job-queue/handlers/manage-video-torrent.ts
server/lib/job-queue/handlers/move-to-object-storage.ts
server/lib/job-queue/handlers/notify.ts [new file with mode: 0644]
server/lib/job-queue/handlers/video-file-import.ts
server/lib/job-queue/handlers/video-import.ts
server/lib/job-queue/handlers/video-redundancy.ts
server/lib/job-queue/handlers/video-studio-edition.ts
server/lib/job-queue/job-queue.ts
server/lib/live/live-manager.ts
server/lib/notifier/notifier.ts
server/lib/schedulers/auto-follow-index-instances.ts
server/lib/video-state.ts
server/lib/video.ts
shared/models/server/job.model.ts

index 97e9c7933803c43e09c800c1aa089dc309be49b1..cf974f2402d36e7a232771adcddcd984127b821b 100644 (file)
@@ -45,6 +45,6 @@ async function run () {
   }
 
   JobQueue.Instance.init(true)
-  await JobQueue.Instance.createJobWithPromise({ type: 'video-file-import', payload: dataInput })
+  await JobQueue.Instance.createJob({ type: 'video-file-import', payload: dataInput })
   console.log('Import job for video %s created.', video.uuid)
 }
index 8d9f92d933f930f10e52f51b5abac206c5f09369..66cdaab82c6bee6e48b627704af70b711d5e1f14 100644 (file)
@@ -119,7 +119,7 @@ function getAccount (req: express.Request, res: express.Response) {
   const account = res.locals.account
 
   if (account.isOutdated()) {
-    JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: account.Actor.url } })
   }
 
   return res.json(account.toFormattedJSON())
index 60d36ed593888c30e16274f422f058628a821457..87828813a7d54672c2a1cd5dfa3f769068446f59 100644 (file)
@@ -138,7 +138,7 @@ async function addFollow (req: express.Request, res: express.Response) {
       followerActorId: follower.id
     }
 
-    JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
   }
 
   for (const handle of handles) {
@@ -150,7 +150,7 @@ async function addFollow (req: express.Request, res: express.Response) {
       followerActorId: follower.id
     }
 
-    JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
   }
 
   return res.status(HttpStatusCode.NO_CONTENT_204).end()
index 9f43d3e4e9901ed01648a1f8e9dccc6130a0ea3f..94e187cd42c47f0c5156ac33b36cc33d99f8ef02 100644 (file)
@@ -85,7 +85,7 @@ async function addVideoRedundancy (req: express.Request, res: express.Response)
     videoId: res.locals.onlyVideo.id
   }
 
-  await JobQueue.Instance.createJobWithPromise({
+  await JobQueue.Instance.createJob({
     type: 'video-redundancy',
     payload
   })
index fb1f6863513441b862a5e1bac1f2a2ad0d8b61a2..a750f9bd1c5fb14364470a58024113b13feb0490 100644 (file)
@@ -122,7 +122,7 @@ function addUserSubscription (req: express.Request, res: express.Response) {
     followerActorId: user.Account.Actor.id
   }
 
-  JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+  JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
 
   return res.status(HttpStatusCode.NO_CONTENT_204).end()
 }
index 411ec86304ff74f49b23582765dfd1ac5ae2b31e..6b33e894d837da37a8c1cfc4ae548db2a32764d5 100644 (file)
@@ -245,7 +245,7 @@ async function addVideoChannel (req: express.Request, res: express.Response) {
   })
 
   const payload = { actorId: videoChannelCreated.actorId }
-  await JobQueue.Instance.createJobWithPromise({ type: 'actor-keys', payload })
+  await JobQueue.Instance.createJob({ type: 'actor-keys', payload })
 
   auditLogger.create(getAuditIdFromRes(res), new VideoChannelAuditView(videoChannelCreated.toFormattedJSON()))
   logger.info('Video channel %s created.', videoChannelCreated.Actor.url)
@@ -335,7 +335,7 @@ async function getVideoChannel (req: express.Request, res: express.Response) {
   const videoChannel = await Hooks.wrapObject(res.locals.videoChannel, 'filter:api.video-channel.get.result', { id })
 
   if (videoChannel.isOutdated()) {
-    JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'actor', url: videoChannel.Actor.url } })
   }
 
   return res.json(videoChannel.toFormattedJSON())
index b1295363003a6c7e33b1cf04cc88c6e0a76d7aa4..5a2e1006a64fc18b5308482bd6c1cbe5b7098873 100644 (file)
@@ -163,7 +163,7 @@ async function addTorrentImport (req: express.Request, res: express.Response, to
     videoImportId: videoImport.id,
     magnetUri
   }
-  await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload })
+  await JobQueue.Instance.createJob({ type: 'video-import', payload })
 
   auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
 
@@ -255,7 +255,7 @@ async function addYoutubeDLImport (req: express.Request, res: express.Response)
     videoImportId: videoImport.id,
     fileExt
   }
-  await JobQueue.Instance.createJobWithPromise({ type: 'video-import', payload })
+  await JobQueue.Instance.createJob({ type: 'video-import', payload })
 
   auditLogger.create(getAuditIdFromRes(res), new VideoImportAuditView(videoImport.toFormattedJSON()))
 
index eca72c397a85492f82a688fa516dc0233ddeb6a5..b301515df397184d2ddbe0a59291d84719a4a256 100644 (file)
@@ -151,7 +151,7 @@ async function getVideo (_req: express.Request, res: express.Response) {
   const video = await Hooks.wrapObject(res.locals.videoAPI, 'filter:api.video.get.result', { id: videoId, userId })
 
   if (video.isOutdated()) {
-    JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video', url: video.url } })
   }
 
   return res.json(video.toFormattedDetailsJSON())
index bff344f3f226fc223cb36747e8afd23ed4480806..6667532bf53cfcc61f8f970da875d4071d02b5c9 100644 (file)
@@ -71,7 +71,7 @@ async function createEditionTasks (req: express.Request, res: express.Response)
     tasks: body.tasks.map((t, i) => buildTaskPayload(t, i, files))
   }
 
-  JobQueue.Instance.createJob({ type: 'video-studio-edition', payload })
+  JobQueue.Instance.createJobAsync({ type: 'video-studio-edition', payload })
 
   return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
 }
index 1545a223252af382a855c4f5097c96d054eec696..ab1a23d9a4ea4098962f191cb900aff4dff15c68 100644 (file)
@@ -1,7 +1,7 @@
 import express from 'express'
 import { Transaction } from 'sequelize/types'
 import { changeVideoChannelShare } from '@server/lib/activitypub/share'
-import { JobQueue } from '@server/lib/job-queue'
+import { CreateJobArgument, JobQueue } from '@server/lib/job-queue'
 import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
 import { openapiOperationDoc } from '@server/middlewares/doc'
 import { FilteredModelAttributes } from '@server/types'
@@ -13,8 +13,6 @@ import { createReqFiles } from '../../../helpers/express-utils'
 import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { MIMETYPES } from '../../../initializers/constants'
 import { sequelizeTypescript } from '../../../initializers/database'
-import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
-import { Notifier } from '../../../lib/notifier'
 import { Hooks } from '../../../lib/plugins/hooks'
 import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
 import { asyncMiddleware, asyncRetryTransactionMiddleware, authenticate, videosUpdateValidator } from '../../../middlewares'
@@ -139,13 +137,9 @@ async function updateVideo (req: express.Request, res: express.Response) {
       return { videoInstanceUpdated, isNewVideo }
     })
 
-    const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate)
+    Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
 
-    await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t))
-
-    if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
-
-    Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
+    await addVideoJobsAfterUpdate({ video: videoInstanceUpdated, videoInfoToUpdate, wasConfidentialVideo, isNewVideo })
   } catch (err) {
     // Force fields we want to update
     // If the transaction is retried, sequelize will think the object has not changed
@@ -192,25 +186,49 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide
   }
 }
 
-async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) {
-  if (video.isLive || !videoInfoToUpdate.name) return video
+async function addVideoJobsAfterUpdate (options: {
+  video: MVideoFullLight
+  videoInfoToUpdate: VideoUpdate
+  wasConfidentialVideo: boolean
+  isNewVideo: boolean
+}) {
+  const { video, videoInfoToUpdate, wasConfidentialVideo, isNewVideo } = options
+  const jobs: CreateJobArgument[] = []
+
+  if (!video.isLive && videoInfoToUpdate.name) {
 
-  for (const file of (video.VideoFiles || [])) {
-    const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
+    for (const file of (video.VideoFiles || [])) {
+      const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
 
-    const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-    await JobQueue.Instance.waitJob(job)
-  }
+      jobs.push({ type: 'manage-video-torrent', payload })
+    }
 
-  const hls = video.getHLSPlaylist()
+    const hls = video.getHLSPlaylist()
 
-  for (const file of (hls?.VideoFiles || [])) {
-    const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
+    for (const file of (hls?.VideoFiles || [])) {
+      const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
 
-    const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-    await JobQueue.Instance.waitJob(job)
+      jobs.push({ type: 'manage-video-torrent', payload })
+    }
+  }
+
+  jobs.push({
+    type: 'federate-video',
+    payload: {
+      videoUUID: video.uuid,
+      isNewVideo
+    }
+  })
+
+  if (wasConfidentialVideo) {
+    jobs.push({
+      type: 'notify',
+      payload: {
+        action: 'new-video',
+        videoUUID: video.uuid
+      }
+    })
   }
 
-  // Refresh video since files have changed
-  return VideoModel.loadFull(video.id)
+  return JobQueue.Instance.createSequentialJobFlow(...jobs)
 }
index 4a9d7b61952b51ad8381057c8cde1d3496ff0b87..cc171eeceb5d857a67e770e2c9c113c942ed9444 100644 (file)
@@ -8,9 +8,9 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
 import { Redis } from '@server/lib/redis'
 import { uploadx } from '@server/lib/uploadx'
 import {
-  addMoveToObjectStorageJob,
-  addOptimizeOrMergeAudioJob,
   buildLocalVideoFromReq,
+  buildMoveToObjectStorageJob,
+  buildOptimizeOrMergeAudioJob,
   buildVideoThumbnailsFromReq,
   setVideoTags
 } from '@server/lib/video'
@@ -18,19 +18,16 @@ import { VideoPathManager } from '@server/lib/video-path-manager'
 import { buildNextVideoState } from '@server/lib/video-state'
 import { openapiOperationDoc } from '@server/middlewares/doc'
 import { VideoSourceModel } from '@server/models/video/video-source'
-import { MVideoFile, MVideoFullLight } from '@server/types/models'
+import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
 import { getLowercaseExtension } from '@shared/core-utils'
 import { isAudioFile, uuidToShort } from '@shared/extra-utils'
-import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models'
+import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
 import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
-import { retryTransactionWrapper } from '../../../helpers/database-utils'
 import { createReqFiles } from '../../../helpers/express-utils'
 import { buildFileMetadata, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '../../../helpers/ffmpeg'
 import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { MIMETYPES } from '../../../initializers/constants'
 import { sequelizeTypescript } from '../../../initializers/database'
-import { federateVideoIfNeeded } from '../../../lib/activitypub/videos'
-import { Notifier } from '../../../lib/notifier'
 import { Hooks } from '../../../lib/plugins/hooks'
 import { generateVideoMiniature } from '../../../lib/thumbnail'
 import { autoBlacklistVideoIfNeeded } from '../../../lib/video-blacklist'
@@ -216,22 +213,8 @@ async function addVideo (options: {
   // Channel has a new content, set as updated
   await videoCreated.VideoChannel.setAsUpdated()
 
-  createTorrentFederate(videoCreated, videoFile)
-    .catch(err => {
-      logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
-
-      return videoCreated
-    }).then(refreshedVideo => {
-      if (!refreshedVideo) return
-
-      if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
-        return addMoveToObjectStorageJob({ video: refreshedVideo, previousVideoState: undefined })
-      }
-
-      if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
-        return addOptimizeOrMergeAudioJob({ video: refreshedVideo, videoFile, user })
-      }
-    }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
+  addVideoJobsAfterUpload(videoCreated, videoFile, user)
+    .catch(err => logger.error('Cannot build new video jobs of %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
 
   Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
 
@@ -266,23 +249,32 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
   return videoFile
 }
 
-async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
-  const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
-
-  const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
-  await JobQueue.Instance.waitJob(job)
-
-  const refreshedVideo = await VideoModel.loadFull(video.id)
-  if (!refreshedVideo) return
-
-  // Only federate and notify after the torrent creation
-  Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+async function addVideoJobsAfterUpload (video: MVideoFullLight, videoFile: MVideoFile, user: MUserId) {
+  return JobQueue.Instance.createSequentialJobFlow(
+    {
+      type: 'manage-video-torrent' as 'manage-video-torrent',
+      payload: {
+        videoId: video.id,
+        videoFileId: videoFile.id,
+        action: 'create'
+      }
+    },
+    {
+      type: 'federate-video' as 'federate-video',
+      payload: {
+        videoUUID: video.uuid,
+        isNewVideo: true
+      }
+    },
 
-  await retryTransactionWrapper(() => {
-    return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
-  })
+    video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE
+      ? await buildMoveToObjectStorageJob({ video, previousVideoState: undefined })
+      : undefined,
 
-  return refreshedVideo
+    video.state === VideoState.TO_TRANSCODE
+      ? await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
+      : undefined
+  )
 }
 
 async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {
index db43c59bebcffdc328801c6fde1937bdeb0626c2..a53c226629be8af199ef2d1e6aa8ee6bbe8c955c 100644 (file)
@@ -156,7 +156,9 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'video-live-ending': 1,
   'video-studio-edition': 1,
   'manage-video-torrent': 1,
-  'move-to-object-storage': 3
+  'move-to-object-storage': 3,
+  'notify': 1,
+  'federate-video': 1
 }
 // Excluded keys are jobs that can be configured by admins
 const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
@@ -175,7 +177,9 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
   'video-live-ending': 10,
   'video-studio-edition': 1,
   'manage-video-torrent': 1,
-  'move-to-object-storage': 1
+  'move-to-object-storage': 1,
+  'notify': 5,
+  'federate-video': 3
 }
 const JOB_TTL: { [id in JobType]: number } = {
   'activitypub-http-broadcast': 60000 * 10, // 10 minutes
@@ -195,6 +199,8 @@ const JOB_TTL: { [id in JobType]: number } = {
   'video-redundancy': 1000 * 3600 * 3, // 3 hours
   'video-live-ending': 1000 * 60 * 10, // 10 minutes
   'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
+  'notify': 60000 * 5, // 5 minutes
+  'federate-video': 60000 * 5, // 5 minutes
   'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
 }
 const REPEAT_JOBS: { [ id in JobType ]?: RepeatOptions } = {
index d2b651082478f19f7817c3e27422294596fead17..e73b7d707e0b298b0395677d83df0aeca2f1d5b1 100644 (file)
@@ -110,7 +110,7 @@ async function loadActorFromDB (actorUrl: string, fetchType: ActorLoadByUrlType)
 async function scheduleOutboxFetchIfNeeded (actor: MActor, created: boolean, refreshed: boolean, updateCollections: boolean) {
   if ((created === true || refreshed === true) && updateCollections === true) {
     const payload = { uri: actor.outboxUrl, type: 'activity' as 'activity' }
-    await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+    await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
   }
 }
 
@@ -118,6 +118,6 @@ async function schedulePlaylistFetchIfNeeded (actor: MActorAccountId, created: b
   // We created a new account: fetch the playlists
   if (created === true && actor.Account && accountPlaylistsUrl) {
     const payload = { uri: accountPlaylistsUrl, type: 'account-playlists' as 'account-playlists' }
-    await JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+    await JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
   }
 }
index 741b54df51be336ab0c988bd72d6d3dfcab4f23d..f6e2a48fdec3e889bb04a9c72d832fbb110eef35 100644 (file)
@@ -27,7 +27,7 @@ async function autoFollowBackIfNeeded (actorFollow: MActorFollowActors, transact
       isAutoFollow: true
     }
 
-    JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
   }
 }
 
index ecdc33a777b436ab17236736a866e972585287ea..5eef7687118ede1b33c07dd28a1c1c2d0a71626a 100644 (file)
@@ -16,7 +16,7 @@ async function addFetchOutboxJob (actor: Pick<ActorModel, 'id' | 'outboxUrl'>) {
     type: 'activity' as 'activity'
   }
 
-  return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
+  return JobQueue.Instance.createJobAsync({ type: 'activitypub-http-fetcher', payload })
 }
 
 export {
index 493e8c7ec7bd6abbf8120aa26bb5579c3face4e8..33260ea024afc8e254fe80ccc7c86beab06c74b2 100644 (file)
@@ -9,7 +9,7 @@ import { fetchRemoteVideoPlaylist } from './shared'
 function scheduleRefreshIfNeeded (playlist: MVideoPlaylist) {
   if (!playlist.isOutdated()) return
 
-  JobQueue.Instance.createJob({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
+  JobQueue.Instance.createJobAsync({ type: 'activitypub-refresher', payload: { type: 'video-playlist', url: playlist.url } })
 }
 
 async function refreshVideoPlaylistIfNeeded (videoPlaylist: MVideoPlaylistOwner): Promise<MVideoPlaylistOwner> {
index fcec63991a9a62f3eaf1ecb77ad8ec3bc405331f..2bc1ef8f56fef7f4695c87b95781d733d162eb66 100644 (file)
@@ -120,7 +120,7 @@ async function forwardActivity (
     body: activity,
     contextType: null
   }
-  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
+  return afterCommitIfTransaction(t, () => JobQueue.Instance.createJobAsync({ type: 'activitypub-http-broadcast', payload }))
 }
 
 // ---------------------------------------------------------------------------
@@ -205,7 +205,7 @@ function broadcastTo (options: {
       contextType
     }
 
-    JobQueue.Instance.createJob({
+    JobQueue.Instance.createJobAsync({
       type: parallelizable
         ? 'activitypub-http-broadcast-parallel'
         : 'activitypub-http-broadcast',
@@ -222,7 +222,7 @@ function broadcastTo (options: {
       contextType
     }
 
-    JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+    JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
   }
 }
 
@@ -243,7 +243,7 @@ function unicastTo (options: {
     contextType
   }
 
-  JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+  JobQueue.Instance.createJobAsync({ type: 'activitypub-http-unicast', payload })
 }
 
 // ---------------------------------------------------------------------------
index b74df132c0cac31856353fab687a61410c47195a..14ba550346d5722fd13fe7589b0768df0726971c 100644 (file)
@@ -107,7 +107,7 @@ async function scheduleRefresh (video: MVideoThumbnail, fetchType: VideoLoadByUr
     return refreshVideoIfNeeded(refreshOptions)
   }
 
-  await JobQueue.Instance.createJobWithPromise({
+  await JobQueue.Instance.createJob({
     type: 'activitypub-refresher',
     payload: { type: 'video', url: video.url }
   })
index 8cf0c87a6850c0b6ef51de3f04b9811bb1c68c44..8ed1b6447adb0ea0372e885681b6f77003745b18 100644 (file)
@@ -74,7 +74,7 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi
 }
 
 function createJob (payload: ActivitypubHttpFetcherPayload) {
-  return JobQueue.Instance.createJobWithPromise({ type: 'activitypub-http-fetcher', payload })
+  return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
 }
 
 function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
index bd10895308e29f213b22df00443fce3206b654c7..9e546de7f85d39adf55545afe8510005d7d64ee0 100644 (file)
@@ -66,7 +66,7 @@ class Emailer {
       }
     }
 
-    return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+    return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
   }
 
   addPasswordCreateEmailJob (username: string, to: string, createPasswordUrl: string) {
@@ -80,7 +80,7 @@ class Emailer {
       }
     }
 
-    return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+    return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
   }
 
   addVerifyEmailJob (username: string, to: string, verifyEmailUrl: string) {
@@ -94,7 +94,7 @@ class Emailer {
       }
     }
 
-    return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+    return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
   }
 
   addUserBlockJob (user: MUser, blocked: boolean, reason?: string) {
@@ -108,7 +108,7 @@ class Emailer {
       text: `Your account ${user.username} on ${CONFIG.INSTANCE.NAME} has been ${blockedWord}${reasonString}.`
     }
 
-    return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+    return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
   }
 
   addContactFormJob (fromEmail: string, fromName: string, subject: string, body: string) {
@@ -127,7 +127,7 @@ class Emailer {
       }
     }
 
-    return JobQueue.Instance.createJob({ type: 'email', payload: emailPayload })
+    return JobQueue.Instance.createJobAsync({ type: 'email', payload: emailPayload })
   }
 
   async sendMail (options: EmailPayload) {
index 944da5be17c4dada7e7aed19c072b63f5ebea250..a68c32ba069f1de826613c8cd0e325fb0dd1f1aa 100644 (file)
@@ -17,7 +17,7 @@ async function processActivityPubFollow (job: Job) {
   const payload = job.data as ActivitypubFollowPayload
   const host = payload.host
 
-  logger.info('Processing ActivityPub follow in job %d.', job.id)
+  logger.info('Processing ActivityPub follow in job %s.', job.id)
 
   let targetActor: MActorFull
   if (!host || host === WEBSERVER.HOST) {
index 354c608fb6212421c422e6b5f98fcac0f0690cbb..13eff52111025d288e7152d2f75230e3f533be20 100644 (file)
@@ -8,7 +8,7 @@ import { doRequest } from '../../../helpers/requests'
 import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
 
 async function processActivityPubHttpBroadcast (job: Job) {
-  logger.info('Processing ActivityPub broadcast in job %d.', job.id)
+  logger.info('Processing ActivityPub broadcast in job %s.', job.id)
 
   const payload = job.data as ActivitypubHttpBroadcastPayload
 
index e0b8418872d75d78fc67f142779dfa8d112b424b..b6cb3c4a694df286382b16b90f2a8cc85c2596a8 100644 (file)
@@ -12,7 +12,7 @@ import { addVideoShares } from '../../activitypub/share'
 import { addVideoComments } from '../../activitypub/video-comments'
 
 async function processActivityPubHttpFetcher (job: Job) {
-  logger.info('Processing ActivityPub fetcher in job %d.', job.id)
+  logger.info('Processing ActivityPub fetcher in job %s.', job.id)
 
   const payload = job.data as ActivitypubHttpFetcherPayload
 
index 837a597a58aeb56cda164b75120c18edc3953d40..9e4e84002528abd90471f88afc347d90aa71610f 100644 (file)
@@ -6,7 +6,7 @@ import { doRequest } from '../../../helpers/requests'
 import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
 
 async function processActivityPubHttpUnicast (job: Job) {
-  logger.info('Processing ActivityPub unicast in job %d.', job.id)
+  logger.info('Processing ActivityPub unicast in job %s.', job.id)
 
   const payload = job.data as ActivitypubHttpUnicastPayload
   const uri = payload.uri
index 600f858a0811026b6ed1ca744981766037ee80a0..307e771ffc7312f9b2084473edcb98f53d541fcb 100644 (file)
@@ -11,7 +11,7 @@ import { refreshActorIfNeeded } from '../../activitypub/actors'
 async function refreshAPObject (job: Job) {
   const payload = job.data as RefreshPayload
 
-  logger.info('Processing AP refresher in job %d for %s.', job.id, payload.url)
+  logger.info('Processing AP refresher in job %s for %s.', job.id, payload.url)
 
   if (payload.type === 'video') return refreshVideo(payload.url)
   if (payload.type === 'video-playlist') return refreshVideoPlaylist(payload.url)
index 4a5bad9fb9272877dce9fb57fd6caca423ad925b..27a2d431b1758c3179ff9a4f62b683560e8e89ce 100644 (file)
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
 
 async function processActorKeys (job: Job) {
   const payload = job.data as ActorKeysPayload
-  logger.info('Processing actor keys in job %d.', job.id)
+  logger.info('Processing actor keys in job %s.', job.id)
 
   const actor = await ActorModel.load(payload.actorId)
 
index b5b9475b1db2c7cf2070464878c25fb624e144a2..567bcc076b62eed662b7ac7f45855315d847aa05 100644 (file)
@@ -5,7 +5,7 @@ import { Emailer } from '../../emailer'
 
 async function processEmail (job: Job) {
   const payload = job.data as EmailPayload
-  logger.info('Processing email in job %d.', job.id)
+  logger.info('Processing email in job %s.', job.id)
 
   return Emailer.Instance.sendMail(payload)
 }
diff --git a/server/lib/job-queue/handlers/federate-video.ts b/server/lib/job-queue/handlers/federate-video.ts
new file mode 100644 (file)
index 0000000..6aac367
--- /dev/null
@@ -0,0 +1,28 @@
+import { Job } from 'bullmq'
+import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { sequelizeTypescript } from '@server/initializers/database'
+import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
+import { VideoModel } from '@server/models/video/video'
+import { FederateVideoPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+function processFederateVideo (job: Job) {
+  const payload = job.data as FederateVideoPayload
+
+  logger.info('Processing video federation in job %s.', job.id)
+
+  return retryTransactionWrapper(() => {
+    return sequelizeTypescript.transaction(async t => {
+      const video = await VideoModel.loadFull(payload.videoUUID, t)
+      if (!video) return
+
+      return federateVideoIfNeeded(video, payload.isNewVideo, t)
+    })
+  })
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processFederateVideo
+}
index 4505ca79ebb085f9814a5aa21e6255f877a0c95c..03aa414c9836e5e5e7a5b9ce6147c235db6203d8 100644 (file)
@@ -8,7 +8,7 @@ import { logger } from '../../../helpers/logger'
 
 async function processManageVideoTorrent (job: Job) {
   const payload = job.data as ManageVideoTorrentPayload
-  logger.info('Processing torrent in job %d.', job.id)
+  logger.info('Processing torrent in job %s.', job.id)
 
   if (payload.action === 'create') return doCreateAction(payload)
   if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
index d608fd865232db27e25b8a9398707fa62685c13e..25bdebeea98b6a5a2acac48b390ac70fad063804 100644 (file)
@@ -17,7 +17,7 @@ const lTagsBase = loggerTagsFactory('move-object-storage')
 
 export async function processMoveToObjectStorage (job: Job) {
   const payload = job.data as MoveObjectStoragePayload
-  logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
+  logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
 
   const video = await VideoModel.loadWithFiles(payload.videoUUID)
   // No video, maybe deleted?
@@ -43,7 +43,7 @@ export async function processMoveToObjectStorage (job: Job) {
 
     const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
     if (pendingMove === 0) {
-      logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags)
+      logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
 
       await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
     }
diff --git a/server/lib/job-queue/handlers/notify.ts b/server/lib/job-queue/handlers/notify.ts
new file mode 100644 (file)
index 0000000..8360539
--- /dev/null
@@ -0,0 +1,27 @@
+import { Job } from 'bullmq'
+import { Notifier } from '@server/lib/notifier'
+import { VideoModel } from '@server/models/video/video'
+import { NotifyPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processNotify (job: Job) {
+  const payload = job.data as NotifyPayload
+  logger.info('Processing %s notification in job %s.', payload.action, job.id)
+
+  if (payload.action === 'new-video') return doNotifyNewVideo(payload)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processNotify
+}
+
+// ---------------------------------------------------------------------------
+
+async function doNotifyNewVideo (payload: NotifyPayload & { action: 'new-video' }) {
+  const refreshedVideo = await VideoModel.loadFull(payload.videoUUID)
+  if (!refreshedVideo) return
+
+  Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+}
index 40c44cf52ce19b8e15b8fdd44edb0469ee8b00c4..d950f64072e20db266a2989641322358db7e9bef 100644 (file)
@@ -4,7 +4,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
 import { CONFIG } from '@server/initializers/config'
 import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
 import { generateWebTorrentVideoFilename } from '@server/lib/paths'
-import { addMoveToObjectStorageJob } from '@server/lib/video'
+import { buildMoveToObjectStorageJob } from '@server/lib/video'
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { VideoModel } from '@server/models/video/video'
 import { VideoFileModel } from '@server/models/video/video-file'
@@ -13,10 +13,11 @@ import { getLowercaseExtension } from '@shared/core-utils'
 import { VideoFileImportPayload, VideoStorage } from '@shared/models'
 import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg'
 import { logger } from '../../../helpers/logger'
+import { JobQueue } from '../job-queue'
 
 async function processVideoFileImport (job: Job) {
   const payload = job.data as VideoFileImportPayload
-  logger.info('Processing video file import in job %d.', job.id)
+  logger.info('Processing video file import in job %s.', job.id)
 
   const video = await VideoModel.loadFull(payload.videoUUID)
   // No video, maybe deleted?
@@ -28,7 +29,7 @@ async function processVideoFileImport (job: Job) {
   await updateVideoFile(video, payload.filePath)
 
   if (CONFIG.OBJECT_STORAGE.ENABLED) {
-    await addMoveToObjectStorageJob({ video, previousVideoState: video.state })
+    await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState: video.state }))
   } else {
     await federateVideoIfNeeded(video, false)
   }
index e5cd35865fdbfb4bafcbe0e87b37b36bac60dffa..f4629159c13e0b47e4836d919ce3e4710724e7cc 100644 (file)
@@ -8,7 +8,7 @@ import { generateWebTorrentVideoFilename } from '@server/lib/paths'
 import { Hooks } from '@server/lib/plugins/hooks'
 import { ServerConfigManager } from '@server/lib/server-config-manager'
 import { isAbleToUploadVideo } from '@server/lib/user'
-import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video'
+import { buildOptimizeOrMergeAudioJob, buildMoveToObjectStorageJob } from '@server/lib/video'
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { buildNextVideoState } from '@server/lib/video-state'
 import { ThumbnailModel } from '@server/models/video/thumbnail'
@@ -39,6 +39,7 @@ import { MThumbnail } from '../../../types/models/video/thumbnail'
 import { federateVideoIfNeeded } from '../../activitypub/videos'
 import { Notifier } from '../../notifier'
 import { generateVideoMiniature } from '../../thumbnail'
+import { JobQueue } from '../job-queue'
 
 async function processVideoImport (job: Job) {
   const payload = job.data as VideoImportPayload
@@ -65,7 +66,7 @@ export {
 // ---------------------------------------------------------------------------
 
 async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) {
-  logger.info('Processing torrent video import in job %d.', job.id)
+  logger.info('Processing torrent video import in job %s.', job.id)
 
   const options = { type: payload.type, videoImportId: payload.videoImportId }
 
@@ -77,7 +78,7 @@ async function processTorrentImport (job: Job, videoImport: MVideoImportDefault,
 }
 
 async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) {
-  logger.info('Processing youtubeDL video import in job %d.', job.id)
+  logger.info('Processing youtubeDL video import in job %s.', job.id)
 
   const options = { type: payload.type, videoImportId: videoImport.id }
 
@@ -259,12 +260,16 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
     }
 
     if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
-      return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
+      await JobQueue.Instance.createJob(
+        await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
+      )
     }
 
     // Create transcoding jobs?
     if (video.state === VideoState.TO_TRANSCODE) {
-      await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
+      await JobQueue.Instance.createJob(
+        await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
+      )
     }
 
   } catch (err) {
index 75ab2cd02ec6ab513ddf38c6f31056055cc3b060..bac99fdb79c33d0718b2caa13e93a0a68845537e 100644 (file)
@@ -5,7 +5,7 @@ import { logger } from '../../../helpers/logger'
 
 async function processVideoRedundancy (job: Job) {
   const payload = job.data as VideoRedundancyPayload
-  logger.info('Processing video redundancy in job %d.', job.id)
+  logger.info('Processing video redundancy in job %s.', job.id)
 
   return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId)
 }
index 0782435381b684d899928fe669b0f81b9177e569..23f9a34ccdc0fa36191a3005474bc2ef2d31f1ed 100644 (file)
@@ -8,7 +8,7 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
 import { generateWebTorrentVideoFilename } from '@server/lib/paths'
 import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
 import { isAbleToUploadVideo } from '@server/lib/user'
-import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
+import { buildOptimizeOrMergeAudioJob } from '@server/lib/video'
 import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file'
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio'
@@ -36,6 +36,7 @@ import {
   VideoStudioTaskWatermarkPayload
 } from '@shared/models'
 import { logger, loggerTagsFactory } from '../../../helpers/logger'
+import { JobQueue } from '../job-queue'
 
 const lTagsBase = loggerTagsFactory('video-edition')
 
@@ -43,7 +44,7 @@ async function processVideoStudioEdition (job: Job) {
   const payload = job.data as VideoStudioEditionPayload
   const lTags = lTagsBase(payload.videoUUID)
 
-  logger.info('Process video studio edition of %s in job %d.', payload.videoUUID, job.id, lTags)
+  logger.info('Process video studio edition of %s in job %s.', payload.videoUUID, job.id, lTags)
 
   const video = await VideoModel.loadFull(payload.videoUUID)
 
@@ -100,7 +101,10 @@ async function processVideoStudioEdition (job: Job) {
   await federateVideoIfNeeded(video, false, undefined)
 
   const user = await UserModel.loadByVideoId(video.id)
-  await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
+
+  await JobQueue.Instance.createJob(
+    await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
+  )
 }
 
 // ---------------------------------------------------------------------------
index 0cf5d53ce015d3ece080dad3c44ccd910b1ee84f..50d732bebcf5c05c9df0c1eac2b12d3c6585ab24 100644 (file)
@@ -1,4 +1,6 @@
 import {
+  FlowJob,
+  FlowProducer,
   Job,
   JobsOptions,
   Queue,
@@ -13,7 +15,7 @@ import {
 import { jobStates } from '@server/helpers/custom-validators/jobs'
 import { CONFIG } from '@server/initializers/config'
 import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
-import { timeoutPromise } from '@shared/core-utils'
+import { pick, timeoutPromise } from '@shared/core-utils'
 import {
   ActivitypubFollowPayload,
   ActivitypubHttpBroadcastPayload,
@@ -22,10 +24,12 @@ import {
   ActorKeysPayload,
   DeleteResumableUploadMetaFilePayload,
   EmailPayload,
+  FederateVideoPayload,
   JobState,
   JobType,
   ManageVideoTorrentPayload,
   MoveObjectStoragePayload,
+  NotifyPayload,
   RefreshPayload,
   VideoFileImportPayload,
   VideoImportPayload,
@@ -45,8 +49,10 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
 import { refreshAPObject } from './handlers/activitypub-refresher'
 import { processActorKeys } from './handlers/actor-keys'
 import { processEmail } from './handlers/email'
+import { processFederateVideo } from './handlers/federate-video'
 import { processManageVideoTorrent } from './handlers/manage-video-torrent'
 import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage'
+import { processNotify } from './handlers/notify'
 import { processVideoFileImport } from './handlers/video-file-import'
 import { processVideoImport } from './handlers/video-import'
 import { processVideoLiveEnding } from './handlers/video-live-ending'
@@ -54,7 +60,7 @@ import { processVideoStudioEdition } from './handlers/video-studio-edition'
 import { processVideoTranscoding } from './handlers/video-transcoding'
 import { processVideosViewsStats } from './handlers/video-views-stats'
 
-type CreateJobArgument =
+export type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
@@ -73,7 +79,9 @@ type CreateJobArgument =
   { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
   { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
   { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
-  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
+  { type: 'notify', payload: NotifyPayload } |
+  { type: 'move-to-object-storage', payload: MoveObjectStoragePayload } |
+  { type: 'federate-video', payload: FederateVideoPayload }
 
 export type CreateJobOptions = {
   delay?: number
@@ -98,7 +106,9 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
   'video-redundancy': processVideoRedundancy,
   'move-to-object-storage': processMoveToObjectStorage,
   'manage-video-torrent': processManageVideoTorrent,
-  'video-studio-edition': processVideoStudioEdition
+  'notify': processNotify,
+  'video-studio-edition': processVideoStudioEdition,
+  'federate-video': processFederateVideo
 }
 
 const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
@@ -123,7 +133,9 @@ const jobTypes: JobType[] = [
   'video-live-ending',
   'move-to-object-storage',
   'manage-video-torrent',
-  'video-studio-edition'
+  'video-studio-edition',
+  'notify',
+  'federate-video'
 ]
 
 const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
@@ -137,6 +149,8 @@ class JobQueue {
   private queueSchedulers: { [id in JobType]?: QueueScheduler } = {}
   private queueEvents: { [id in JobType]?: QueueEvents } = {}
 
+  private flowProducer: FlowProducer
+
   private initialized = false
   private jobRedisPrefix: string
 
@@ -157,6 +171,11 @@ class JobQueue {
       this.buildQueueEvent(handlerName, produceOnly)
     }
 
+    this.flowProducer = new FlowProducer({
+      connection: this.getRedisConnection(),
+      prefix: this.jobRedisPrefix
+    })
+
     this.addRepeatableJobs()
   }
 
@@ -243,6 +262,8 @@ class JobQueue {
     }
   }
 
+  // ---------------------------------------------------------------------------
+
   async terminate () {
     const promises = Object.keys(this.workers)
       .map(handlerName => {
@@ -278,28 +299,56 @@ class JobQueue {
     }
   }
 
-  createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
-    this.createJobWithPromise(obj, options)
-        .catch(err => logger.error('Cannot create job.', { err, obj }))
+  // ---------------------------------------------------------------------------
+
+  createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
+    this.createJob(options)
+        .catch(err => logger.error('Cannot create job.', { err, options }))
   }
 
-  async createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
-    const queue: Queue = this.queues[obj.type]
+  async createJob (options: CreateJobArgument & CreateJobOptions) {
+    const queue: Queue = this.queues[options.type]
     if (queue === undefined) {
-      logger.error('Unknown queue %s: cannot create job.', obj.type)
+      logger.error('Unknown queue %s: cannot create job.', options.type)
       return
     }
 
-    const jobArgs: JobsOptions = {
+    const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
+
+    return queue.add('job', options.payload, jobOptions)
+  }
+
+  async createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
+    let lastJob: FlowJob
+
+    for (const job of jobs) {
+      if (!job) continue
+
+      lastJob = {
+        name: 'job',
+        data: job.payload,
+        queueName: job.type,
+        opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+        children: lastJob
+          ? [ lastJob ]
+          : []
+      }
+    }
+
+    return this.flowProducer.add(lastJob)
+  }
+
+  private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
+    return {
       backoff: { delay: 60 * 1000, type: 'exponential' },
-      attempts: JOB_ATTEMPTS[obj.type],
+      attempts: JOB_ATTEMPTS[type],
       priority: options.priority,
       delay: options.delay
     }
-
-    return queue.add('job', obj.payload, jobArgs)
   }
 
+  // ---------------------------------------------------------------------------
+
   async listForApi (options: {
     state?: JobState
     start: number
@@ -367,6 +416,8 @@ class JobQueue {
     return Promise.all(promises)
   }
 
+  // ---------------------------------------------------------------------------
+
   async removeOldJobs () {
     for (const key of Object.keys(this.queues)) {
       const queue: Queue = this.queues[key]
index 1410889a2dc1ba31f7419d4630c124911d573a70..aadd8e30829e61b63aede0a8a00891e77f0013f3 100644 (file)
@@ -408,7 +408,7 @@ class LiveManager {
         await liveSession.save()
       }
 
-      JobQueue.Instance.createJob({
+      JobQueue.Instance.createJobAsync({
         type: 'video-live-ending',
         payload: {
           videoId: fullVideo.id,
@@ -421,8 +421,12 @@ class LiveManager {
           streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,
 
           publishedAt: fullVideo.publishedAt.toISOString()
-        }
-      }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })
+        },
+
+        delay: cleanupNow
+          ? 0
+          : VIDEO_LIVE.CLEANUP_DELAY
+      })
 
       fullVideo.state = live.permanentLive
         ? VideoState.WAITING_FOR_LIVE
index d1c4c0215328bd2d2591565277f104750dec042b..66cfc31c4b61da61a1430a4bc84f5a2e717fa203 100644 (file)
@@ -242,7 +242,7 @@ class Notifier {
 
     for (const to of toEmails) {
       const payload = await object.createEmail(to)
-      JobQueue.Instance.createJob({ type: 'email', payload })
+      JobQueue.Instance.createJobAsync({ type: 'email', payload })
     }
   }
 
index d9f9c2de3d46b3026d0d896b11ef5d891ae03d74..956ece749b8d54d87af690b0dbce53bd15046035 100644 (file)
@@ -59,7 +59,7 @@ export class AutoFollowIndexInstances extends AbstractScheduler {
             isAutoFollow: true
           }
 
-          JobQueue.Instance.createJob({ type: 'activitypub-follow', payload })
+          JobQueue.Instance.createJobAsync({ type: 'activitypub-follow', payload })
         }
       }
 
index b5d8353b77c641f4bb7ed888ab4d4661f1f75102..9ebbd76796a2e78b06ba7b41e55b3b10834f7335 100644 (file)
@@ -1,4 +1,5 @@
 import { Transaction } from 'sequelize'
+import { retryTransactionWrapper } from '@server/helpers/database-utils'
 import { logger } from '@server/helpers/logger'
 import { CONFIG } from '@server/initializers/config'
 import { sequelizeTypescript } from '@server/initializers/database'
@@ -7,9 +8,9 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info'
 import { MVideo, MVideoFullLight, MVideoUUID } from '@server/types/models'
 import { VideoState } from '@shared/models'
 import { federateVideoIfNeeded } from './activitypub/videos'
+import { JobQueue } from './job-queue'
 import { Notifier } from './notifier'
-import { addMoveToObjectStorageJob } from './video'
-import { retryTransactionWrapper } from '@server/helpers/database-utils'
+import { buildMoveToObjectStorageJob } from './video'
 
 function buildNextVideoState (currentState?: VideoState) {
   if (currentState === VideoState.PUBLISHED) {
@@ -86,7 +87,7 @@ async function moveToExternalStorageState (options: {
   logger.info('Creating external storage move job for video %s.', video.uuid, { tags: [ video.uuid ] })
 
   try {
-    await addMoveToObjectStorageJob({ video, previousVideoState, isNewVideo })
+    await JobQueue.Instance.createJob(await buildMoveToObjectStorageJob({ video, previousVideoState, isNewVideo }))
 
     return true
   } catch (err) {
index b843b11bc4d65844710ac63a1eee2d595592293a..f7d7aa186bf589db7279dc7252e6851ea23d512d 100644 (file)
@@ -1,5 +1,7 @@
 import { UploadFiles } from 'express'
+import memoizee from 'memoizee'
 import { Transaction } from 'sequelize/types'
+import { CONFIG } from '@server/initializers/config'
 import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY, MEMOIZE_LENGTH, MEMOIZE_TTL } from '@server/initializers/constants'
 import { TagModel } from '@server/models/video/tag'
 import { VideoModel } from '@server/models/video/video'
@@ -9,8 +11,6 @@ import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID
 import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
 import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
 import { updateVideoMiniatureFromExisting } from './thumbnail'
-import { CONFIG } from '@server/initializers/config'
-import memoizee from 'memoizee'
 
 function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
   return {
@@ -86,7 +86,7 @@ async function setVideoTags (options: {
 
 // ---------------------------------------------------------------------------
 
-async function addOptimizeOrMergeAudioJob (options: {
+async function buildOptimizeOrMergeAudioJob (options: {
   video: MVideoUUID
   videoFile: MVideoFile
   user: MUserId
@@ -94,10 +94,10 @@ async function addOptimizeOrMergeAudioJob (options: {
 }) {
   const { video, videoFile, user, isNewVideo } = options
 
-  let dataInput: VideoTranscodingPayload
+  let payload: VideoTranscodingPayload
 
   if (videoFile.isAudio()) {
-    dataInput = {
+    payload = {
       type: 'merge-audio-to-webtorrent',
       resolution: DEFAULT_AUDIO_RESOLUTION,
       videoUUID: video.uuid,
@@ -105,24 +105,26 @@ async function addOptimizeOrMergeAudioJob (options: {
       isNewVideo
     }
   } else {
-    dataInput = {
+    payload = {
       type: 'optimize-to-webtorrent',
       videoUUID: video.uuid,
       isNewVideo
     }
   }
 
-  const jobOptions = {
-    priority: await getTranscodingJobPriority(user)
-  }
+  await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
 
-  return addTranscodingJob(dataInput, jobOptions)
+  return {
+    type: 'video-transcoding' as 'video-transcoding',
+    priority: await getTranscodingJobPriority(user),
+    payload
+  }
 }
 
 async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
   await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
 
-  return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload }, options)
+  return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
 }
 
 async function getTranscodingJobPriority (user: MUserId) {
@@ -136,7 +138,7 @@ async function getTranscodingJobPriority (user: MUserId) {
 
 // ---------------------------------------------------------------------------
 
-async function addMoveToObjectStorageJob (options: {
+async function buildMoveToObjectStorageJob (options: {
   video: MVideoUUID
   previousVideoState: VideoState
   isNewVideo?: boolean // Default true
@@ -145,8 +147,14 @@ async function addMoveToObjectStorageJob (options: {
 
   await VideoJobInfoModel.increaseOrCreate(video.uuid, 'pendingMove')
 
-  const dataInput = { videoUUID: video.uuid, isNewVideo, previousVideoState }
-  return JobQueue.Instance.createJobWithPromise({ type: 'move-to-object-storage', payload: dataInput })
+  return {
+    type: 'move-to-object-storage' as 'move-to-object-storage',
+    payload: {
+      videoUUID: video.uuid,
+      isNewVideo,
+      previousVideoState
+    }
+  }
 }
 
 // ---------------------------------------------------------------------------
@@ -173,9 +181,9 @@ export {
   buildLocalVideoFromReq,
   buildVideoThumbnailsFromReq,
   setVideoTags,
-  addOptimizeOrMergeAudioJob,
+  buildOptimizeOrMergeAudioJob,
   addTranscodingJob,
-  addMoveToObjectStorageJob,
+  buildMoveToObjectStorageJob,
   getTranscodingJobPriority,
   getCachedVideoDuration
 }
index a924183f2f7a43965a657f6bb11e615b9719efd9..8c8f64de9a9dc082e927af171720f0ed0678b9b5 100644 (file)
@@ -25,6 +25,8 @@ export type JobType =
   | 'manage-video-torrent'
   | 'move-to-object-storage'
   | 'video-studio-edition'
+  | 'notify'
+  | 'federate-video'
 
 export interface Job {
   id: number | string
@@ -214,3 +216,18 @@ export interface VideoStudioEditionPayload {
   videoUUID: string
   tasks: VideoStudioTaskPayload[]
 }
+
+// ---------------------------------------------------------------------------
+
+export type NotifyPayload =
+  {
+    action: 'new-video'
+    videoUUID: string
+  }
+
+// ---------------------------------------------------------------------------
+
+export interface FederateVideoPayload {
+  videoUUID: string
+  isNewVideo: boolean
+}