diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 127 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 1 |
2 files changed, 70 insertions, 58 deletions
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 85f7dbfc2..f5ad076a6 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -1,17 +1,16 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as kue from 'kue' |
2 | import { VideoResolution } from '../../../../shared' | 2 | import { VideoResolution, VideoState } from '../../../../shared' |
3 | import { VideoPrivacy } from '../../../../shared/models/videos' | ||
4 | import { logger } from '../../../helpers/logger' | 3 | import { logger } from '../../../helpers/logger' |
5 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | 4 | import { computeResolutionsToTranscode } from '../../../helpers/utils' |
6 | import { sequelizeTypescript } from '../../../initializers' | ||
7 | import { VideoModel } from '../../../models/video/video' | 5 | import { VideoModel } from '../../../models/video/video' |
8 | import { shareVideoByServerAndChannel } from '../../activitypub' | ||
9 | import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' | ||
10 | import { JobQueue } from '../job-queue' | 6 | import { JobQueue } from '../job-queue' |
7 | import { federateVideoIfNeeded } from '../../activitypub' | ||
8 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
9 | import { sequelizeTypescript } from '../../../initializers' | ||
11 | 10 | ||
12 | export type VideoFilePayload = { | 11 | export type VideoFilePayload = { |
13 | videoUUID: string | 12 | videoUUID: string |
14 | isNewVideo: boolean | 13 | isNewVideo?: boolean |
15 | resolution?: VideoResolution | 14 | resolution?: VideoResolution |
16 | isPortraitMode?: boolean | 15 | isPortraitMode?: boolean |
17 | } | 16 | } |
@@ -52,10 +51,20 @@ async function processVideoFile (job: kue.Job) { | |||
52 | // Transcoding in other resolution | 51 | // Transcoding in other resolution |
53 | if (payload.resolution) { | 52 | if (payload.resolution) { |
54 | await video.transcodeOriginalVideofile(payload.resolution, payload.isPortraitMode) | 53 | await video.transcodeOriginalVideofile(payload.resolution, payload.isPortraitMode) |
55 | await onVideoFileTranscoderOrImportSuccess(video) | 54 | |
55 | const options = { | ||
56 | arguments: [ video ], | ||
57 | errorMessage: 'Cannot execute onVideoFileTranscoderOrImportSuccess with many retries.' | ||
58 | } | ||
59 | await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, options) | ||
56 | } else { | 60 | } else { |
57 | await video.optimizeOriginalVideofile() | 61 | await video.optimizeOriginalVideofile() |
58 | await onVideoFileOptimizerSuccess(video, payload.isNewVideo) | 62 | |
63 | const options = { | ||
64 | arguments: [ video, payload.isNewVideo ], | ||
65 | errorMessage: 'Cannot execute onVideoFileOptimizerSuccess with many retries.' | ||
66 | } | ||
67 | await retryTransactionWrapper(onVideoFileOptimizerSuccess, options) | ||
59 | } | 68 | } |
60 | 69 | ||
61 | return video | 70 | return video |
@@ -64,68 +73,70 @@ async function processVideoFile (job: kue.Job) { | |||
64 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { | 73 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { |
65 | if (video === undefined) return undefined | 74 | if (video === undefined) return undefined |
66 | 75 | ||
67 | // Maybe the video changed in database, refresh it | 76 | return sequelizeTypescript.transaction(async t => { |
68 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 77 | // Maybe the video changed in database, refresh it |
69 | // Video does not exist anymore | 78 | let videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t) |
70 | if (!videoDatabase) return undefined | 79 | // Video does not exist anymore |
80 | if (!videoDatabase) return undefined | ||
71 | 81 | ||
72 | if (video.privacy !== VideoPrivacy.PRIVATE) { | 82 | // We transcoded the video file in another format, now we can publish it |
73 | await sendUpdateVideo(video, undefined) | 83 | const oldState = videoDatabase.state |
74 | } | 84 | videoDatabase.state = VideoState.PUBLISHED |
85 | videoDatabase = await videoDatabase.save({ transaction: t }) | ||
86 | |||
87 | // If the video was not published, we consider it is a new one for other instances | ||
88 | const isNewVideo = oldState !== VideoState.PUBLISHED | ||
89 | await federateVideoIfNeeded(videoDatabase, isNewVideo, t) | ||
75 | 90 | ||
76 | return undefined | 91 | return undefined |
92 | }) | ||
77 | } | 93 | } |
78 | 94 | ||
79 | async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { | 95 | async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { |
80 | if (video === undefined) return undefined | 96 | if (video === undefined) return undefined |
81 | 97 | ||
82 | // Maybe the video changed in database, refresh it | 98 | // Outside the transaction (IO on disk) |
83 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | 99 | const { videoFileResolution } = await video.getOriginalFileResolution() |
84 | // Video does not exist anymore | 100 | |
85 | if (!videoDatabase) return undefined | 101 | return sequelizeTypescript.transaction(async t => { |
86 | 102 | // Maybe the video changed in database, refresh it | |
87 | if (video.privacy !== VideoPrivacy.PRIVATE) { | 103 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t) |
88 | if (isNewVideo !== false) { | 104 | // Video does not exist anymore |
89 | // Now we'll add the video's meta data to our followers | 105 | if (!videoDatabase) return undefined |
90 | await sequelizeTypescript.transaction(async t => { | 106 | |
91 | await sendCreateVideo(video, t) | 107 | // Create transcoding jobs if there are enabled resolutions |
92 | await shareVideoByServerAndChannel(video, t) | 108 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) |
93 | }) | 109 | logger.info( |
94 | } else { | 110 | 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, |
95 | await sendUpdateVideo(video, undefined) | 111 | { resolutions: resolutionsEnabled } |
96 | } | 112 | ) |
97 | } | 113 | |
98 | 114 | if (resolutionsEnabled.length !== 0) { | |
99 | const { videoFileResolution } = await videoDatabase.getOriginalFileResolution() | 115 | const tasks: Promise<any>[] = [] |
100 | 116 | ||
101 | // Create transcoding jobs if there are enabled resolutions | 117 | for (const resolution of resolutionsEnabled) { |
102 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) | 118 | const dataInput = { |
103 | logger.info( | 119 | videoUUID: videoDatabase.uuid, |
104 | 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, | 120 | resolution |
105 | { resolutions: resolutionsEnabled } | 121 | } |
106 | ) | 122 | |
123 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) | ||
124 | tasks.push(p) | ||
125 | } | ||
107 | 126 | ||
108 | if (resolutionsEnabled.length !== 0) { | 127 | await Promise.all(tasks) |
109 | const tasks: Promise<any>[] = [] | ||
110 | 128 | ||
111 | for (const resolution of resolutionsEnabled) { | 129 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
112 | const dataInput = { | 130 | } else { |
113 | videoUUID: videoDatabase.uuid, | 131 | // No transcoding to do, it's now published |
114 | resolution, | 132 | video.state = VideoState.PUBLISHED |
115 | isNewVideo | 133 | video = await video.save({ transaction: t }) |
116 | } | ||
117 | 134 | ||
118 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) | 135 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) |
119 | tasks.push(p) | ||
120 | } | 136 | } |
121 | 137 | ||
122 | await Promise.all(tasks) | 138 | return federateVideoIfNeeded(video, isNewVideo, t) |
123 | 139 | }) | |
124 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | ||
125 | } else { | ||
126 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') | ||
127 | return undefined | ||
128 | } | ||
129 | } | 140 | } |
130 | 141 | ||
131 | // --------------------------------------------------------------------------- | 142 | // --------------------------------------------------------------------------- |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index bdfa19b61..695fe0eea 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -79,6 +79,7 @@ class JobQueue { | |||
79 | const res = await handlers[ handlerName ](job) | 79 | const res = await handlers[ handlerName ](job) |
80 | return done(null, res) | 80 | return done(null, res) |
81 | } catch (err) { | 81 | } catch (err) { |
82 | logger.error('Cannot execute job %d.', job.id, { err }) | ||
82 | return done(err) | 83 | return done(err) |
83 | } | 84 | } |
84 | }) | 85 | }) |