From 2186386cca113506791583cb07d6ccacba7af4e0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 12 Jun 2018 20:04:58 +0200 Subject: Add concept of video state, and add ability to wait transcoding before publishing a video --- server/lib/job-queue/handlers/video-file.ts | 127 +++++++++++++++------------- server/lib/job-queue/job-queue.ts | 1 + 2 files changed, 70 insertions(+), 58 deletions(-) (limited to 'server/lib/job-queue') diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 85f7dbfc2..f5ad076a6 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -1,17 +1,16 @@ import * as kue from 'kue' -import { VideoResolution } from '../../../../shared' -import { VideoPrivacy } from '../../../../shared/models/videos' +import { VideoResolution, VideoState } from '../../../../shared' import { logger } from '../../../helpers/logger' import { computeResolutionsToTranscode } from '../../../helpers/utils' -import { sequelizeTypescript } from '../../../initializers' import { VideoModel } from '../../../models/video/video' -import { shareVideoByServerAndChannel } from '../../activitypub' -import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' import { JobQueue } from '../job-queue' +import { federateVideoIfNeeded } from '../../activitypub' +import { retryTransactionWrapper } from '../../../helpers/database-utils' +import { sequelizeTypescript } from '../../../initializers' export type VideoFilePayload = { videoUUID: string - isNewVideo: boolean + isNewVideo?: boolean resolution?: VideoResolution isPortraitMode?: boolean } @@ -52,10 +51,20 @@ async function processVideoFile (job: kue.Job) { // Transcoding in other resolution if (payload.resolution) { await video.transcodeOriginalVideofile(payload.resolution, payload.isPortraitMode) - await onVideoFileTranscoderOrImportSuccess(video) + + const options = { + arguments: [ video ], + errorMessage: 'Cannot execute onVideoFileTranscoderOrImportSuccess with many retries.' + } + await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, options) } else { await video.optimizeOriginalVideofile() - await onVideoFileOptimizerSuccess(video, payload.isNewVideo) + + const options = { + arguments: [ video, payload.isNewVideo ], + errorMessage: 'Cannot execute onVideoFileOptimizerSuccess with many retries.' + } + await retryTransactionWrapper(onVideoFileOptimizerSuccess, options) } return video @@ -64,68 +73,70 @@ async function processVideoFile (job: kue.Job) { async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { if (video === undefined) return undefined - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined + return sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + let videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined - if (video.privacy !== VideoPrivacy.PRIVATE) { - await sendUpdateVideo(video, undefined) - } + // We transcoded the video file in another format, now we can publish it + const oldState = videoDatabase.state + videoDatabase.state = VideoState.PUBLISHED + videoDatabase = await videoDatabase.save({ transaction: t }) + + // If the video was not published, we consider it is a new one for other instances + const isNewVideo = oldState !== VideoState.PUBLISHED + await federateVideoIfNeeded(videoDatabase, isNewVideo, t) - return undefined + return undefined + }) } async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { if (video === undefined) return undefined - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined - - if (video.privacy !== VideoPrivacy.PRIVATE) { - if (isNewVideo !== false) { - // Now we'll add the video's meta data to our followers - await sequelizeTypescript.transaction(async t => { - await sendCreateVideo(video, t) - await shareVideoByServerAndChannel(video, t) - }) - } else { - await sendUpdateVideo(video, undefined) - } - } - - const { videoFileResolution } = await videoDatabase.getOriginalFileResolution() - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) + // Outside the transaction (IO on disk) + const { videoFileResolution } = await video.getOriginalFileResolution() + + return sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + const tasks: Promise[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution + } + + const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + tasks.push(p) + } - if (resolutionsEnabled.length !== 0) { - const tasks: Promise[] = [] + await Promise.all(tasks) - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution, - isNewVideo - } + logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) + } else { + // No transcoding to do, it's now published + video.state = VideoState.PUBLISHED + video = await video.save({ transaction: t }) - const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) - tasks.push(p) + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) } - await Promise.all(tasks) - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } else { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } + return federateVideoIfNeeded(video, isNewVideo, t) + }) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index bdfa19b61..695fe0eea 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -79,6 +79,7 @@ class JobQueue { const res = await handlers[ handlerName ](job) return done(null, res) } catch (err) { + logger.error('Cannot execute job %d.', job.id, { err }) return done(err) } }) -- cgit v1.2.3