diff options
Diffstat (limited to 'server/lib/job-queue')
6 files changed, 29 insertions, 28 deletions
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts index 69b441176..f480b32cd 100644 --- a/server/lib/job-queue/handlers/move-to-object-storage.ts +++ b/server/lib/job-queue/handlers/move-to-object-storage.ts | |||
@@ -11,7 +11,7 @@ import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/l | |||
11 | import { VideoModel } from '@server/models/video/video' | 11 | import { VideoModel } from '@server/models/video/video' |
12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 12 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
13 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' | 13 | import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' |
14 | import { MoveObjectStoragePayload, VideoStorage } from '@shared/models' | 14 | import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models' |
15 | 15 | ||
16 | const lTagsBase = loggerTagsFactory('move-object-storage') | 16 | const lTagsBase = loggerTagsFactory('move-object-storage') |
17 | 17 | ||
@@ -45,7 +45,7 @@ export async function processMoveToObjectStorage (job: Job) { | |||
45 | if (pendingMove === 0) { | 45 | if (pendingMove === 0) { |
46 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) | 46 | logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) |
47 | 47 | ||
48 | await doAfterLastJob(video, payload.isNewVideo) | 48 | await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo }) |
49 | } | 49 | } |
50 | } catch (err) { | 50 | } catch (err) { |
51 | logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags }) | 51 | logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags }) |
@@ -91,7 +91,13 @@ async function moveHLSFiles (video: MVideoWithAllFiles) { | |||
91 | } | 91 | } |
92 | } | 92 | } |
93 | 93 | ||
94 | async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { | 94 | async function doAfterLastJob (options: { |
95 | video: MVideoWithAllFiles | ||
96 | previousVideoState: VideoState | ||
97 | isNewVideo: boolean | ||
98 | }) { | ||
99 | const { video, previousVideoState, isNewVideo } = options | ||
100 | |||
95 | for (const playlist of video.VideoStreamingPlaylists) { | 101 | for (const playlist of video.VideoStreamingPlaylists) { |
96 | if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue | 102 | if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue |
97 | 103 | ||
@@ -115,7 +121,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { | |||
115 | await remove(getHLSDirectory(video)) | 121 | await remove(getHLSDirectory(video)) |
116 | } | 122 | } |
117 | 123 | ||
118 | await moveToNextState(video, isNewVideo) | 124 | await moveToNextState({ video, previousVideoState, isNewVideo }) |
119 | } | 125 | } |
120 | 126 | ||
121 | async function onFileMoved (options: { | 127 | async function onFileMoved (options: { |
diff --git a/server/lib/job-queue/handlers/video-edition.ts b/server/lib/job-queue/handlers/video-edition.ts index c5ba0452f..d2d2a4f65 100644 --- a/server/lib/job-queue/handlers/video-edition.ts +++ b/server/lib/job-queue/handlers/video-edition.ts | |||
@@ -8,10 +8,9 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | |||
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 10 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' | 11 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' |
12 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-editor' | 12 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-editor' |
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { buildNextVideoState } from '@server/lib/video-state' | ||
15 | import { UserModel } from '@server/models/user/user' | 14 | import { UserModel } from '@server/models/user/user' |
16 | import { VideoModel } from '@server/models/video/video' | 15 | import { VideoModel } from '@server/models/video/video' |
17 | import { VideoFileModel } from '@server/models/video/video-file' | 16 | import { VideoFileModel } from '@server/models/video/video-file' |
@@ -33,8 +32,7 @@ import { | |||
33 | VideoEditorTaskCutPayload, | 32 | VideoEditorTaskCutPayload, |
34 | VideoEditorTaskIntroPayload, | 33 | VideoEditorTaskIntroPayload, |
35 | VideoEditorTaskOutroPayload, | 34 | VideoEditorTaskOutroPayload, |
36 | VideoEditorTaskWatermarkPayload, | 35 | VideoEditorTaskWatermarkPayload |
37 | VideoState | ||
38 | } from '@shared/models' | 36 | } from '@shared/models' |
39 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 37 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
40 | 38 | ||
@@ -42,14 +40,15 @@ const lTagsBase = loggerTagsFactory('video-edition') | |||
42 | 40 | ||
43 | async function processVideoEdition (job: Job) { | 41 | async function processVideoEdition (job: Job) { |
44 | const payload = job.data as VideoEditionPayload | 42 | const payload = job.data as VideoEditionPayload |
43 | const lTags = lTagsBase(payload.videoUUID) | ||
45 | 44 | ||
46 | logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id) | 45 | logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id, lTags) |
47 | 46 | ||
48 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) | 47 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) |
49 | 48 | ||
50 | // No video, maybe deleted? | 49 | // No video, maybe deleted? |
51 | if (!video) { | 50 | if (!video) { |
52 | logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID)) | 51 | logger.info('Can\'t process job %d, video does not exist.', job.id, lTags) |
53 | return undefined | 52 | return undefined |
54 | } | 53 | } |
55 | 54 | ||
@@ -69,7 +68,8 @@ async function processVideoEdition (job: Job) { | |||
69 | inputPath: tmpInputFilePath ?? originalFilePath, | 68 | inputPath: tmpInputFilePath ?? originalFilePath, |
70 | video, | 69 | video, |
71 | outputPath, | 70 | outputPath, |
72 | task | 71 | task, |
72 | lTags | ||
73 | }) | 73 | }) |
74 | 74 | ||
75 | if (tmpInputFilePath) await remove(tmpInputFilePath) | 75 | if (tmpInputFilePath) await remove(tmpInputFilePath) |
@@ -81,7 +81,7 @@ async function processVideoEdition (job: Job) { | |||
81 | return outputPath | 81 | return outputPath |
82 | }) | 82 | }) |
83 | 83 | ||
84 | logger.info('Video edition ended for video %s.', video.uuid) | 84 | logger.info('Video edition ended for video %s.', video.uuid, lTags) |
85 | 85 | ||
86 | const newFile = await buildNewFile(video, editionResultPath) | 86 | const newFile = await buildNewFile(video, editionResultPath) |
87 | 87 | ||
@@ -94,19 +94,13 @@ async function processVideoEdition (job: Job) { | |||
94 | 94 | ||
95 | await newFile.save() | 95 | await newFile.save() |
96 | 96 | ||
97 | video.state = buildNextVideoState() | ||
98 | video.duration = await getVideoStreamDuration(outputPath) | 97 | video.duration = await getVideoStreamDuration(outputPath) |
99 | await video.save() | 98 | await video.save() |
100 | 99 | ||
101 | await federateVideoIfNeeded(video, false, undefined) | 100 | await federateVideoIfNeeded(video, false, undefined) |
102 | 101 | ||
103 | if (video.state === VideoState.TO_TRANSCODE) { | 102 | const user = await UserModel.loadByVideoId(video.id) |
104 | const user = await UserModel.loadByVideoId(video.id) | 103 | await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) |
105 | |||
106 | await addOptimizeOrMergeAudioJob(video, newFile, user, false) | ||
107 | } else if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
108 | await addMoveToObjectStorageJob(video, false) | ||
109 | } | ||
110 | } | 104 | } |
111 | 105 | ||
112 | // --------------------------------------------------------------------------- | 106 | // --------------------------------------------------------------------------- |
@@ -122,6 +116,7 @@ type TaskProcessorOptions <T extends VideoEditionTaskPayload = VideoEditionTaskP | |||
122 | outputPath: string | 116 | outputPath: string |
123 | video: MVideo | 117 | video: MVideo |
124 | task: T | 118 | task: T |
119 | lTags: { tags: string[] } | ||
125 | } | 120 | } |
126 | 121 | ||
127 | const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { | 122 | const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { |
@@ -134,7 +129,7 @@ const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessor | |||
134 | async function processTask (options: TaskProcessorOptions) { | 129 | async function processTask (options: TaskProcessorOptions) { |
135 | const { video, task } = options | 130 | const { video, task } = options |
136 | 131 | ||
137 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task }) | 132 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) |
138 | 133 | ||
139 | const processor = taskProcessors[options.task.name] | 134 | const processor = taskProcessors[options.task.name] |
140 | if (!process) throw new Error('Unknown task ' + task.name) | 135 | if (!process) throw new Error('Unknown task ' + task.name) |
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 6b2d60317..110176d81 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -28,7 +28,7 @@ async function processVideoFileImport (job: Job) { | |||
28 | await updateVideoFile(video, payload.filePath) | 28 | await updateVideoFile(video, payload.filePath) |
29 | 29 | ||
30 | if (CONFIG.OBJECT_STORAGE.ENABLED) { | 30 | if (CONFIG.OBJECT_STORAGE.ENABLED) { |
31 | await addMoveToObjectStorageJob(video) | 31 | await addMoveToObjectStorageJob({ video, previousVideoState: video.state }) |
32 | } else { | 32 | } else { |
33 | await federateVideoIfNeeded(video, false) | 33 | await federateVideoIfNeeded(video, false) |
34 | } | 34 | } |
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index b3ca28c2f..d59a1b12f 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -254,12 +254,12 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
254 | } | 254 | } |
255 | 255 | ||
256 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 256 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { |
257 | return addMoveToObjectStorageJob(videoImportUpdated.Video) | 257 | return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) |
258 | } | 258 | } |
259 | 259 | ||
260 | // Create transcoding jobs? | 260 | // Create transcoding jobs? |
261 | if (video.state === VideoState.TO_TRANSCODE) { | 261 | if (video.state === VideoState.TO_TRANSCODE) { |
262 | await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) | 262 | await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) |
263 | } | 263 | } |
264 | 264 | ||
265 | } catch (err) { | 265 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 497f6612a..f4de4b47c 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -133,7 +133,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt | |||
133 | }) | 133 | }) |
134 | } | 134 | } |
135 | 135 | ||
136 | await moveToNextState(videoWithFiles, false) | 136 | await moveToNextState({ video: videoWithFiles, isNewVideo: false }) |
137 | } | 137 | } |
138 | 138 | ||
139 | async function cleanupTMPLiveFiles (hlsDirectory: string) { | 139 | async function cleanupTMPLiveFiles (hlsDirectory: string) { |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 512979734..95ee6b384 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -168,7 +168,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay | |||
168 | } | 168 | } |
169 | 169 | ||
170 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | 170 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') |
171 | await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) | 171 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) |
172 | } | 172 | } |
173 | 173 | ||
174 | async function onVideoFirstWebTorrentTranscoding ( | 174 | async function onVideoFirstWebTorrentTranscoding ( |
@@ -210,7 +210,7 @@ async function onVideoFirstWebTorrentTranscoding ( | |||
210 | 210 | ||
211 | // Move to next state if there are no other resolutions to generate | 211 | // Move to next state if there are no other resolutions to generate |
212 | if (!hasHls && !hasNewResolutions) { | 212 | if (!hasHls && !hasNewResolutions) { |
213 | await retryTransactionWrapper(moveToNextState, videoDatabase, payload.isNewVideo) | 213 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) |
214 | } | 214 | } |
215 | } | 215 | } |
216 | 216 | ||
@@ -225,7 +225,7 @@ async function onNewWebTorrentFileResolution ( | |||
225 | 225 | ||
226 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | 226 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') |
227 | 227 | ||
228 | await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) | 228 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) |
229 | } | 229 | } |
230 | 230 | ||
231 | // --------------------------------------------------------------------------- | 231 | // --------------------------------------------------------------------------- |