diff options
Diffstat (limited to 'server/lib/job-queue/handlers/video-transcoding.ts')
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts new file mode 100644 index 000000000..ceee83f13 --- /dev/null +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -0,0 +1,204 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { VideoResolution, VideoState } from '../../../../shared' | ||
3 | import { logger } from '../../../helpers/logger' | ||
4 | import { VideoModel } from '../../../models/video/video' | ||
5 | import { JobQueue } from '../job-queue' | ||
6 | import { federateVideoIfNeeded } from '../../activitypub' | ||
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
8 | import { sequelizeTypescript, CONFIG } from '../../../initializers' | ||
9 | import * as Bluebird from 'bluebird' | ||
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | ||
11 | import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' | ||
12 | import { Notifier } from '../../notifier' | ||
13 | |||
14 | export type VideoTranscodingPayload = { | ||
15 | videoUUID: string | ||
16 | resolution?: VideoResolution | ||
17 | isNewVideo?: boolean | ||
18 | isPortraitMode?: boolean | ||
19 | generateHlsPlaylist?: boolean | ||
20 | } | ||
21 | |||
22 | export type VideoFileImportPayload = { | ||
23 | videoUUID: string, | ||
24 | filePath: string | ||
25 | } | ||
26 | |||
27 | async function processVideoFileImport (job: Bull.Job) { | ||
28 | const payload = job.data as VideoFileImportPayload | ||
29 | logger.info('Processing video file import in job %d.', job.id) | ||
30 | |||
31 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
32 | // No video, maybe deleted? | ||
33 | if (!video) { | ||
34 | logger.info('Do not process job %d, video does not exist.', job.id) | ||
35 | return undefined | ||
36 | } | ||
37 | |||
38 | await importVideoFile(video, payload.filePath) | ||
39 | |||
40 | await onVideoFileTranscoderOrImportSuccess(video) | ||
41 | return video | ||
42 | } | ||
43 | |||
44 | async function processVideoTranscoding (job: Bull.Job) { | ||
45 | const payload = job.data as VideoTranscodingPayload | ||
46 | logger.info('Processing video file in job %d.', job.id) | ||
47 | |||
48 | const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
49 | // No video, maybe deleted? | ||
50 | if (!video) { | ||
51 | logger.info('Do not process job %d, video does not exist.', job.id) | ||
52 | return undefined | ||
53 | } | ||
54 | |||
55 | if (payload.generateHlsPlaylist) { | ||
56 | await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) | ||
57 | |||
58 | await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) | ||
59 | } else if (payload.resolution) { // Transcoding in other resolution | ||
60 | await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) | ||
61 | |||
62 | await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload) | ||
63 | } else { | ||
64 | await optimizeVideofile(video) | ||
65 | |||
66 | await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) | ||
67 | } | ||
68 | |||
69 | return video | ||
70 | } | ||
71 | |||
72 | async function onHlsPlaylistGenerationSuccess (video: VideoModel) { | ||
73 | if (video === undefined) return undefined | ||
74 | |||
75 | await sequelizeTypescript.transaction(async t => { | ||
76 | // Maybe the video changed in database, refresh it | ||
77 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | ||
78 | // Video does not exist anymore | ||
79 | if (!videoDatabase) return undefined | ||
80 | |||
81 | // If the video was not published, we consider it is a new one for other instances | ||
82 | await federateVideoIfNeeded(videoDatabase, false, t) | ||
83 | }) | ||
84 | } | ||
85 | |||
86 | async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoTranscodingPayload) { | ||
87 | if (video === undefined) return undefined | ||
88 | |||
89 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | ||
90 | // Maybe the video changed in database, refresh it | ||
91 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | ||
92 | // Video does not exist anymore | ||
93 | if (!videoDatabase) return undefined | ||
94 | |||
95 | let videoPublished = false | ||
96 | |||
97 | // We transcoded the video file in another format, now we can publish it | ||
98 | if (videoDatabase.state !== VideoState.PUBLISHED) { | ||
99 | videoPublished = true | ||
100 | |||
101 | videoDatabase.state = VideoState.PUBLISHED | ||
102 | videoDatabase.publishedAt = new Date() | ||
103 | videoDatabase = await videoDatabase.save({ transaction: t }) | ||
104 | } | ||
105 | |||
106 | // If the video was not published, we consider it is a new one for other instances | ||
107 | await federateVideoIfNeeded(videoDatabase, videoPublished, t) | ||
108 | |||
109 | return { videoDatabase, videoPublished } | ||
110 | }) | ||
111 | |||
112 | // don't notify prior to scheduled video update | ||
113 | if (videoPublished && !videoDatabase.ScheduleVideoUpdate) { | ||
114 | Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
115 | Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
116 | } | ||
117 | |||
118 | await createHlsJobIfEnabled(payload) | ||
119 | } | ||
120 | |||
121 | async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoTranscodingPayload) { | ||
122 | if (videoArg === undefined) return undefined | ||
123 | |||
124 | // Outside the transaction (IO on disk) | ||
125 | const { videoFileResolution } = await videoArg.getOriginalFileResolution() | ||
126 | |||
127 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | ||
128 | // Maybe the video changed in database, refresh it | ||
129 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) | ||
130 | // Video does not exist anymore | ||
131 | if (!videoDatabase) return undefined | ||
132 | |||
133 | // Create transcoding jobs if there are enabled resolutions | ||
134 | const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) | ||
135 | logger.info( | ||
136 | 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, | ||
137 | { resolutions: resolutionsEnabled } | ||
138 | ) | ||
139 | |||
140 | let videoPublished = false | ||
141 | |||
142 | if (resolutionsEnabled.length !== 0) { | ||
143 | const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] | ||
144 | |||
145 | for (const resolution of resolutionsEnabled) { | ||
146 | const dataInput = { | ||
147 | videoUUID: videoDatabase.uuid, | ||
148 | resolution | ||
149 | } | ||
150 | |||
151 | const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | ||
152 | tasks.push(p) | ||
153 | } | ||
154 | |||
155 | await Promise.all(tasks) | ||
156 | |||
157 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | ||
158 | } else { | ||
159 | videoPublished = true | ||
160 | |||
161 | // No transcoding to do, it's now published | ||
162 | videoDatabase.state = VideoState.PUBLISHED | ||
163 | videoDatabase = await videoDatabase.save({ transaction: t }) | ||
164 | |||
165 | logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) | ||
166 | } | ||
167 | |||
168 | await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) | ||
169 | |||
170 | return { videoDatabase, videoPublished } | ||
171 | }) | ||
172 | |||
173 | // don't notify prior to scheduled video update | ||
174 | if (!videoDatabase.ScheduleVideoUpdate) { | ||
175 | if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) | ||
176 | if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) | ||
177 | } | ||
178 | |||
179 | await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })) | ||
180 | } | ||
181 | |||
182 | // --------------------------------------------------------------------------- | ||
183 | |||
184 | export { | ||
185 | processVideoTranscoding, | ||
186 | processVideoFileImport | ||
187 | } | ||
188 | |||
189 | // --------------------------------------------------------------------------- | ||
190 | |||
191 | function createHlsJobIfEnabled (payload?: VideoTranscodingPayload) { | ||
192 | // Generate HLS playlist? | ||
193 | if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { | ||
194 | const hlsTranscodingPayload = { | ||
195 | videoUUID: payload.videoUUID, | ||
196 | resolution: payload.resolution, | ||
197 | isPortraitMode: payload.isPortraitMode, | ||
198 | |||
199 | generateHlsPlaylist: true | ||
200 | } | ||
201 | |||
202 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) | ||
203 | } | ||
204 | } | ||