diff options
author | Chocobozzz <me@florianbigard.com> | 2023-07-31 14:34:36 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2023-08-11 15:02:33 +0200 |
commit | 3a4992633ee62d5edfbb484d9c6bcb3cf158489d (patch) | |
tree | e4510b39bdac9c318fdb4b47018d08f15368b8f0 /server/lib/job-queue/handlers/video-import.ts | |
parent | 04d1da5621d25d59bd5fa1543b725c497bf5d9a8 (diff) | |
download | PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.gz PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.tar.zst PeerTube-3a4992633ee62d5edfbb484d9c6bcb3cf158489d.zip |
Migrate server to ESM
Sorry for the very big commit that may lead to git log issues and merge
conflicts, but it's a major step forward:
* Server can be faster at startup because imports() are async and we can
easily lazy import big modules
* Angular doesn't seem to support ES import (with .js extension), so we
had to correctly organize peertube into a monorepo:
* Use yarn workspace feature
* Use typescript reference projects for dependencies
* Shared projects have been moved into "packages", each one is now a
node module (with a dedicated package.json/tsconfig.json)
* server/tools have been moved into apps/ and is now a dedicated app
bundled and published on NPM so users don't have to build peertube
cli tools manually
* server/tests have been moved into packages/ so we don't compile
them every time we want to run the server
* Use isolatedModule option:
* Had to move from const enum to const
(https://www.typescriptlang.org/docs/handbook/enums.html#objects-vs-enums)
* Had to explictely specify "type" imports when used in decorators
* Prefer tsx (that uses esbuild under the hood) instead of ts-node to
load typescript files (tests with mocha or scripts):
* To reduce test complexity as esbuild doesn't support decorator
metadata, we only test server files that do not import server
models
* We still build tests files into js files for a faster CI
* Remove unmaintained peertube CLI import script
* Removed some barrels to speed up execution (less imports)
Diffstat (limited to 'server/lib/job-queue/handlers/video-import.ts')
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 344 |
1 files changed, 0 insertions, 344 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts deleted file mode 100644 index e5cd258d6..000000000 --- a/server/lib/job-queue/handlers/video-import.ts +++ /dev/null | |||
@@ -1,344 +0,0 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { move, remove, stat } from 'fs-extra' | ||
3 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
4 | import { YoutubeDLWrapper } from '@server/helpers/youtube-dl' | ||
5 | import { CONFIG } from '@server/initializers/config' | ||
6 | import { isPostImportVideoAccepted } from '@server/lib/moderation' | ||
7 | import { generateWebVideoFilename } from '@server/lib/paths' | ||
8 | import { Hooks } from '@server/lib/plugins/hooks' | ||
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | ||
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
11 | import { isAbleToUploadVideo } from '@server/lib/user' | ||
12 | import { buildMoveToObjectStorageJob } from '@server/lib/video' | ||
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
14 | import { buildNextVideoState } from '@server/lib/video-state' | ||
15 | import { ThumbnailModel } from '@server/models/video/thumbnail' | ||
16 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | ||
17 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | ||
18 | import { getLowercaseExtension } from '@shared/core-utils' | ||
19 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' | ||
20 | import { | ||
21 | ThumbnailType, | ||
22 | VideoImportPayload, | ||
23 | VideoImportPreventExceptionResult, | ||
24 | VideoImportState, | ||
25 | VideoImportTorrentPayload, | ||
26 | VideoImportTorrentPayloadType, | ||
27 | VideoImportYoutubeDLPayload, | ||
28 | VideoImportYoutubeDLPayloadType, | ||
29 | VideoResolution, | ||
30 | VideoState | ||
31 | } from '@shared/models' | ||
32 | import { logger } from '../../../helpers/logger' | ||
33 | import { getSecureTorrentName } from '../../../helpers/utils' | ||
34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | ||
35 | import { JOB_TTL } from '../../../initializers/constants' | ||
36 | import { sequelizeTypescript } from '../../../initializers/database' | ||
37 | import { VideoModel } from '../../../models/video/video' | ||
38 | import { VideoFileModel } from '../../../models/video/video-file' | ||
39 | import { VideoImportModel } from '../../../models/video/video-import' | ||
40 | import { federateVideoIfNeeded } from '../../activitypub/videos' | ||
41 | import { Notifier } from '../../notifier' | ||
42 | import { generateLocalVideoMiniature } from '../../thumbnail' | ||
43 | import { JobQueue } from '../job-queue' | ||
44 | |||
45 | async function processVideoImport (job: Job): Promise<VideoImportPreventExceptionResult> { | ||
46 | const payload = job.data as VideoImportPayload | ||
47 | |||
48 | const videoImport = await getVideoImportOrDie(payload) | ||
49 | if (videoImport.state === VideoImportState.CANCELLED) { | ||
50 | logger.info('Do not process import since it has been cancelled', { payload }) | ||
51 | return { resultType: 'success' } | ||
52 | } | ||
53 | |||
54 | videoImport.state = VideoImportState.PROCESSING | ||
55 | await videoImport.save() | ||
56 | |||
57 | try { | ||
58 | if (payload.type === 'youtube-dl') await processYoutubeDLImport(job, videoImport, payload) | ||
59 | if (payload.type === 'magnet-uri' || payload.type === 'torrent-file') await processTorrentImport(job, videoImport, payload) | ||
60 | |||
61 | return { resultType: 'success' } | ||
62 | } catch (err) { | ||
63 | if (!payload.preventException) throw err | ||
64 | |||
65 | logger.warn('Catch error in video import to send value to parent job.', { payload, err }) | ||
66 | return { resultType: 'error' } | ||
67 | } | ||
68 | } | ||
69 | |||
70 | // --------------------------------------------------------------------------- | ||
71 | |||
72 | export { | ||
73 | processVideoImport | ||
74 | } | ||
75 | |||
76 | // --------------------------------------------------------------------------- | ||
77 | |||
78 | async function processTorrentImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportTorrentPayload) { | ||
79 | logger.info('Processing torrent video import in job %s.', job.id) | ||
80 | |||
81 | const options = { type: payload.type, videoImportId: payload.videoImportId } | ||
82 | |||
83 | const target = { | ||
84 | torrentName: videoImport.torrentName ? getSecureTorrentName(videoImport.torrentName) : undefined, | ||
85 | uri: videoImport.magnetUri | ||
86 | } | ||
87 | return processFile(() => downloadWebTorrentVideo(target, JOB_TTL['video-import']), videoImport, options) | ||
88 | } | ||
89 | |||
90 | async function processYoutubeDLImport (job: Job, videoImport: MVideoImportDefault, payload: VideoImportYoutubeDLPayload) { | ||
91 | logger.info('Processing youtubeDL video import in job %s.', job.id) | ||
92 | |||
93 | const options = { type: payload.type, videoImportId: videoImport.id } | ||
94 | |||
95 | const youtubeDL = new YoutubeDLWrapper( | ||
96 | videoImport.targetUrl, | ||
97 | ServerConfigManager.Instance.getEnabledResolutions('vod'), | ||
98 | CONFIG.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION | ||
99 | ) | ||
100 | |||
101 | return processFile( | ||
102 | () => youtubeDL.downloadVideo(payload.fileExt, JOB_TTL['video-import']), | ||
103 | videoImport, | ||
104 | options | ||
105 | ) | ||
106 | } | ||
107 | |||
108 | async function getVideoImportOrDie (payload: VideoImportPayload) { | ||
109 | const videoImport = await VideoImportModel.loadAndPopulateVideo(payload.videoImportId) | ||
110 | if (!videoImport?.Video) { | ||
111 | throw new Error(`Cannot import video ${payload.videoImportId}: the video import or video linked to this import does not exist anymore.`) | ||
112 | } | ||
113 | |||
114 | return videoImport | ||
115 | } | ||
116 | |||
117 | type ProcessFileOptions = { | ||
118 | type: VideoImportYoutubeDLPayloadType | VideoImportTorrentPayloadType | ||
119 | videoImportId: number | ||
120 | } | ||
121 | async function processFile (downloader: () => Promise<string>, videoImport: MVideoImportDefault, options: ProcessFileOptions) { | ||
122 | let tempVideoPath: string | ||
123 | let videoFile: VideoFileModel | ||
124 | |||
125 | try { | ||
126 | // Download video from youtubeDL | ||
127 | tempVideoPath = await downloader() | ||
128 | |||
129 | // Get information about this video | ||
130 | const stats = await stat(tempVideoPath) | ||
131 | const isAble = await isAbleToUploadVideo(videoImport.User.id, stats.size) | ||
132 | if (isAble === false) { | ||
133 | throw new Error('The user video quota is exceeded with this video to import.') | ||
134 | } | ||
135 | |||
136 | const probe = await ffprobePromise(tempVideoPath) | ||
137 | |||
138 | const { resolution } = await isAudioFile(tempVideoPath, probe) | ||
139 | ? { resolution: VideoResolution.H_NOVIDEO } | ||
140 | : await getVideoStreamDimensionsInfo(tempVideoPath, probe) | ||
141 | |||
142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) | ||
143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) | ||
144 | |||
145 | // Prepare video file object for creation in database | ||
146 | const fileExt = getLowercaseExtension(tempVideoPath) | ||
147 | const videoFileData = { | ||
148 | extname: fileExt, | ||
149 | resolution, | ||
150 | size: stats.size, | ||
151 | filename: generateWebVideoFilename(resolution, fileExt), | ||
152 | fps, | ||
153 | videoId: videoImport.videoId | ||
154 | } | ||
155 | videoFile = new VideoFileModel(videoFileData) | ||
156 | |||
157 | const hookName = options.type === 'youtube-dl' | ||
158 | ? 'filter:api.video.post-import-url.accept.result' | ||
159 | : 'filter:api.video.post-import-torrent.accept.result' | ||
160 | |||
161 | // Check we accept this video | ||
162 | const acceptParameters = { | ||
163 | videoImport, | ||
164 | video: videoImport.Video, | ||
165 | videoFilePath: tempVideoPath, | ||
166 | videoFile, | ||
167 | user: videoImport.User | ||
168 | } | ||
169 | const acceptedResult = await Hooks.wrapFun(isPostImportVideoAccepted, acceptParameters, hookName) | ||
170 | |||
171 | if (acceptedResult.accepted !== true) { | ||
172 | logger.info('Refused imported video.', { acceptedResult, acceptParameters }) | ||
173 | |||
174 | videoImport.state = VideoImportState.REJECTED | ||
175 | await videoImport.save() | ||
176 | |||
177 | throw new Error(acceptedResult.errorMessage) | ||
178 | } | ||
179 | |||
180 | // Video is accepted, resuming preparation | ||
181 | const videoFileLockReleaser = await VideoPathManager.Instance.lockFiles(videoImport.Video.uuid) | ||
182 | |||
183 | try { | ||
184 | const videoImportWithFiles = await refreshVideoImportFromDB(videoImport, videoFile) | ||
185 | |||
186 | // Move file | ||
187 | const videoDestFile = VideoPathManager.Instance.getFSVideoFileOutputPath(videoImportWithFiles.Video, videoFile) | ||
188 | await move(tempVideoPath, videoDestFile) | ||
189 | |||
190 | tempVideoPath = null // This path is not used anymore | ||
191 | |||
192 | let { | ||
193 | miniatureModel: thumbnailModel, | ||
194 | miniatureJSONSave: thumbnailSave | ||
195 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.MINIATURE) | ||
196 | |||
197 | let { | ||
198 | miniatureModel: previewModel, | ||
199 | miniatureJSONSave: previewSave | ||
200 | } = await generateMiniature(videoImportWithFiles, videoFile, ThumbnailType.PREVIEW) | ||
201 | |||
202 | // Create torrent | ||
203 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | ||
204 | |||
205 | const videoFileSave = videoFile.toJSON() | ||
206 | |||
207 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { | ||
208 | return sequelizeTypescript.transaction(async t => { | ||
209 | // Refresh video | ||
210 | const video = await VideoModel.load(videoImportWithFiles.videoId, t) | ||
211 | if (!video) throw new Error('Video linked to import ' + videoImportWithFiles.videoId + ' does not exist anymore.') | ||
212 | |||
213 | await videoFile.save({ transaction: t }) | ||
214 | |||
215 | // Update video DB object | ||
216 | video.duration = duration | ||
217 | video.state = buildNextVideoState(video.state) | ||
218 | await video.save({ transaction: t }) | ||
219 | |||
220 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | ||
221 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | ||
222 | |||
223 | // Now we can federate the video (reload from database, we need more attributes) | ||
224 | const videoForFederation = await VideoModel.loadFull(video.uuid, t) | ||
225 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
226 | |||
227 | // Update video import object | ||
228 | videoImportWithFiles.state = VideoImportState.SUCCESS | ||
229 | const videoImportUpdated = await videoImportWithFiles.save({ transaction: t }) as MVideoImport | ||
230 | |||
231 | logger.info('Video %s imported.', video.uuid) | ||
232 | |||
233 | return { videoImportUpdated, video: videoForFederation } | ||
234 | }).catch(err => { | ||
235 | // Reset fields | ||
236 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
237 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
238 | |||
239 | videoFile = new VideoFileModel(videoFileSave) | ||
240 | |||
241 | throw err | ||
242 | }) | ||
243 | }) | ||
244 | |||
245 | await afterImportSuccess({ videoImport: videoImportUpdated, video, videoFile, user: videoImport.User, videoFileAlreadyLocked: true }) | ||
246 | } finally { | ||
247 | videoFileLockReleaser() | ||
248 | } | ||
249 | } catch (err) { | ||
250 | await onImportError(err, tempVideoPath, videoImport) | ||
251 | |||
252 | throw err | ||
253 | } | ||
254 | } | ||
255 | |||
256 | async function refreshVideoImportFromDB (videoImport: MVideoImportDefault, videoFile: MVideoFile): Promise<MVideoImportDefaultFiles> { | ||
257 | // Refresh video, privacy may have changed | ||
258 | const video = await videoImport.Video.reload() | ||
259 | const videoWithFiles = Object.assign(video, { VideoFiles: [ videoFile ], VideoStreamingPlaylists: [] }) | ||
260 | |||
261 | return Object.assign(videoImport, { Video: videoWithFiles }) | ||
262 | } | ||
263 | |||
264 | async function generateMiniature (videoImportWithFiles: MVideoImportDefaultFiles, videoFile: MVideoFile, thumbnailType: ThumbnailType) { | ||
265 | // Generate miniature if the import did not created it | ||
266 | const needsMiniature = thumbnailType === ThumbnailType.MINIATURE | ||
267 | ? !videoImportWithFiles.Video.getMiniature() | ||
268 | : !videoImportWithFiles.Video.getPreview() | ||
269 | |||
270 | if (!needsMiniature) { | ||
271 | return { | ||
272 | miniatureModel: null, | ||
273 | miniatureJSONSave: null | ||
274 | } | ||
275 | } | ||
276 | |||
277 | const miniatureModel = await generateLocalVideoMiniature({ | ||
278 | video: videoImportWithFiles.Video, | ||
279 | videoFile, | ||
280 | type: thumbnailType | ||
281 | }) | ||
282 | const miniatureJSONSave = miniatureModel.toJSON() | ||
283 | |||
284 | return { | ||
285 | miniatureModel, | ||
286 | miniatureJSONSave | ||
287 | } | ||
288 | } | ||
289 | |||
290 | async function afterImportSuccess (options: { | ||
291 | videoImport: MVideoImport | ||
292 | video: MVideoFullLight | ||
293 | videoFile: MVideoFile | ||
294 | user: MUserId | ||
295 | videoFileAlreadyLocked: boolean | ||
296 | }) { | ||
297 | const { video, videoFile, videoImport, user, videoFileAlreadyLocked } = options | ||
298 | |||
299 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport: Object.assign(videoImport, { Video: video }), success: true }) | ||
300 | |||
301 | if (video.isBlacklisted()) { | ||
302 | const videoBlacklist = Object.assign(video.VideoBlacklist, { Video: video }) | ||
303 | |||
304 | Notifier.Instance.notifyOnVideoAutoBlacklist(videoBlacklist) | ||
305 | } else { | ||
306 | Notifier.Instance.notifyOnNewVideoIfNeeded(video) | ||
307 | } | ||
308 | |||
309 | // Generate the storyboard in the job queue, and don't forget to federate an update after | ||
310 | await JobQueue.Instance.createJob({ | ||
311 | type: 'generate-video-storyboard' as 'generate-video-storyboard', | ||
312 | payload: { | ||
313 | videoUUID: video.uuid, | ||
314 | federate: true | ||
315 | } | ||
316 | }) | ||
317 | |||
318 | if (video.state === VideoState.TO_MOVE_TO_EXTERNAL_STORAGE) { | ||
319 | await JobQueue.Instance.createJob( | ||
320 | await buildMoveToObjectStorageJob({ video, previousVideoState: VideoState.TO_IMPORT }) | ||
321 | ) | ||
322 | return | ||
323 | } | ||
324 | |||
325 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | ||
326 | await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user, videoFileAlreadyLocked }) | ||
327 | } | ||
328 | } | ||
329 | |||
330 | async function onImportError (err: Error, tempVideoPath: string, videoImport: MVideoImportVideo) { | ||
331 | try { | ||
332 | if (tempVideoPath) await remove(tempVideoPath) | ||
333 | } catch (errUnlink) { | ||
334 | logger.warn('Cannot cleanup files after a video import error.', { err: errUnlink }) | ||
335 | } | ||
336 | |||
337 | videoImport.error = err.message | ||
338 | if (videoImport.state !== VideoImportState.REJECTED) { | ||
339 | videoImport.state = VideoImportState.FAILED | ||
340 | } | ||
341 | await videoImport.save() | ||
342 | |||
343 | Notifier.Instance.notifyOnFinishedVideoImport({ videoImport, success: false }) | ||
344 | } | ||