From 1808a1f8e4b7b102823492a2007a46929aebf189 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 22 Mar 2022 14:35:04 +0100 Subject: Add video edition finished notification --- .../job-queue/handlers/move-to-object-storage.ts | 14 ++++++++--- server/lib/job-queue/handlers/video-edition.ts | 29 +++++++++------------- server/lib/job-queue/handlers/video-file-import.ts | 2 +- server/lib/job-queue/handlers/video-import.ts | 4 +-- server/lib/job-queue/handlers/video-live-ending.ts | 2 +- server/lib/job-queue/handlers/video-transcoding.ts | 6 ++--- 6 files changed, 29 insertions(+), 28 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 69b441176..f480b32cd 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts @@ -11,7 +11,7 @@ import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/l import { VideoModel } from '@server/models/video/video' import { VideoJobInfoModel } from '@server/models/video/video-job-info' import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' -import { MoveObjectStoragePayload, VideoStorage } from '@shared/models' +import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models' const lTagsBase = loggerTagsFactory('move-object-storage') @@ -45,7 +45,7 @@ export async function processMoveToObjectStorage (job: Job) { if (pendingMove === 0) { logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) - await doAfterLastJob(video, payload.isNewVideo) + await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) } } catch (err) { logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags }) @@ -91,7 +91,13 @@ async function moveHLSFiles (video: MVideoWithAllFiles) { } } -async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { +async function doAfterLastJob (options: { + video: MVideoWithAllFiles + previousVideoState: VideoState + isNewVideo: boolean +}) { + const { video, previousVideoState, isNewVideo } = options + for (const playlist of video.VideoStreamingPlaylists) { if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue @@ -115,7 +121,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { await remove(getHLSDirectory(video)) } - await moveToNextState(video, isNewVideo) + await moveToNextState({ video, previousVideoState, isNewVideo }) } async function onFileMoved (options: { diff --git a/server/lib/job-queue/handlers/video-edition.ts b/server/lib/job-queue/handlers/video-edition.ts index c5ba0452f..d2d2a4f65 100644 --- a/server/lib/job-queue/handlers/video-edition.ts +++ b/server/lib/job-queue/handlers/video-edition.ts @@ -8,10 +8,9 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' import { generateWebTorrentVideoFilename } from '@server/lib/paths' import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' import { isAbleToUploadVideo } from '@server/lib/user' -import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' +import { addOptimizeOrMergeAudioJob } from '@server/lib/video' import { approximateIntroOutroAdditionalSize } from '@server/lib/video-editor' import { VideoPathManager } from '@server/lib/video-path-manager' -import { buildNextVideoState } from '@server/lib/video-state' import { UserModel } from '@server/models/user/user' import { VideoModel } from '@server/models/video/video' import { VideoFileModel } from '@server/models/video/video-file' @@ -33,8 +32,7 @@ import { VideoEditorTaskCutPayload, VideoEditorTaskIntroPayload, VideoEditorTaskOutroPayload, - VideoEditorTaskWatermarkPayload, - VideoState + VideoEditorTaskWatermarkPayload } from '@shared/models' import { logger, loggerTagsFactory } from '../../../helpers/logger' @@ -42,14 +40,15 @@ const lTagsBase = loggerTagsFactory('video-edition') async function processVideoEdition (job: Job) { const payload = job.data as VideoEditionPayload + const lTags = lTagsBase(payload.videoUUID) - logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id) + logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id, lTags) const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) // No video, maybe deleted? if (!video) { - logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID)) + logger.info('Can\'t process job %d, video does not exist.', job.id, lTags) return undefined } @@ -69,7 +68,8 @@ async function processVideoEdition (job: Job) { inputPath: tmpInputFilePath ?? originalFilePath, video, outputPath, - task + task, + lTags }) if (tmpInputFilePath) await remove(tmpInputFilePath) @@ -81,7 +81,7 @@ async function processVideoEdition (job: Job) { return outputPath }) - logger.info('Video edition ended for video %s.', video.uuid) + logger.info('Video edition ended for video %s.', video.uuid, lTags) const newFile = await buildNewFile(video, editionResultPath) @@ -94,19 +94,13 @@ async function processVideoEdition (job: Job) { await newFile.save() - video.state = buildNextVideoState() video.duration = await getVideoStreamDuration(outputPath) await video.save() await federateVideoIfNeeded(video, false, undefined) - if (video.state === VideoState.TO_TRANSCODE) { - const user = await UserModel.loadByVideoId(video.id) - - await addOptimizeOrMergeAudioJob(video, newFile, user, false) - } else if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { - await addMoveToObjectStorageJob(video, false) - } + const user = await UserModel.loadByVideoId(video.id) + await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) } // --------------------------------------------------------------------------- @@ -122,6 +116,7 @@ type TaskProcessorOptions Promise } = { @@ -134,7 +129,7 @@ const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessor async function processTask (options: TaskProcessorOptions) { const { video, task } = options - logger.info('Processing %s task for video %s.', task.name, video.uuid, { task }) + logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) const processor = taskProcessors[options.task.name] if (!process) throw new Error('Unknown task ' + task.name) diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 6b2d60317..110176d81 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -28,7 +28,7 @@ async function processVideoFileImport (job: Job) { await updateVideoFile(video, payload.filePath) if (CONFIG.OBJECT_STORAGE.ENABLED) { - await addMoveToObjectStorageJob(video) + await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) } else { await federateVideoIfNeeded(video, false) } diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index b3ca28c2f..d59a1b12f 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -254,12 +254,12 @@ async function processFile (downloader: () => Promise, videoImport: MVid } if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { - return addMoveToObjectStorageJob(videoImportUpdated.Video) + return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) } // Create transcoding jobs? if (video.state === VideoState.TO_TRANSCODE) { - await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) + await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) } } catch (err) { diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 497f6612a..f4de4b47c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts @@ -133,7 +133,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt }) } - await moveToNextState(videoWithFiles, false) + await moveToNextState({ video: videoWithFiles, isNewVideo: false }) } async function cleanupTMPLiveFiles (hlsDirectory: string) { diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 512979734..95ee6b384 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -168,7 +168,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay } await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) } async function onVideoFirstWebTorrentTranscoding ( @@ -210,7 +210,7 @@ async function onVideoFirstWebTorrentTranscoding ( // Move to next state if there are no other resolutions to generate if (!hasHls && !hasNewResolutions) { - await retryTransactionWrapper(moveToNextState, videoDatabase, payload.isNewVideo) + await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) } } @@ -225,7 +225,7 @@ async function onNewWebTorrentFileResolution ( await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') - await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) + await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) } // --------------------------------------------------------------------------- -- cgit v1.2.3