]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/move-to-object-storage.ts
Merge branch 'release/5.0.0' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / move-to-object-storage.ts
index 9e39322a85a6d1c14690110729b09d9f2bd0f83d..a1530cc573daaf3415da380c4dec199b2ec17fbd 100644 (file)
@@ -1,62 +1,86 @@
-import { Job } from 'bull'
+import { Job } from 'bullmq'
 import { remove } from 'fs-extra'
 import { join } from 'path'
-import { logger } from '@server/helpers/logger'
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { updateTorrentMetadata } from '@server/helpers/webtorrent'
-import { CONFIG } from '@server/initializers/config'
 import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
-import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
+import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
 import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
+import { VideoPathManager } from '@server/lib/video-path-manager'
 import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
 import { VideoModel } from '@server/models/video/video'
 import { VideoJobInfoModel } from '@server/models/video/video-job-info'
 import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
-import { MoveObjectStoragePayload, VideoStorage } from '@shared/models'
+import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models'
+
+const lTagsBase = loggerTagsFactory('move-object-storage')
 
 export async function processMoveToObjectStorage (job: Job) {
   const payload = job.data as MoveObjectStoragePayload
-  logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
+  logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
 
   const video = await VideoModel.loadWithFiles(payload.videoUUID)
   // No video, maybe deleted?
   if (!video) {
-    logger.info('Can\'t process job %d, video does not exist.', job.id)
+    logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID))
     return undefined
   }
 
+  const lTags = lTagsBase(video.uuid, video.url)
+
+  const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
+
   try {
     if (video.VideoFiles) {
+      logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags)
+
       await moveWebTorrentFiles(video)
     }
 
     if (video.VideoStreamingPlaylists) {
+      logger.debug('Moving HLS playlist of %s.', video.uuid)
+
       await moveHLSFiles(video)
     }
 
     const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
     if (pendingMove === 0) {
-      logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id)
-      await doAfterLastJob(video, payload.isNewVideo)
+      logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
+
+      await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
     }
   } catch (err) {
-    logger.error('Cannot move video %s to object storage.', video.url, { err })
+    await onMoveToObjectStorageFailure(job, err)
 
-    await moveToFailedMoveToObjectStorageState(video)
-    await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
+    throw err
+  } finally {
+    fileMutexReleaser()
   }
 
   return payload.videoUUID
 }
 
+export async function onMoveToObjectStorageFailure (job: Job, err: any) {
+  const payload = job.data as MoveObjectStoragePayload
+
+  const video = await VideoModel.loadWithFiles(payload.videoUUID)
+  if (!video) return
+
+  logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTagsBase(video.uuid, video.url) })
+
+  await moveToFailedMoveToObjectStorageState(video)
+  await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
+}
+
 // ---------------------------------------------------------------------------
 
 async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
   for (const file of video.VideoFiles) {
     if (file.storage !== VideoStorage.FILE_SYSTEM) continue
 
-    const fileUrl = await storeWebTorrentFile(file.filename)
+    const fileUrl = await storeWebTorrentFile(video, file)
 
-    const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename)
+    const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file)
     await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
   }
 }
@@ -70,10 +94,10 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
 
       // Resolution playlist
       const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
-      await storeHLSFile(playlistWithVideo, playlistFilename)
+      await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
 
       // Resolution fragmented file
-      const fileUrl = await storeHLSFile(playlistWithVideo, file.filename)
+      const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
 
       const oldPath = join(getHLSDirectory(video), file.filename)
 
@@ -82,16 +106,22 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
   }
 }
 
-async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
+async function doAfterLastJob (options: {
+  video: MVideoWithAllFiles
+  previousVideoState: VideoState
+  isNewVideo: boolean
+}) {
+  const { video, previousVideoState, isNewVideo } = options
+
   for (const playlist of video.VideoStreamingPlaylists) {
     if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue
 
     const playlistWithVideo = playlist.withVideo(video)
 
     // Master playlist
-    playlist.playlistUrl = await storeHLSFile(playlistWithVideo, playlist.playlistFilename)
+    playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
     // Sha256 segments file
-    playlist.segmentsSha256Url = await storeHLSFile(playlistWithVideo, playlist.segmentsSha256Filename)
+    playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
 
     playlist.storage = VideoStorage.OBJECT_STORAGE
 
@@ -106,7 +136,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
     await remove(getHLSDirectory(video))
   }
 
-  await moveToNextState(video, isNewVideo)
+  await moveToNextState({ video, previousVideoState, isNewVideo })
 }
 
 async function onFileMoved (options: {