aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/handlers
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2021-02-09 11:22:42 +0100
committerChocobozzz <me@florianbigard.com>2021-02-09 11:46:30 +0100
commit44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031 (patch)
tree204dfc3e7986b9a6fc3660e47bdb93df3baeeb94 /server/lib/job-queue/handlers
parent80428d16a0acd1c0d1478d8861c3d5778745bb77 (diff)
downloadPeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.tar.gz
PeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.tar.zst
PeerTube-44d1f7f2e8119b8da6a11c3a7b9ef1dd5315d031.zip
Painfully debug concurrent import jobs
Diffstat (limited to 'server/lib/job-queue/handlers')
-rw-r--r--server/lib/job-queue/handlers/video-import.ts63
1 files changed, 41 insertions, 22 deletions
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts
index f61fd773a..9125b50e1 100644
--- a/server/lib/job-queue/handlers/video-import.ts
+++ b/server/lib/job-queue/handlers/video-import.ts
@@ -1,11 +1,13 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { move, remove, stat } from 'fs-extra' 2import { move, remove, stat } from 'fs-extra'
3import { extname } from 'path' 3import { extname } from 'path'
4import { retryTransactionWrapper } from '@server/helpers/database-utils'
4import { isPostImportVideoAccepted } from '@server/lib/moderation' 5import { isPostImportVideoAccepted } from '@server/lib/moderation'
5import { Hooks } from '@server/lib/plugins/hooks' 6import { Hooks } from '@server/lib/plugins/hooks'
6import { isAbleToUploadVideo } from '@server/lib/user' 7import { isAbleToUploadVideo } from '@server/lib/user'
7import { addOptimizeOrMergeAudioJob } from '@server/lib/video' 8import { addOptimizeOrMergeAudioJob } from '@server/lib/video'
8import { getVideoFilePath } from '@server/lib/video-paths' 9import { getVideoFilePath } from '@server/lib/video-paths'
10import { ThumbnailModel } from '@server/models/video/thumbnail'
9import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' 11import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import'
10import { 12import {
11 VideoImportPayload, 13 VideoImportPayload,
@@ -167,49 +169,66 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid
167 169
168 // Process thumbnail 170 // Process thumbnail
169 let thumbnailModel: MThumbnail 171 let thumbnailModel: MThumbnail
172 let thumbnailSave: object
170 if (options.generateThumbnail) { 173 if (options.generateThumbnail) {
171 thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE) 174 thumbnailModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.MINIATURE)
175 thumbnailSave = thumbnailModel.toJSON()
172 } 176 }
173 177
174 // Process preview 178 // Process preview
175 let previewModel: MThumbnail 179 let previewModel: MThumbnail
180 let previewSave: object
176 if (options.generatePreview) { 181 if (options.generatePreview) {
177 previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW) 182 previewModel = await generateVideoMiniature(videoImportWithFiles.Video, videoFile, ThumbnailType.PREVIEW)
183 previewSave = previewModel.toJSON()
178 } 184 }
179 185
180 // Create torrent 186 // Create torrent
181 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile) 187 await createTorrentAndSetInfoHash(videoImportWithFiles.Video, videoFile)
182 188
183 const { videoImportUpdated, video } = await sequelizeTypescript.transaction(async t => { 189 const videoFileSave = videoFile.toJSON()
184 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo
185 190
186 // Refresh video 191 const { videoImportUpdated, video } = await retryTransactionWrapper(() => {
187 const video = await VideoModel.load(videoImportToUpdate.videoId, t) 192 return sequelizeTypescript.transaction(async t => {
188 if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.') 193 const videoImportToUpdate = videoImportWithFiles as MVideoImportVideo
189 194
190 const videoFileCreated = await videoFile.save({ transaction: t }) 195 // Refresh video
191 videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] }) 196 const video = await VideoModel.load(videoImportToUpdate.videoId, t)
197 if (!video) throw new Error('Video linked to import ' + videoImportToUpdate.videoId + ' does not exist anymore.')
192 198
193 // Update video DB object 199 const videoFileCreated = await videoFile.save({ transaction: t })
194 video.duration = duration
195 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED
196 await video.save({ transaction: t })
197 200
198 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t) 201 // Update video DB object
199 if (previewModel) await video.addAndSaveThumbnail(previewModel, t) 202 video.duration = duration
203 video.state = CONFIG.TRANSCODING.ENABLED ? VideoState.TO_TRANSCODE : VideoState.PUBLISHED
204 await video.save({ transaction: t })
200 205
201 // Now we can federate the video (reload from database, we need more attributes) 206 if (thumbnailModel) await video.addAndSaveThumbnail(thumbnailModel, t)
202 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) 207 if (previewModel) await video.addAndSaveThumbnail(previewModel, t)
203 await federateVideoIfNeeded(videoForFederation, true, t)
204 208
205 // Update video import object 209 // Now we can federate the video (reload from database, we need more attributes)
206 videoImportToUpdate.state = VideoImportState.SUCCESS 210 const videoForFederation = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t)
207 const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo 211 await federateVideoIfNeeded(videoForFederation, true, t)
208 videoImportUpdated.Video = video
209 212
210 logger.info('Video %s imported.', video.uuid) 213 // Update video import object
214 videoImportToUpdate.state = VideoImportState.SUCCESS
215 const videoImportUpdated = await videoImportToUpdate.save({ transaction: t }) as MVideoImportVideo
216 videoImportUpdated.Video = video
211 217
212 return { videoImportUpdated, video: videoForFederation } 218 videoImportToUpdate.Video = Object.assign(video, { VideoFiles: [ videoFileCreated ] })
219
220 logger.info('Video %s imported.', video.uuid)
221
222 return { videoImportUpdated, video: videoForFederation }
223 }).catch(err => {
224 // Reset fields
225 if (thumbnailModel) thumbnailModel = new ThumbnailModel(thumbnailSave)
226 if (previewModel) previewModel = new ThumbnailModel(previewSave)
227
228 videoFile = new VideoFileModel(videoFileSave)
229
230 throw err
231 })
213 }) 232 })
214 233
215 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true) 234 Notifier.Instance.notifyOnFinishedVideoImport(videoImportUpdated, true)