]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Process video torrents in order
authorChocobozzz <me@florianbigard.com>
Wed, 16 Mar 2022 14:34:21 +0000 (15:34 +0100)
committerChocobozzz <me@florianbigard.com>
Wed, 16 Mar 2022 14:45:58 +0000 (15:45 +0100)
Prevent update before video torrent generation for example

server/controllers/api/videos/update.ts
server/controllers/api/videos/upload.ts
server/initializers/constants.ts
server/lib/job-queue/handlers/manage-video-torrent.ts [new file with mode: 0644]
server/lib/job-queue/job-queue.ts
server/models/video/video.ts
shared/models/server/job.model.ts

index 15899307de27407f6f43e559da71747079e216c7..2cf8a5883ecf25e0dbf6e34d8295ca8bd335665d 100644 (file)
@@ -1,12 +1,12 @@
 import express from 'express'
 import { Transaction } from 'sequelize/types'
-import { updateTorrentMetadata } from '@server/helpers/webtorrent'
 import { changeVideoChannelShare } from '@server/lib/activitypub/share'
+import { JobQueue } from '@server/lib/job-queue'
 import { buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video'
 import { openapiOperationDoc } from '@server/middlewares/doc'
 import { FilteredModelAttributes } from '@server/types'
 import { MVideoFullLight } from '@server/types/models'
-import { HttpStatusCode, VideoUpdate } from '@shared/models'
+import { HttpStatusCode, ManageVideoTorrentPayload, VideoUpdate } from '@shared/models'
 import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
 import { resetSequelizeInstance } from '../../../helpers/database-utils'
 import { createReqFiles } from '../../../helpers/express-utils'
@@ -139,15 +139,13 @@ async function updateVideo (req: express.Request, res: express.Response) {
       return { videoInstanceUpdated, isNewVideo }
     })
 
-    if (videoInstanceUpdated.isLive !== true && videoInfoToUpdate.name) {
-      await updateTorrentsMetadata(videoInstanceUpdated)
-    }
+    const refreshedVideo = await updateTorrentsMetadataIfNeeded(videoInstanceUpdated, videoInfoToUpdate)
 
-    await sequelizeTypescript.transaction(t => federateVideoIfNeeded(videoInstanceUpdated, isNewVideo, t))
+    await sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, isNewVideo, t))
 
-    if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(videoInstanceUpdated)
+    if (wasConfidentialVideo) Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
 
-    Hooks.runAction('action:api.video.updated', { video: videoInstanceUpdated, body: req.body, req, res })
+    Hooks.runAction('action:api.video.updated', { video: refreshedVideo, body: req.body, req, res })
   } catch (err) {
     // Force fields we want to update
     // If the transaction is retried, sequelize will think the object has not changed
@@ -194,19 +192,25 @@ function updateSchedule (videoInstance: MVideoFullLight, videoInfoToUpdate: Vide
   }
 }
 
-async function updateTorrentsMetadata (video: MVideoFullLight) {
+async function updateTorrentsMetadataIfNeeded (video: MVideoFullLight, videoInfoToUpdate: VideoUpdate) {
+  if (video.isLive || !videoInfoToUpdate.name) return video
+
   for (const file of (video.VideoFiles || [])) {
-    await updateTorrentMetadata(video, file)
+    const payload: ManageVideoTorrentPayload = { action: 'update-metadata', videoId: video.id, videoFileId: file.id }
 
-    await file.save()
+    const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+    await job.finished()
   }
 
   const hls = video.getHLSPlaylist()
-  if (!hls) return
 
-  for (const file of (hls.VideoFiles || [])) {
-    await updateTorrentMetadata(hls, file)
+  for (const file of (hls?.VideoFiles || [])) {
+    const payload: ManageVideoTorrentPayload = { action: 'update-metadata', streamingPlaylistId: hls.id, videoFileId: file.id }
 
-    await file.save()
+    const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+    await job.finished()
   }
+
+  // Refresh video since files have changed
+  return VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
 }
index dd69cf2385403bc5684c0c9e77b69a1cacf91a5c..14ae9d920a4b005f0e20138616efc96f7d6991f1 100644 (file)
@@ -2,8 +2,8 @@ import express from 'express'
 import { move } from 'fs-extra'
 import { basename } from 'path'
 import { getResumableUploadPath } from '@server/helpers/upload'
-import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
 import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url'
+import { JobQueue } from '@server/lib/job-queue'
 import { generateWebTorrentVideoFilename } from '@server/lib/paths'
 import { Redis } from '@server/lib/redis'
 import { uploadx } from '@server/lib/uploadx'
@@ -17,10 +17,10 @@ import {
 import { VideoPathManager } from '@server/lib/video-path-manager'
 import { buildNextVideoState } from '@server/lib/video-state'
 import { openapiOperationDoc } from '@server/middlewares/doc'
-import { MVideo, MVideoFile, MVideoFullLight } from '@server/types/models'
+import { MVideoFile, MVideoFullLight } from '@server/types/models'
 import { getLowercaseExtension } from '@shared/core-utils'
 import { isAudioFile, uuidToShort } from '@shared/extra-utils'
-import { HttpStatusCode, VideoCreate, VideoResolution, VideoState } from '@shared/models'
+import { HttpStatusCode, ManageVideoTorrentPayload, VideoCreate, VideoResolution, VideoState } from '@shared/models'
 import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger'
 import { retryTransactionWrapper } from '../../../helpers/database-utils'
 import { createReqFiles } from '../../../helpers/express-utils'
@@ -209,17 +209,22 @@ async function addVideo (options: {
   // Channel has a new content, set as updated
   await videoCreated.VideoChannel.setAsUpdated()
 
-  createTorrentFederate(video, videoFile)
-    .then(() => {
-      if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
-        return addMoveToObjectStorageJob(video)
+  createTorrentFederate(videoCreated, videoFile)
+    .catch(err => {
+      logger.error('Cannot create torrent or federate video for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) })
+
+      return videoCreated
+    }).then(refreshedVideo => {
+      if (!refreshedVideo) return
+
+      if (refreshedVideo.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
+        return addMoveToObjectStorageJob(refreshedVideo)
       }
 
-      if (video.state === VideoState.TO_TRANSCODE) {
-        return addOptimizeOrMergeAudioJob(videoCreated, videoFile, user)
+      if (refreshedVideo.state === VideoState.TO_TRANSCODE) {
+        return addOptimizeOrMergeAudioJob(refreshedVideo, videoFile, user)
       }
-    })
-    .catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
+    }).catch(err => logger.error('Cannot add optimize/merge audio job for %s.', videoCreated.uuid, { err, ...lTags(videoCreated.uuid) }))
 
   Hooks.runAction('action:api.video.uploaded', { video: videoCreated, req, res })
 
@@ -254,36 +259,23 @@ async function buildNewFile (videoPhysicalFile: express.VideoUploadFile) {
   return videoFile
 }
 
-async function createTorrentAndSetInfoHashAsync (video: MVideo, fileArg: MVideoFile) {
-  await createTorrentAndSetInfoHash(video, fileArg)
+async function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
+  const payload: ManageVideoTorrentPayload = { videoId: video.id, videoFileId: videoFile.id, action: 'create' }
 
-  // Refresh videoFile because the createTorrentAndSetInfoHash could be long
-  const refreshedFile = await VideoFileModel.loadWithVideo(fileArg.id)
-  // File does not exist anymore, remove the generated torrent
-  if (!refreshedFile) return fileArg.removeTorrent()
+  const job = await JobQueue.Instance.createJobWithPromise({ type: 'manage-video-torrent', payload })
+  await job.finished()
 
-  refreshedFile.infoHash = fileArg.infoHash
-  refreshedFile.torrentFilename = fileArg.torrentFilename
+  const refreshedVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.id)
+  if (!refreshedVideo) return
 
-  return refreshedFile.save()
-}
+  // Only federate and notify after the torrent creation
+  Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
 
-function createTorrentFederate (video: MVideoFullLight, videoFile: MVideoFile) {
-  // Create the torrent file in async way because it could be long
-  return createTorrentAndSetInfoHashAsync(video, videoFile)
-    .catch(err => logger.error('Cannot create torrent file for video %s', video.url, { err, ...lTags(video.uuid) }))
-    .then(() => VideoModel.loadAndPopulateAccountAndServerAndTags(video.id))
-    .then(refreshedVideo => {
-      if (!refreshedVideo) return
-
-      // Only federate and notify after the torrent creation
-      Notifier.Instance.notifyOnNewVideoIfNeeded(refreshedVideo)
+  await retryTransactionWrapper(() => {
+    return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
+  })
 
-      return retryTransactionWrapper(() => {
-        return sequelizeTypescript.transaction(t => federateVideoIfNeeded(refreshedVideo, true, t))
-      })
-    })
-    .catch(err => logger.error('Cannot federate or notify video creation %s', video.url, { err, ...lTags(video.uuid) }))
+  return refreshedVideo
 }
 
 async function deleteUploadResumableCache (req: express.Request, res: express.Response, next: express.NextFunction) {
index 1c849b5614153a592ad0a931a8ece3cc1f1abb8d..e0f6f2bd2e94272dd63901993c67c92f206da5e4 100644 (file)
@@ -153,6 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'video-redundancy': 1,
   'video-live-ending': 1,
   'video-edition': 1,
+  'manage-video-torrent': 1,
   'move-to-object-storage': 3
 }
 // Excluded keys are jobs that can be configured by admins
@@ -170,6 +171,7 @@ const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-im
   'video-redundancy': 1,
   'video-live-ending': 10,
   'video-edition': 1,
+  'manage-video-torrent': 1,
   'move-to-object-storage': 1
 }
 const JOB_TTL: { [id in JobType]: number } = {
@@ -188,6 +190,7 @@ const JOB_TTL: { [id in JobType]: number } = {
   'activitypub-refresher': 60000 * 10, // 10 minutes
   'video-redundancy': 1000 * 3600 * 3, // 3 hours
   'video-live-ending': 1000 * 60 * 10, // 10 minutes
+  'manage-video-torrent': 1000 * 3600 * 3, // 3 hours
   'move-to-object-storage': 1000 * 60 * 60 * 3 // 3 hours
 }
 const REPEAT_JOBS: { [ id in JobType ]?: EveryRepeatOptions | CronRepeatOptions } = {
diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts
new file mode 100644 (file)
index 0000000..5cb4287
--- /dev/null
@@ -0,0 +1,88 @@
+import { Job } from 'bull'
+import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent'
+import { VideoModel } from '@server/models/video/video'
+import { VideoFileModel } from '@server/models/video/video-file'
+import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
+import { ManageVideoTorrentPayload } from '@shared/models'
+import { logger } from '../../../helpers/logger'
+
+async function processManageVideoTorrent (job: Job) {
+  const payload = job.data as ManageVideoTorrentPayload
+  logger.info('Processing torrent in job %d.', job.id)
+
+  if (payload.action === 'create') return doCreateAction(payload)
+  if (payload.action === 'update-metadata') return doUpdateMetadataAction(payload)
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processManageVideoTorrent
+}
+
+// ---------------------------------------------------------------------------
+
+async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'create' }) {
+  const [ video, file ] = await Promise.all([
+    loadVideoOrLog(payload.videoId),
+    loadFileOrLog(payload.videoFileId)
+  ])
+
+  await createTorrentAndSetInfoHash(video, file)
+
+  // Refresh videoFile because the createTorrentAndSetInfoHash could be long
+  const refreshedFile = await VideoFileModel.loadWithVideo(file.id)
+  // File does not exist anymore, remove the generated torrent
+  if (!refreshedFile) return file.removeTorrent()
+
+  refreshedFile.infoHash = file.infoHash
+  refreshedFile.torrentFilename = file.torrentFilename
+
+  return refreshedFile.save()
+}
+
+async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) {
+  const [ video, streamingPlaylist, file ] = await Promise.all([
+    loadVideoOrLog(payload.videoId),
+    loadStreamingPlaylistOrLog(payload.streamingPlaylistId),
+    loadFileOrLog(payload.videoFileId)
+  ])
+
+  await updateTorrentMetadata(video || streamingPlaylist, file)
+
+  await file.save()
+}
+
+async function loadVideoOrLog (videoId: number) {
+  if (!videoId) return undefined
+
+  const video = await VideoModel.load(videoId)
+  if (!video) {
+    logger.debug('Do not process torrent for video %d: does not exist anymore.', videoId)
+  }
+
+  return video
+}
+
+async function loadStreamingPlaylistOrLog (streamingPlaylistId: number) {
+  if (!streamingPlaylistId) return undefined
+
+  const streamingPlaylist = await VideoStreamingPlaylistModel.loadWithVideo(streamingPlaylistId)
+  if (!streamingPlaylist) {
+    logger.debug('Do not process torrent for streaming playlist %d: does not exist anymore.', streamingPlaylistId)
+  }
+
+  return streamingPlaylist
+}
+
+async function loadFileOrLog (videoFileId: number) {
+  if (!videoFileId) return undefined
+
+  const file = await VideoFileModel.loadWithVideo(videoFileId)
+
+  if (!file) {
+    logger.debug('Do not process torrent for file %d: does not exist anymore.', videoFileId)
+  }
+
+  return file
+}
index e10a3bab5df56e6592dde3e032b20ea34bc0c22b..3224abcc3ac04bff9de55df6184e77934f23c6b4 100644 (file)
@@ -12,6 +12,7 @@ import {
   EmailPayload,
   JobState,
   JobType,
+  ManageVideoTorrentPayload,
   MoveObjectStoragePayload,
   RefreshPayload,
   VideoEditionPayload,
@@ -31,6 +32,7 @@ import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unica
 import { refreshAPObject } from './handlers/activitypub-refresher'
 import { processActorKeys } from './handlers/actor-keys'
 import { processEmail } from './handlers/email'
+import { processManageVideoTorrent } from './handlers/manage-video-torrent'
 import { processMoveToObjectStorage } from './handlers/move-to-object-storage'
 import { processVideoEdition } from './handlers/video-edition'
 import { processVideoFileImport } from './handlers/video-file-import'
@@ -56,6 +58,7 @@ type CreateJobArgument =
   { type: 'video-redundancy', payload: VideoRedundancyPayload } |
   { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
   { type: 'video-edition', payload: VideoEditionPayload } |
+  { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
   { type: 'move-to-object-storage', payload: MoveObjectStoragePayload }
 
 export type CreateJobOptions = {
@@ -79,6 +82,7 @@ const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
   'actor-keys': processActorKeys,
   'video-redundancy': processVideoRedundancy,
   'move-to-object-storage': processMoveToObjectStorage,
+  'manage-video-torrent': processManageVideoTorrent,
   'video-edition': processVideoEdition
 }
 
@@ -98,6 +102,7 @@ const jobTypes: JobType[] = [
   'actor-keys',
   'video-live-ending',
   'move-to-object-storage',
+  'manage-video-torrent',
   'video-edition'
 ]
 
@@ -185,7 +190,7 @@ class JobQueue {
   }
 
   createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
-    const queue = this.queues[obj.type]
+    const queue: Queue = this.queues[obj.type]
     if (queue === undefined) {
       logger.error('Unknown queue %s: cannot create job.', obj.type)
       return
index a4093ce3ba9aba83f2cf0f0d3b0c23be6644b99e..4147b3d621a2594c18039b5b16be194483d13ee2 100644 (file)
@@ -1683,6 +1683,24 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> {
     return peertubeTruncate(this.description, { length: maxLength })
   }
 
+  getAllFiles () {
+    let files: MVideoFile[] = []
+
+    if (Array.isArray(this.VideoFiles)) {
+      files = files.concat(this.VideoFiles)
+    }
+
+    if (Array.isArray(this.VideoStreamingPlaylists)) {
+      for (const p of this.VideoStreamingPlaylists) {
+        if (Array.isArray(p.VideoFiles)) {
+          files = files.concat(p.VideoFiles)
+        }
+      }
+    }
+
+    return files
+  }
+
   probeMaxQualityFile () {
     const file = this.getMaxQualityFile()
     const videoOrPlaylist = file.getVideoOrStreamingPlaylist()
index d0293f542fcc7b7406bb1ddf31b1857e2433bcdd..6b07eba6913a8baeb3b58ed13fb8792cb8ced8b8 100644 (file)
@@ -20,6 +20,7 @@ export type JobType =
   | 'video-redundancy'
   | 'video-live-ending'
   | 'actor-keys'
+  | 'manage-video-torrent'
   | 'move-to-object-storage'
   | 'video-edition'
 
@@ -96,6 +97,20 @@ export type VideoRedundancyPayload = {
   videoId: number
 }
 
+export type ManageVideoTorrentPayload =
+  {
+    action: 'create'
+    videoId: number
+    videoFileId: number
+  } | {
+    action: 'update-metadata'
+
+    videoId?: number
+    streamingPlaylistId?: number
+
+    videoFileId: number
+  }
+
 // Video transcoding payloads
 
 interface BaseTranscodingPayload {