diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 226 |
1 files changed, 130 insertions, 96 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 9901b878c..99016f511 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -12,7 +12,8 @@ import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@serv | |||
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 12 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 13 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 14 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
15 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 15 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
16 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | ||
16 | import { getLowercaseExtension } from '@shared/core-utils' | 17 | import { getLowercaseExtension } from '@shared/core-utils' |
17 | import { isAudioFile } from '@shared/extra-utils' | 18 | import { isAudioFile } from '@shared/extra-utils' |
18 | import { | 19 | import { |
@@ -36,7 +37,6 @@ import { sequelizeTypescript } from '../../../initializers/database' | |||
36 | import { VideoModel } from '../../../models/video/video' | 37 | import { VideoModel } from '../../../models/video/video' |
37 | import { VideoFileModel } from '../../../models/video/video-file' | 38 | import { VideoFileModel } from '../../../models/video/video-file' |
38 | import { VideoImportModel } from '../../../models/video/video-import' | 39 | import { VideoImportModel } from '../../../models/video/video-import' |
39 | import { MThumbnail } from '../../../types/models/video/thumbnail' | ||
40 | import { federateVideoIfNeeded } from '../../activitypub/videos' | 40 | import { federateVideoIfNeeded } from '../../activitypub/videos' |
41 | import { Notifier } from '../../notifier' | 41 | import { Notifier } from '../../notifier' |
42 | import { generateVideoMiniature } from '../../thumbnail' | 42 | import { generateVideoMiniature } from '../../thumbnail' |
@@ -178,125 +178,159 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
178 | } | 178 | } |
179 | 179 | ||
180 | // Video is accepted, resuming preparation | 180 | // Video is accepted, resuming preparation |
181 | const videoWithFiles = Object.assign(videoImport.Video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) | 181 | const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid) |
182 | // To clean files if the import fails | ||
183 | const videoImportWithFiles: MVideoImportDefaultFiles = Object.assign(videoImport, { Video: videoWithFiles }) | ||
184 | |||
185 | // Move file | ||
186 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) | ||
187 | await move(tempVideoPath, videoDestFile) | ||
188 | tempVideoPath = null // This path is not used anymore | ||
189 | |||
190 | // Generate miniature if the import did not created it | ||
191 | let thumbnailModel: MThumbnail | ||
192 | let thumbnailSave: object | ||
193 | if (!videoImportWithFiles.Video.getMiniature()) { | ||
194 | thumbnailModel = await generateVideoMiniature({ | ||
195 | video: videoImportWithFiles.Video, | ||
196 | videoFile, | ||
197 | type: ThumbnailType.MINIATURE | ||
198 | }) | ||
199 | thumbnailSave = thumbnailModel.toJSON() | ||
200 | } | ||
201 | 182 | ||
202 | // Generate preview if the import did not created it | 183 | try { |
203 | let previewModel: MThumbnail | 184 | const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile) |
204 | let previewSave: object | ||
205 | if (!videoImportWithFiles.Video.getPreview()) { | ||
206 | previewModel = await generateVideoMiniature({ | ||
207 | video: videoImportWithFiles.Video, | ||
208 | videoFile, | ||
209 | type: ThumbnailType.PREVIEW | ||
210 | }) | ||
211 | previewSave = previewModel.toJSON() | ||
212 | } | ||
213 | 185 | ||
214 | // Create torrent | 186 | // Move file |
215 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | 187 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) |
188 | await move(tempVideoPath, videoDestFile) | ||
216 | 189 | ||
217 | const videoFileSave = videoFile.toJSON() | 190 | tempVideoPath = null // This path is not used anymore |
218 | 191 | ||
219 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { | 192 | let { |
220 | return sequelizeTypescript.transaction(async t => { | 193 | miniatureModel: thumbnailModel, |
221 | const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo | 194 | miniatureJSONSave: thumbnailSave |
195 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE) | ||
222 | 196 | ||
223 | // Refresh video | 197 | let { |
224 | const video = await VideoModel.load(videoImportToUpdate.videoId, t) | 198 | miniatureModel: previewModel, |
225 | if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') | 199 | miniatureJSONSave: previewSave |
200 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW) | ||
226 | 201 | ||
227 | const videoFileCreated = await videoFile.save({ transaction: t }) | 202 | // Create torrent |
203 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | ||
228 | 204 | ||
229 | // Update video DB object | 205 | const videoFileSave = videoFile.toJSON() |
230 | video.duration = duration | ||
231 | video.state = buildNextVideoState(video.state) | ||
232 | await video.save({ transaction: t }) | ||
233 | 206 | ||
234 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | 207 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { |
235 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | 208 | return sequelizeTypescript.transaction(async t => { |
209 | // Refresh video | ||
210 | const video = await VideoModel.load(videoImportWithFiles.videoId, t) | ||
211 | if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.') | ||
236 | 212 | ||
237 | // Now we can federate the video (reload from database, we need more attributes) | 213 | await videoFile.save({ transaction: t }) |
238 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
239 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
240 | 214 | ||
241 | // Update video import object | 215 | // Update video DB object |
242 | videoImportToUpdate.state = VideoImportState.SUCCESS | 216 | video.duration = duration |
243 | const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo | 217 | video.state = buildNextVideoState(video.state) |
244 | videoImportUpdated.Video = video | 218 | await video.save({ transaction: t }) |
245 | 219 | ||
246 | videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) | 220 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) |
221 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | ||
247 | 222 | ||
248 | logger.info('Video %s imported.', video.uuid) | 223 | // Now we can federate the video (reload from database, we need more attributes) |
224 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
225 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
249 | 226 | ||
250 | return { videoImportUpdated, video: videoForFederation } | 227 | // Update video import object |
251 | }).catch(err => { | 228 | videoImportWithFiles.state = VideoImportState.SUCCESS |
252 | // Reset fields | 229 | const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport |
253 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
254 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
255 | 230 | ||
256 | videoFile = new VideoFileModel(videoFileSave) | 231 | logger.info('Video %s imported.', video.uuid) |
257 | 232 | ||
258 | throw err | 233 | return { videoImportUpdated, video: videoForFederation } |
259 | }) | 234 | }).catch(err => { |
260 | }) | 235 | // Reset fields |
236 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
237 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
261 | 238 | ||
262 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: videoImportUpdated, success: true }) | 239 | videoFile = new VideoFileModel(videoFileSave) |
263 | 240 | ||
264 | if (video.isBlacklisted()) { | 241 | throw err |
265 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | 242 | }) |
243 | }) | ||
266 | 244 | ||
267 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | 245 | await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User }) |
268 | } else { | 246 | } finally { |
269 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | 247 | videoFileLockReleaser() |
270 | } | 248 | } |
249 | } catch (err) { | ||
250 | await onImportError(err, tempVideoPath, videoImport) | ||
271 | 251 | ||
272 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | 252 | throw err |
273 | await JobQueue.Instance.createJob( | 253 | } |
274 | await buildMoveToObjectStorageJob({ video: videoImportUpdated.Video, previousVideoState: VideoState.TO_IMPORT }) | 254 | } |
275 | ) | ||
276 | } | ||
277 | 255 | ||
278 | // Create transcoding jobs? | 256 | async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> { |
279 | if (video.state === VideoState.TO_TRANSCODE) { | 257 | // Refresh video, privacy may have changed |
280 | await JobQueue.Instance.createJob( | 258 | const video = await videoImport.Video.reload() |
281 | await buildOptimizeOrMergeAudioJob({ video: videoImportUpdated.Video, videoFile, user: videoImport.User }) | 259 | const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) |
282 | ) | ||
283 | } | ||
284 | 260 | ||
285 | } catch (err) { | 261 | return Object.assign(videoImport, { Video: videoWithFiles }) |
286 | try { | 262 | } |
287 | if (tempVideoPath) await remove(tempVideoPath) | ||
288 | } catch (errUnlink) { | ||
289 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
290 | } | ||
291 | 263 | ||
292 | videoImport.error = err.message | 264 | async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) { |
293 | if (videoImport.state !== VideoImportState.REJECTED) { | 265 | // Generate miniature if the import did not created it |
294 | videoImport.state = VideoImportState.FAILED | 266 | const needsMiniature = thumbnailType === ThumbnailType.MINIATURE |
267 | ? !videoImportWithFiles.Video.getMiniature() | ||
268 | : !videoImportWithFiles.Video.getPreview() | ||
269 | |||
270 | if (!needsMiniature) { | ||
271 | return { | ||
272 | miniatureModel: null, | ||
273 | miniatureJSONSave: null | ||
295 | } | 274 | } |
296 | await videoImport.save() | 275 | } |
297 | 276 | ||
298 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | 277 | const miniatureModel = await generateVideoMiniature({ |
278 | video: videoImportWithFiles.Video, | ||
279 | videoFile, | ||
280 | type: ThumbnailType.MINIATURE | ||
281 | }) | ||
282 | const miniatureJSONSave = miniatureModel.toJSON() | ||
299 | 283 | ||
300 | throw err | 284 | return { |
285 | miniatureModel, | ||
286 | miniatureJSONSave | ||
287 | } | ||
288 | } | ||
289 | |||
290 | async function afterImportSuccess (options: { | ||
291 | videoImport: MVideoImport | ||
292 | video: MVideoFullLight | ||
293 | videoFile: MVideoFile | ||
294 | user: MUserId | ||
295 | }) { | ||
296 | const { video, videoFile, videoImport, user } = options | ||
297 | |||
298 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true }) | ||
299 | |||
300 | if (video.isBlacklisted()) { | ||
301 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | ||
302 | |||
303 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | ||
304 | } else { | ||
305 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | ||
301 | } | 306 | } |
307 | |||
308 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
309 | await JobQueue.Instance.createJob( | ||
310 | await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT }) | ||
311 | ) | ||
312 | return | ||
313 | } | ||
314 | |||
315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | ||
316 | await JobQueue.Instance.createJob( | ||
317 | await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
318 | ) | ||
319 | } | ||
320 | } | ||
321 | |||
322 | async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) { | ||
323 | try { | ||
324 | if (tempVideoPath) await remove(tempVideoPath) | ||
325 | } catch (errUnlink) { | ||
326 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
327 | } | ||
328 | |||
329 | videoImport.error = err.message | ||
330 | if (videoImport.state !== VideoImportState.REJECTED) { | ||
331 | videoImport.state = VideoImportState.FAILED | ||
332 | } | ||
333 | await videoImport.save() | ||
334 | |||
335 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | ||
302 | } | 336 | } |