aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-10-31 11:45:08 +0100
committerChocobozzz <me@florianbigard.com>2022-10-31 13:30:39 +0100
commit6740b6428be1c27e9ad728eede7c428d1e2e9f47 (patch)
treecf5c919cecd3b1a961002fb7ceee72ff3d9aa478 /server/lib/job-queue
parentf746622be455309112d858105992b2fa0b01af90 (diff)
downloadPeerTube-6740b6428be1c27e9ad728eede7c428d1e2e9f47.tar.gz
PeerTube-6740b6428be1c27e9ad728eede7c428d1e2e9f47.tar.zst
PeerTube-6740b6428be1c27e9ad728eede7c428d1e2e9f47.zip
Fix transcoding failure when importing a video
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/video-import.ts226
1 files changed, 130 insertions, 96 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index 9901b878c..99016f511 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -12,7 +12,8 @@ import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@serv
12import { VideoPathManager } from '@server/lib/video-path-manager' 12import { VideoPathManager } from '@server/lib/video-path-manager'
13import { buildNextVideoState } from '@server/lib/video-state' 13import { buildNextVideoState } from '@server/lib/video-state'
14import { ThumbnailModel } from '@server/models/video/thumbnail' 14import { ThumbnailModel } from '@server/models/video/thumbnail'
15import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 15import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models'
16import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
16import { getLowercaseExtension } from '@shared/core-utils' 17import { getLowercaseExtension } from '@shared/core-utils'
17import { isAudioFile } from '@shared/extra-utils' 18import { isAudioFile } from '@shared/extra-utils'
18import { 19import {
@@ -36,7 +37,6 @@ import { sequelizeTypescript } from '../../../initializers/database'
36import { VideoModel } from '../../../models/video/video' 37import { VideoModel } from '../../../models/video/video'
37import { VideoFileModel } from '../../../models/video/video-file' 38import { VideoFileModel } from '../../../models/video/video-file'
38import { VideoImportModel } from '../../../models/video/video-import' 39import { VideoImportModel } from '../../../models/video/video-import'
39import { MThumbnail } from '../../../types/models/video/thumbnail'
40import { federateVideoIfNeeded } from '../../activitypub/videos' 40import { federateVideoIfNeeded } from '../../activitypub/videos'
41import { Notifier } from '../../notifier' 41import { Notifier } from '../../notifier'
42import { generateVideoMiniature } from '../../thumbnail' 42import { generateVideoMiniature } from '../../thumbnail'
@@ -178,125 +178,159 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
178 } 178 }
179 179
180 // Video is accepted, resuming preparation 180 // Video is accepted, resuming preparation
181 const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) 181 const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid)
182 // To clean files if the import fails
183 const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles })
184
185 // Move file
186 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
187 await move(tempVideoPath, videoDestFile)
188 tempVideoPath = null // This path is not used anymore
189
190 // Generate miniature if the import did not created it
191 let thumbnailModel: MThumbnail
192 let thumbnailSave: object
193 if (!videoImportWithFiles.Video.getMiniature()) {
194 thumbnailModel = await generateVideoMiniature({
195 video: videoImportWithFiles.Video,
196 videoFile,
197 type: ThumbnailType.MINIATURE
198 })
199 thumbnailSave = thumbnailModel.toJSON()
200 }
201 182
202 // Generate preview if the import did not created it 183 try {
203 let previewModel: MThumbnail 184 const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile)
204 let previewSave: object
205 if (!videoImportWithFiles.Video.getPreview()) {
206 previewModel = await generateVideoMiniature({
207 video: videoImportWithFiles.Video,
208 videoFile,
209 type: ThumbnailType.PREVIEW
210 })
211 previewSave = previewModel.toJSON()
212 }
213 185
214 // Create torrent 186 // Move file
215 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) 187 const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile)
188 await move(tempVideoPath, videoDestFile)
216 189
217 const videoFileSave = videoFile.toJSON() 190 tempVideoPath = null // This path is not used anymore
218 191
219 const { videoImportUpdated, video } = await retryTransactionWrapper(() => { 192 let {
220 return sequelizeTypescript.transaction(async t => { 193 miniatureModel: thumbnailModel,
221 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo 194 miniatureJSONSave: thumbnailSave
195 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE)
222 196
223 // Refresh video 197 let {
224 const video = await VideoModel.load(videoImportToUpdate.videoId, t) 198 miniatureModel: previewModel,
225 if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') 199 miniatureJSONSave: previewSave
200 } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW)
226 201
227 const videoFileCreated = await videoFile.save({ transaction: t }) 202 // Create torrent
203 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
228 204
229 // Update video DB object 205 const videoFileSave = videoFile.toJSON()
230 video.duration = duration
231 video.state = buildNextVideoState(video.state)
232 await video.save({ transaction: t })
233 206
234 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) 207 const { videoImportUpdated, video } = await retryTransactionWrapper(() => {
235 if (previewModel) await video.addAndSaveThumbnail(previewModel, t) 208 return sequelizeTypescript.transaction(async t => {
209 // Refresh video
210 const video = await VideoModel.load(videoImportWithFiles.videoId, t)
211 if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.')
236 212
237 // Now we can federate the video (reload from database, we need more attributes) 213 await videoFile.save({ transaction: t })
238 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
239 await federateVideoIfNeeded(videoForFederation, true, t)
240 214
241 // Update video import object 215 // Update video DB object
242 videoImportToUpdate.state = VideoImportState.SUCCESS 216 video.duration = duration
243 const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo 217 video.state = buildNextVideoState(video.state)
244 videoImportUpdated.Video = video 218 await video.save({ transaction: t })
245 219
246 videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) 220 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
221 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
247 222
248 logger.info('Video %s imported.', video.uuid) 223 // Now we can federate the video (reload from database, we need more attributes)
224 const videoForFederation = await VideoModel.loadFull(video.uuid, t)
225 await federateVideoIfNeeded(videoForFederation, true, t)
249 226
250 return { videoImportUpdated, video: videoForFederation } 227 // Update video import object
251 }).catch(err => { 228 videoImportWithFiles.state = VideoImportState.SUCCESS
252 // Reset fields 229 const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport
253 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
254 if (previewModel) previewModel = new ThumbnailModel(previewSave)
255 230
256 videoFile = new VideoFileModel(videoFileSave) 231 logger.info('Video %s imported.', video.uuid)
257 232
258 throw err 233 return { videoImportUpdated, video: videoForFederation }
259 }) 234 }).catch(err => {
260 }) 235 // Reset fields
236 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
237 if (previewModel) previewModel = new ThumbnailModel(previewSave)
261 238
262 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: videoImportUpdated, success: true }) 239 videoFile = new VideoFileModel(videoFileSave)
263 240
264 if (video.isBlacklisted()) { 241 throw err
265 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) 242 })
243 })
266 244
267 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) 245 await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User })
268 } else { 246 } finally {
269 Notifier.Instance.notifyOnNewVideoIfNeeded(video) 247 videoFileLockReleaser()
270 } 248 }
249 } catch (err) {
250 await onImportError(err, tempVideoPath, videoImport)
271 251
272 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { 252 throw err
273 await JobQueue.Instance.createJob( 253 }
274 await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) 254}
275 )
276 }
277 255
278 // Create transcoding jobs? 256async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> {
279 if (video.state === VideoState.TO_TRANSCODE) { 257 // Refresh video, privacy may have changed
280 await JobQueue.Instance.createJob( 258 const video = await videoImport.Video.reload()
281 await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) 259 const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] })
282 )
283 }
284 260
285 } catch (err) { 261 return Object.assign(videoImport, { Video: videoWithFiles })
286 try { 262}
287 if (tempVideoPath) await remove(tempVideoPath)
288 } catch (errUnlink) {
289 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
290 }
291 263
292 videoImport.error = err.message 264async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) {
293 if (videoImport.state !== VideoImportState.REJECTED) { 265 // Generate miniature if the import did not created it
294 videoImport.state = VideoImportState.FAILED 266 const needsMiniature = thumbnailType === ThumbnailType.MINIATURE
267 ? !videoImportWithFiles.Video.getMiniature()
268 : !videoImportWithFiles.Video.getPreview()
269
270 if (!needsMiniature) {
271 return {
272 miniatureModel: null,
273 miniatureJSONSave: null
295 } 274 }
296 await videoImport.save() 275 }
297 276
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) 277 const miniatureModel = await generateVideoMiniature({
278 video: videoImportWithFiles.Video,
279 videoFile,
280 type: ThumbnailType.MINIATURE
281 })
282 const miniatureJSONSave = miniatureModel.toJSON()
299 283
300 throw err 284 return {
285 miniatureModel,
286 miniatureJSONSave
287 }
288}
289
290async function afterImportSuccess (options: {
291 videoImport: MVideoImport
292 video: MVideoFullLight
293 videoFile: MVideoFile
294 user: MUserId
295}) {
296 const { video, videoFile, videoImport, user } = options
297
298 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true })
299
300 if (video.isBlacklisted()) {
301 const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video })
302
303 Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist)
304 } else {
305 Notifier.Instance.notifyOnNewVideoIfNeeded(video)
301 } 306 }
307
308 if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) {
309 await JobQueue.Instance.createJob(
310 await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT })
311 )
312 return
313 }
314
315 if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs?
316 await JobQueue.Instance.createJob(
317 await buildOptimizeOrMergeAudioJob({ video, videoFile, user })
318 )
319 }
320}
321
322async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) {
323 try {
324 if (tempVideoPath) await remove(tempVideoPath)
325 } catch (errUnlink) {
326 logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink })
327 }
328
329 videoImport.error = err.message
330 if (videoImport.state !== VideoImportState.REJECTED) {
331 videoImport.state = VideoImportState.FAILED
332 }
333 await videoImport.save()
334
335 Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false })
302} 336}