aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-03-22 14:35:04 +0100
committerChocobozzz <me@florianbigard.com>2022-03-22 16:25:14 +0100
commit1808a1f8e4b7b102823492a2007a46929aebf189 (patch)
treea345140ec9a7a20c222ace3cda18ac999277c8c3 /server/lib/job-queue/handlers
parent348c2ce3ff3fe2f25a31f08bfb36c88723a0ce46 (diff)
downloadPeerTube-1808a1f8e4b7b102823492a2007a46929aebf189.tar.gz
PeerTube-1808a1f8e4b7b102823492a2007a46929aebf189.tar.zst
PeerTube-1808a1f8e4b7b102823492a2007a46929aebf189.zip
Add video edition finished notification
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/move-to-object-storage.ts14
-rw-r--r--server/lib/job-queue/handlers/video-edition.ts29
-rw-r--r--server/lib/job-queue/handlers/video-file-import.ts2
-rw-r--r--server/lib/job-queue/handlers/video-import.ts4
-rw-r--r--server/lib/job-queue/handlers/video-live-ending.ts2
-rw-r--r--server/lib/job-queue/handlers/video-transcoding.ts6
6 files changed, 29 insertions, 28 deletions
diff --git a/server/lib/job-queue/handlers/move-to-object-storage.ts b/server/lib/job-queue/handlers/move-to-object-storage.ts
index 69b441176..f480b32cd 100644
--- a/server/lib/job-queue/handlers/move-to-object-storage.ts
+++ b/server/lib/job-queue/handlers/move-to-object-storage.ts
@@ -11,7 +11,7 @@ import { moveToFailedMoveToObjectStorageState, moveToNextState } from '@server/l
11import { VideoModel } from '@server/models/video/video' 11import { VideoModel } from '@server/models/video/video'
12import { VideoJobInfoModel } from '@server/models/video/video-job-info' 12import { VideoJobInfoModel } from '@server/models/video/video-job-info'
13import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models' 13import { MStreamingPlaylistVideo, MVideo, MVideoFile, MVideoWithAllFiles } from '@server/types/models'
14import { MoveObjectStoragePayload, VideoStorage } from '@shared/models' 14import { MoveObjectStoragePayload, VideoState, VideoStorage } from '@shared/models'
15 15
16const lTagsBase = loggerTagsFactory('move-object-storage') 16const lTagsBase = loggerTagsFactory('move-object-storage')
17 17
@@ -45,7 +45,7 @@ export async function processMoveToObjectStorage (job: Job) {
45 if (pendingMove === 0) { 45 if (pendingMove === 0) {
46 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags) 46 logger.info('Running cleanup after moving files to object storage (video %s in job %d)', video.uuid, job.id, lTags)
47 47
48 await doAfterLastJob(video, payload.isNewVideo) 48 await doAfterLastJob({ video, previousVideoState: payload.previousVideoState, isNewVideo: payload.isNewVideo })
49 } 49 }
50 } catch (err) { 50 } catch (err) {
51 logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags }) 51 logger.error('Cannot move video %s to object storage.', video.url, { err, ...lTags })
@@ -91,7 +91,13 @@ async function moveHLSFiles (video: MVideoWithAllFiles) {
91 } 91 }
92} 92}
93 93
94async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) { 94async function doAfterLastJob (options: {
95 video: MVideoWithAllFiles
96 previousVideoState: VideoState
97 isNewVideo: boolean
98}) {
99 const { video, previousVideoState, isNewVideo } = options
100
95 for (const playlist of video.VideoStreamingPlaylists) { 101 for (const playlist of video.VideoStreamingPlaylists) {
96 if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue 102 if (playlist.storage === VideoStorage.OBJECT_STORAGE) continue
97 103
@@ -115,7 +121,7 @@ async function doAfterLastJob (video: MVideoWithAllFiles, isNewVideo: boolean) {
115 await remove(getHLSDirectory(video)) 121 await remove(getHLSDirectory(video))
116 } 122 }
117 123
118 await moveToNextState(video, isNewVideo) 124 await moveToNextState({ video, previousVideoState, isNewVideo })
119} 125}
120 126
121async function onFileMoved (options: { 127async function onFileMoved (options: {
diff --git a/server/lib/job-queue/handlers/video-edition.ts b/server/lib/job-queue/handlers/video-edition.ts
index c5ba0452f..d2d2a4f65 100644
--- a/server/lib/job-queue/handlers/video-edition.ts
+++ b/server/lib/job-queue/handlers/video-edition.ts
@@ -8,10 +8,9 @@ import { federateVideoIfNeeded } from '@server/lib/activitypub/videos'
8import { generateWebTorrentVideoFilename } from '@server/lib/paths' 8import { generateWebTorrentVideoFilename } from '@server/lib/paths'
9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' 9import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles'
10import { isAbleToUploadVideo } from '@server/lib/user' 10import { isAbleToUploadVideo } from '@server/lib/user'
11import { addMoveToObjectStorageJob, addOptimizeOrMergeAudioJob } from '@server/lib/video' 11import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
12import { approximateIntroOutroAdditionalSize } from '@server/lib/video-editor' 12import { approximateIntroOutroAdditionalSize } from '@server/lib/video-editor'
13import { VideoPathManager } from '@server/lib/video-path-manager' 13import { VideoPathManager } from '@server/lib/video-path-manager'
14import { buildNextVideoState } from '@server/lib/video-state'
15import { UserModel } from '@server/models/user/user' 14import { UserModel } from '@server/models/user/user'
16import { VideoModel } from '@server/models/video/video' 15import { VideoModel } from '@server/models/video/video'
17import { VideoFileModel } from '@server/models/video/video-file' 16import { VideoFileModel } from '@server/models/video/video-file'
@@ -33,8 +32,7 @@ import {
33 VideoEditorTaskCutPayload, 32 VideoEditorTaskCutPayload,
34 VideoEditorTaskIntroPayload, 33 VideoEditorTaskIntroPayload,
35 VideoEditorTaskOutroPayload, 34 VideoEditorTaskOutroPayload,
36 VideoEditorTaskWatermarkPayload, 35 VideoEditorTaskWatermarkPayload
37 VideoState
38} from '@shared/models' 36} from '@shared/models'
39import { logger, loggerTagsFactory } from '../../../helpers/logger' 37import { logger, loggerTagsFactory } from '../../../helpers/logger'
40 38
@@ -42,14 +40,15 @@ const lTagsBase = loggerTagsFactory('video-edition')
42 40
43async function processVideoEdition (job: Job) { 41async function processVideoEdition (job: Job) {
44 const payload = job.data as VideoEditionPayload 42 const payload = job.data as VideoEditionPayload
43 const lTags = lTagsBase(payload.videoUUID)
45 44
46 logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id) 45 logger.info('Process video edition of %s in job %d.', payload.videoUUID, job.id, lTags)
47 46
48 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) 47 const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID)
49 48
50 // No video, maybe deleted? 49 // No video, maybe deleted?
51 if (!video) { 50 if (!video) {
52 logger.info('Can\'t process job %d, video does not exist.', job.id, lTagsBase(payload.videoUUID)) 51 logger.info('Can\'t process job %d, video does not exist.', job.id, lTags)
53 return undefined 52 return undefined
54 } 53 }
55 54
@@ -69,7 +68,8 @@ async function processVideoEdition (job: Job) {
69 inputPath: tmpInputFilePath ?? originalFilePath, 68 inputPath: tmpInputFilePath ?? originalFilePath,
70 video, 69 video,
71 outputPath, 70 outputPath,
72 task 71 task,
72 lTags
73 }) 73 })
74 74
75 if (tmpInputFilePath) await remove(tmpInputFilePath) 75 if (tmpInputFilePath) await remove(tmpInputFilePath)
@@ -81,7 +81,7 @@ async function processVideoEdition (job: Job) {
81 return outputPath 81 return outputPath
82 }) 82 })
83 83
84 logger.info('Video edition ended for video %s.', video.uuid) 84 logger.info('Video edition ended for video %s.', video.uuid, lTags)
85 85
86 const newFile = await buildNewFile(video, editionResultPath) 86 const newFile = await buildNewFile(video, editionResultPath)
87 87
@@ -94,19 +94,13 @@ async function processVideoEdition (job: Job) {
94 94
95 await newFile.save() 95 await newFile.save()
96 96
97 video.state = buildNextVideoState()
98 video.duration = await getVideoStreamDuration(outputPath) 97 video.duration = await getVideoStreamDuration(outputPath)
99 await video.save() 98 await video.save()
100 99
101 await federateVideoIfNeeded(video, false, undefined) 100 await federateVideoIfNeeded(video, false, undefined)
102 101
103 if (video.state === VideoState.TO_TRANSCODE) { 102 const user = await UserModel.loadByVideoId(video.id)
104 const user = await UserModel.loadByVideoId(video.id) 103 await addOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false })
105
106 await addOptimizeOrMergeAudioJob(video, newFile, user, false)
107 } else if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
108 await addMoveToObjectStorageJob(video, false)
109 }
110} 104}
111 105
112// --------------------------------------------------------------------------- 106// ---------------------------------------------------------------------------
@@ -122,6 +116,7 @@ type TaskProcessorOptions <T extends VideoEditionTaskPayload = VideoEditionTaskP
122 outputPath: string 116 outputPath: string
123 video: MVideo 117 video: MVideo
124 task: T 118 task: T
119 lTags: { tags: string[] }
125} 120}
126 121
127const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = { 122const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessorOptions) => Promise<any> } = {
@@ -134,7 +129,7 @@ const taskProcessors: { [id in VideoEditorTask['name']]: (options: TaskProcessor
134async function processTask (options: TaskProcessorOptions) { 129async function processTask (options: TaskProcessorOptions) {
135 const { video, task } = options 130 const { video, task } = options
136 131
137 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task }) 132 logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags })
138 133
139 const processor = taskProcessors[options.task.name] 134 const processor = taskProcessors[options.task.name]
140 if (!process) throw new Error('Unknown task ' + task.name) 135 if (!process) throw new Error('Unknown task ' + task.name)
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts
index 6b2d60317..110176d81 100644
--- a/server/lib/job-queue/handlers/video-file-import.ts
+++ b/server/lib/job-queue/handlers/video-file-import.ts
@@ -28,7 +28,7 @@ async function processVideoFileImport (job: Job) {
28 await updateVideoFile(video, payload.filePath) 28 await updateVideoFile(video, payload.filePath)
29 29
30 if (CONFIG.OBJECT_STORAGE.ENABLED) { 30 if (CONFIG.OBJECT_STORAGE.ENABLED) {
31 await addMoveToObjectStorageJob(video) 31 await addMoveToObjectStorageJob({ video, previousVideoState: video.state })
32 } else { 32 } else {
33 await federateVideoIfNeeded(video, false) 33 await federateVideoIfNeeded(video, false)
34 } 34 }
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index b3ca28c2f..d59a1b12f 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -254,12 +254,12 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
254 } 254 }
255 255
256 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 256 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
257 return addMoveToObjectStorageJob(videoImportUpdated.Video) 257 return addMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT })
258 } 258 }
259 259
260 // Create transcoding jobs? 260 // Create transcoding jobs?
261 if (video.state === VideoState.TO_TRANSCODE) { 261 if (video.state === VideoState.TO_TRANSCODE) {
262 await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) 262 await addOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User })
263 } 263 }
264 264
265 } catch (err) { 265 } catch (err) {
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts
index 497f6612a..f4de4b47c 100644
--- a/server/lib/job-queue/handlers/video-live-ending.ts
+++ b/server/lib/job-queue/handlers/video-live-ending.ts
@@ -133,7 +133,7 @@ async function saveLive (video: MVideo, live: MVideoLive, streamingPlaylist: MSt
133 }) 133 })
134 } 134 }
135 135
136 await moveToNextState(videoWithFiles, false) 136 await moveToNextState({ video: videoWithFiles, isNewVideo: false })
137} 137}
138 138
139async function cleanupTMPLiveFiles (hlsDirectory: string) { 139async function cleanupTMPLiveFiles (hlsDirectory: string) {
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts
index 512979734..95ee6b384 100644
--- a/server/lib/job-queue/handlers/video-transcoding.ts
+++ b/server/lib/job-queue/handlers/video-transcoding.ts
@@ -168,7 +168,7 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, pay
168 } 168 }
169 169
170 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') 170 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
171 await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) 171 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
172} 172}
173 173
174async function onVideoFirstWebTorrentTranscoding ( 174async function onVideoFirstWebTorrentTranscoding (
@@ -210,7 +210,7 @@ async function onVideoFirstWebTorrentTranscoding (
210 210
211 // Move to next state if there are no other resolutions to generate 211 // Move to next state if there are no other resolutions to generate
212 if (!hasHls && !hasNewResolutions) { 212 if (!hasHls && !hasNewResolutions) {
213 await retryTransactionWrapper(moveToNextState, videoDatabase, payload.isNewVideo) 213 await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo })
214 } 214 }
215} 215}
216 216
@@ -225,7 +225,7 @@ async function onNewWebTorrentFileResolution (
225 225
226 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') 226 await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode')
227 227
228 await retryTransactionWrapper(moveToNextState, video, payload.isNewVideo) 228 await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo })
229} 229}
230 230
231// --------------------------------------------------------------------------- 231// ---------------------------------------------------------------------------