diff options
author | Chocobozzz <me@florianbigard.com> | 2021-02-09 11:22:42 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2021-02-09 11:46:30 +0100 |
commit | 44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031 (patch) | |
tree | 204dfc3e7986b9a6fc3660e47bdb93df3baeeb94 /server/lib/job-queue | |
parent | 80428d16a0acd1c0d1478d8861c3d5778745bb77 (diff) | |
download | PeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.tar.gz PeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.tar.zst PeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.zip |
Painfully debug concurrent import jobs
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 63 |
1 files changed, 41 insertions, 22 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index f61fd773a..9125b50e1 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -1,11 +1,13 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { move, remove, stat } from 'fs-extra' | 2 | import { move, remove, stat } from 'fs-extra' |
3 | import { extname } from 'path' | 3 | import { extname } from 'path' |
4 | import { retryTransactionWrapper } from '@server/helpers/database-utils' | ||
4 | import { isPostImportVideoAccepted } from '@server/lib/moderation' | 5 | import { isPostImportVideoAccepted } from '@server/lib/moderation' |
5 | import { Hooks } from '@server/lib/plugins/hooks' | 6 | import { Hooks } from '@server/lib/plugins/hooks' |
6 | import { isAbleToUploadVideo } from '@server/lib/user' | 7 | import { isAbleToUploadVideo } from '@server/lib/user' |
7 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' | 8 | import { addOptimizeOrMergeAudioJob } from '@server/lib/video' |
8 | import { getVideoFilePath } from '@server/lib/video-paths' | 9 | import { getVideoFilePath } from '@server/lib/video-paths' |
10 | import { ThumbnailModel } from '@server/models/video/thumbnail' | ||
9 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 11 | import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' |
10 | import { | 12 | import { |
11 | VideoImportPayload, | 13 | VideoImportPayload, |
@@ -167,49 +169,66 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
167 | 169 | ||
168 | // Process thumbnail | 170 | // Process thumbnail |
169 | let thumbnailModel: MThumbnail | 171 | let thumbnailModel: MThumbnail |
172 | let thumbnailSave: object | ||
170 | if (options.generateThumbnail) { | 173 | if (options.generateThumbnail) { |
171 | thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE) | 174 | thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE) |
175 | thumbnailSave = thumbnailModel.toJSON() | ||
172 | } | 176 | } |
173 | 177 | ||
174 | // Process preview | 178 | // Process preview |
175 | let previewModel: MThumbnail | 179 | let previewModel: MThumbnail |
180 | let previewSave: object | ||
176 | if (options.generatePreview) { | 181 | if (options.generatePreview) { |
177 | previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW) | 182 | previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW) |
183 | previewSave = previewModel.toJSON() | ||
178 | } | 184 | } |
179 | 185 | ||
180 | // Create torrent | 186 | // Create torrent |
181 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) | 187 | await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) |
182 | 188 | ||
183 | const { videoImportUpdated, video } = await sequelizeTypescript.transaction(async t => { | 189 | const videoFileSave = videoFile.toJSON() |
184 | const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo | ||
185 | 190 | ||
186 | // Refresh video | 191 | const { videoImportUpdated, video } = await retryTransactionWrapper(() => { |
187 | const video = await VideoModel.load(videoImportToUpdate.videoId, t) | 192 | return sequelizeTypescript.transaction(async t => { |
188 | if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') | 193 | const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo |
189 | 194 | ||
190 | const videoFileCreated = await videoFile.save({ transaction: t }) | 195 | // Refresh video |
191 | videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) | 196 | const video = await VideoModel.load(videoImportToUpdate.videoId, t) |
197 | if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') | ||
192 | 198 | ||
193 | // Update video DB object | 199 | const videoFileCreated = await videoFile.save({ transaction: t }) |
194 | video.duration = duration | ||
195 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED | ||
196 | await video.save({ transaction: t }) | ||
197 | 200 | ||
198 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) | 201 | // Update video DB object |
199 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) | 202 | video.duration = duration |
203 | video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED | ||
204 | await video.save({ transaction: t }) | ||
200 | 205 | ||
201 | // Now we can federate the video (reload from database, we need more attributes) | 206 | if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) |
202 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) | 207 | if (previewModel) await video.addAndSaveThumbnail(previewModel, t) |
203 | await federateVideoIfNeeded(videoForFederation, true, t) | ||
204 | 208 | ||
205 | // Update video import object | 209 | // Now we can federate the video (reload from database, we need more attributes) |
206 | videoImportToUpdate.state = VideoImportState.SUCCESS | 210 | const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) |
207 | const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo | 211 | await federateVideoIfNeeded(videoForFederation, true, t) |
208 | videoImportUpdated.Video = video | ||
209 | 212 | ||
210 | logger.info('Video %s imported.', video.uuid) | 213 | // Update video import object |
214 | videoImportToUpdate.state = VideoImportState.SUCCESS | ||
215 | const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo | ||
216 | videoImportUpdated.Video = video | ||
211 | 217 | ||
212 | return { videoImportUpdated, video: videoForFederation } | 218 | videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) |
219 | |||
220 | logger.info('Video %s imported.', video.uuid) | ||
221 | |||
222 | return { videoImportUpdated, video: videoForFederation } | ||
223 | }).catch(err => { | ||
224 | // Reset fields | ||
225 | if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave) | ||
226 | if (previewModel) previewModel = new ThumbnailModel(previewSave) | ||
227 | |||
228 | videoFile = new VideoFileModel(videoFileSave) | ||
229 | |||
230 | throw err | ||
231 | }) | ||
213 | }) | 232 | }) |
214 | 233 | ||
215 | Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) | 234 | Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) |