X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Fvideo-file.ts;h=adc0a2a15cbf26d042edb631ff19d9403ba8b6c8;hb=edb4ffc7e0b13659d7c73b120f2c87b27e4c26a1;hp=5294483bda3fe1412fc4edbe35d960199063e3e9;hpb=94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts index 5294483bd..adc0a2a15 100644 --- a/server/lib/job-queue/handlers/video-file.ts +++ b/server/lib/job-queue/handlers/video-file.ts @@ -1,110 +1,146 @@ -import * as kue from 'kue' -import { VideoResolution } from '../../../../shared' -import { VideoPrivacy } from '../../../../shared/models/videos' +import * as Bull from 'bull' +import { VideoResolution, VideoState } from '../../../../shared' import { logger } from '../../../helpers/logger' -import { computeResolutionsToTranscode } from '../../../helpers/utils' -import { sequelizeTypescript } from '../../../initializers' import { VideoModel } from '../../../models/video/video' -import { shareVideoByServerAndChannel } from '../../activitypub' -import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' import { JobQueue } from '../job-queue' +import { federateVideoIfNeeded } from '../../activitypub' +import { retryTransactionWrapper } from '../../../helpers/database-utils' +import { sequelizeTypescript } from '../../../initializers' +import * as Bluebird from 'bluebird' +import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' +import { importVideoFile, transcodeOriginalVideofile, optimizeVideofile } from '../../video-transcoding' export type VideoFilePayload = { videoUUID: string + isNewVideo?: boolean resolution?: VideoResolution + isPortraitMode?: boolean } -async function processVideoFile (job: kue.Job) { +export type VideoFileImportPayload = { + videoUUID: string, + filePath: string +} + +async function processVideoFileImport (job: Bull.Job) { + const payload = job.data as VideoFileImportPayload + logger.info('Processing video file import in job %d.', job.id) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', job.id) + return undefined + } + + await importVideoFile(video, payload.filePath) + + await onVideoFileTranscoderOrImportSuccess(video) + return video +} + +async function processVideoFile (job: Bull.Job) { const payload = job.data as VideoFilePayload logger.info('Processing video file in job %d.', job.id) - const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) // No video, maybe deleted? if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) + logger.info('Do not process job %d, video does not exist.', job.id) return undefined } // Transcoding in other resolution if (payload.resolution) { - await video.transcodeOriginalVideofile(payload.resolution) - await onVideoFileTranscoderSuccess(video) + await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) + + await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video) } else { - await video.optimizeOriginalVideofile() - await onVideoFileOptimizerSuccess(video) + await optimizeVideofile(video) + + await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload.isNewVideo) } return video } -async function onVideoFileTranscoderSuccess (video: VideoModel) { +async function onVideoFileTranscoderOrImportSuccess (video: VideoModel) { if (video === undefined) return undefined - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined + return sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined - if (video.privacy !== VideoPrivacy.PRIVATE) { - await sendUpdateVideo(video, undefined) - } + let isNewVideo = false - return undefined -} - -async function onVideoFileOptimizerSuccess (video: VideoModel) { - if (video === undefined) return undefined - - // Maybe the video changed in database, refresh it - const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) - // Video does not exist anymore - if (!videoDatabase) return undefined + // We transcoded the video file in another format, now we can publish it + if (videoDatabase.state !== VideoState.PUBLISHED) { + isNewVideo = true - if (video.privacy !== VideoPrivacy.PRIVATE) { - // Now we'll add the video's meta data to our followers - await sendCreateVideo(video, undefined) - await shareVideoByServerAndChannel(video, undefined) - } - - const originalFileHeight = await videoDatabase.getOriginalFileHeight() + videoDatabase.state = VideoState.PUBLISHED + videoDatabase.publishedAt = new Date() + videoDatabase = await videoDatabase.save({ transaction: t }) + } - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, - { resolutions: resolutionsEnabled } - ) + // If the video was not published, we consider it is a new one for other instances + await federateVideoIfNeeded(videoDatabase, isNewVideo, t) - if (resolutionsEnabled.length !== 0) { - try { - await sequelizeTypescript.transaction(async t => { - const tasks: Promise[] = [] + return undefined + }) +} - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution - } +async function onVideoFileOptimizerSuccess (video: VideoModel, isNewVideo: boolean) { + if (video === undefined) return undefined - const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) - tasks.push(p) + // Outside the transaction (IO on disk) + const { videoFileResolution } = await video.getOriginalFileResolution() + + return sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, + { resolutions: resolutionsEnabled } + ) + + if (resolutionsEnabled.length !== 0) { + const tasks: Bluebird[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution } - await Promise.all(tasks) - }) + const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + tasks.push(p) + } + + await Promise.all(tasks) logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } catch (err) { - logger.warn('Cannot transcode the video.', err) + } else { + // No transcoding to do, it's now published + video.state = VideoState.PUBLISHED + video = await video.save({ transaction: t }) + + logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid) } - } else { - logger.info('No transcoding jobs created for video %s (no resolutions enabled).') - return undefined - } + + return federateVideoIfNeeded(video, isNewVideo, t) + }) } // --------------------------------------------------------------------------- export { - processVideoFile + processVideoFile, + processVideoFileImport }