]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/move-to-object-storage.ts
Merge branch 'release/5.0.0' into develop
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / move-to-object-storage.ts
index a0c58d21143755553b154ff8d537011d9450c760..a1530cc573daaf3415da380c4dec199b2ec17fbd 100644 (file)
-import * as Bull from 'bull'
+import { Job } from 'bullmq'
 import { remove } from 'fs-extra'
 import { join } from 'path'
-import { logger } from '@server/helpers/logger'
-import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
-import { CONFIG } from '@server/initializers/config'
-import { storeHLSFile, storeWebTorrentFile } from '@server/lib/object-storage'
+import { logger, loggerTagsFactory } from '@server/helpers/logger'
+import { updateTorrentMetadata } from '@server/helpers/webtorrent'
+import { P2P_MEDIA_LOADER_PEER_VERSION } from '@server/initializers/constants'
+import { storeHLSFileFromFilename, storeWebTorrentFile } from '@server/lib/object-storage'
 import { getHLSDirectory, getHlsResolutionPlaylistFilename } from '@server/lib/paths'
-import { moveToNextState } from '@server/lib/video-state'
+import { VideoPathManager } from '@server/lib/video-path-manager'
+import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/lib/video-state'
 import { VideoModel } from '@server/models/video/video'
 import { VideoJobInfoModel } from '@server/models/video/video-job-info'
 import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
-import { MoveObjectStoragePayload, VideoStorage } from '../../../../shared'
+import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models'
 
-export async function processMoveToObjectStorage (job: Bull.Job) {
+const lTagsBase = loggerTagsFactory('move-object-storage')
+
+export async function processMoveToObjectStorage (job: Job) {
   const payload = job.data as MoveObjectStoragePayload
-  logger.info('Moving video %s in job %d.', payload.videoUUID, job.id)
+  logger.info('Moving video %s in job %s.', payload.videoUUID, job.id)
 
   const video = await VideoModel.loadWithFiles(payload.videoUUID)
   // No video, maybe deleted?
   if (!video) {
-    logger.info('Can\'t process job %d, video does not exist.', job.id)
+    logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID))
     return undefined
   }
 
-  if (video.VideoFiles) {
-    await moveWebTorrentFiles(video)
-  }
+  const lTags = lTagsBase(video.uuid, video.url)
 
-  if (video.VideoStreamingPlaylists) {
-    await moveHLSFiles(video)
-  }
+  const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid)
+
+  try {
+    if (video.VideoFiles) {
+      logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags)
+
+      await moveWebTorrentFiles(video)
+    }
+
+    if (video.VideoStreamingPlaylists) {
+      logger.debug('Moving HLS playlist of %s.', video.uuid)
+
+      await moveHLSFiles(video)
+    }
+
+    const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
+    if (pendingMove === 0) {
+      logger.info('Running cleanup after moving files to object storage (video %s in job %s)', video.uuid, job.id, lTags)
+
+      await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
+    }
+  } catch (err) {
+    await onMoveToObjectStorageFailure(job, err)
 
-  const pendingMove = await VideoJobInfoModel.decrease(video.uuid, 'pendingMove')
-  if (pendingMove === 0) {
-    logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id)
-    await doAfterLastJob(video, payload.isNewVideo)
+    throw err
+  } finally {
+    fileMutexReleaser()
   }
 
   return payload.videoUUID
 }
 
+export async function onMoveToObjectStorageFailure (job: Job, err: any) {
+  const payload = job.data as MoveObjectStoragePayload
+
+  const video = await VideoModel.loadWithFiles(payload.videoUUID)
+  if (!video) return
+
+  logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTagsBase(video.uuid, video.url) })
+
+  await moveToFailedMoveToObjectStorageState(video)
+  await VideoJobInfoModel.abortAllTasks(video.uuid, 'pendingMove')
+}
+
 // ---------------------------------------------------------------------------
 
 async function moveWebTorrentFiles (video: MVideoWithAllFiles) {
   for (const file of video.VideoFiles) {
     if (file.storage !== VideoStorage.FILE_SYSTEM) continue
 
-    const fileUrl = await storeWebTorrentFile(file.filename)
+    const fileUrl = await storeWebTorrentFile(video, file)
 
-    const oldPath = join(CONFIG.STORAGE.VIDEOS_DIR, file.filename)
+    const oldPath = VideoPathManager.Instance.getFSVideoFileOutputPath(video, file)
     await onFileMoved({ videoOrPlaylist: video, file, fileUrl, oldPath })
   }
 }
 
 async function moveHLSFiles (video: MVideoWithAllFiles) {
   for (const playlist of video.VideoStreamingPlaylists) {
+    const playlistWithVideo = playlist.withVideo(video)
 
     for (const file of playlist.VideoFiles) {
       if (file.storage !== VideoStorage.FILE_SYSTEM) continue
 
       // Resolution playlist
       const playlistFilename = getHlsResolutionPlaylistFilename(file.filename)
-      await storeHLSFile(playlist, video, playlistFilename)
+      await storeHLSFileFromFilename(playlistWithVideo, playlistFilename)
 
       // Resolution fragmented file
-      const fileUrl = await storeHLSFile(playlist, video, file.filename)
+      const fileUrl = await storeHLSFileFromFilename(playlistWithVideo, file.filename)
 
       const oldPath = join(getHLSDirectory(video), file.filename)
 
@@ -73,17 +106,28 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
   }
 }
 
-async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
+async function doAfterLastJob (options: {
+  video: MVideoWithAllFiles
+  previousVideoState: VideoState
+  isNewVideo: boolean
+}) {
+  const { video, previousVideoState, isNewVideo } = options
+
   for (const playlist of video.VideoStreamingPlaylists) {
     if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue
 
+    const playlistWithVideo = playlist.withVideo(video)
+
     // Master playlist
-    playlist.playlistUrl = await storeHLSFile(playlist, video, playlist.playlistFilename)
+    playlist.playlistUrl = await storeHLSFileFromFilename(playlistWithVideo, playlist.playlistFilename)
     // Sha256 segments file
-    playlist.segmentsSha256Url = await storeHLSFile(playlist, video, playlist.segmentsSha256Filename)
+    playlist.segmentsSha256Url = await storeHLSFileFromFilename(playlistWithVideo, playlist.segmentsSha256Filename)
 
     playlist.storage = VideoStorage.OBJECT_STORAGE
 
+    playlist.assignP2PMediaLoaderInfoHashes(video, playlist.VideoFiles)
+    playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
+
     await playlist.save()
   }
 
@@ -92,7 +136,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
     await remove(getHLSDirectory(video))
   }
 
-  await moveToNextState(video, isNewVideo)
+  await moveToNextState({ video, previousVideoState, isNewVideo })
 }
 
 async function onFileMoved (options: {
@@ -106,7 +150,7 @@ async function onFileMoved (options: {
   file.fileUrl = fileUrl
   file.storage = VideoStorage.OBJECT_STORAGE
 
-  await createTorrentAndSetInfoHash(videoOrPlaylist, file)
+  await updateTorrentMetadata(videoOrPlaylist, file)
   await file.save()
 
   logger.debug('Removing %s because it\'s now on object storage', oldPath)