From 849f0fd3b2d00056a2c6252230814d6c2e3e3919 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 25 Oct 2022 11:50:44 +0200 Subject: Lock files to generate torrents/move files --- .../lib/job-queue/handlers/manage-video-torrent.ts | 35 +++++++++++++++------- .../job-queue/handlers/move-to-object-storage.ts | 6 ++++ 2 files changed, 31 insertions(+), 10 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/manage-video-torrent.ts b/server/lib/job-queue/handlers/manage-video-torrent.ts index 425915c96..cef93afda 100644 --- a/server/lib/job-queue/handlers/manage-video-torrent.ts +++ b/server/lib/job-queue/handlers/manage-video-torrent.ts @@ -1,5 +1,7 @@ import { Job } from 'bullmq' +import { extractVideo } from '@server/helpers/video' import { createTorrentAndSetInfoHash, updateTorrentMetadata } from '@server/helpers/webtorrent' +import { VideoPathManager } from '@server/lib/video-path-manager' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' @@ -30,17 +32,23 @@ async function doCreateAction (payload: ManageVideoTorrentPayload & { action: 'c if (!video || !file) return - await createTorrentAndSetInfoHash(video, file) + const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) - // Refresh videoFile because the createTorrentAndSetInfoHash could be long - const refreshedFile = await VideoFileModel.loadWithVideo(file.id) - // File does not exist anymore, remove the generated torrent - if (!refreshedFile) return file.removeTorrent() + try { + await createTorrentAndSetInfoHash(video, file) - refreshedFile.infoHash = file.infoHash - refreshedFile.torrentFilename = file.torrentFilename + // Refresh videoFile because the createTorrentAndSetInfoHash could be long + const refreshedFile = await VideoFileModel.loadWithVideo(file.id) + // File does not exist anymore, remove the generated torrent + if (!refreshedFile) return file.removeTorrent() - return refreshedFile.save() + refreshedFile.infoHash = file.infoHash + refreshedFile.torrentFilename = file.torrentFilename + + await refreshedFile.save() + } finally { + fileMutexReleaser() + } } async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { action: 'update-metadata' }) { @@ -52,9 +60,16 @@ async function doUpdateMetadataAction (payload: ManageVideoTorrentPayload & { ac if ((!video && !streamingPlaylist) || !file) return - await updateTorrentMetadata(video || streamingPlaylist, file) + const extractedVideo = extractVideo(video || streamingPlaylist) + const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(extractedVideo.uuid) - await file.save() + try { + await updateTorrentMetadata(video || streamingPlaylist, file) + + await file.save() + } finally { + fileMutexReleaser() + } } async function loadVideoOrLog (videoId: number) { diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 0b68555d1..a1530cc57 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -28,6 +28,8 @@ export async function processMoveToObjectStorage (job: Job) { const lTags = lTagsBase(video.uuid, video.url) + const fileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) + try { if (video.VideoFiles) { logger.debug('Moving %d webtorrent files for video %s.', video.VideoFiles.length, video.uuid, lTags) @@ -49,6 +51,10 @@ export async function processMoveToObjectStorage (job: Job) { } } catch (err) { await onMoveToObjectStorageFailure(job, err) + + throw err + } finally { + fileMutexReleaser() } return payload.videoUUID -- cgit v1.2.3