From a0327eedb0136c4ba7358df80b75cc56bd25ffb8 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 19 Mar 2019 17:00:08 +0100 Subject: Rename video-file job to video-transcoding --- scripts/create-transcoding-job.ts | 2 +- server/controllers/api/videos/index.ts | 2 +- server/initializers/constants.ts | 10 +- server/lib/job-queue/handlers/video-file.ts | 204 --------------------- server/lib/job-queue/handlers/video-import.ts | 2 +- server/lib/job-queue/handlers/video-transcoding.ts | 204 +++++++++++++++++++++ server/lib/job-queue/job-queue.ts | 16 +- shared/models/server/job.model.ts | 2 +- 8 files changed, 226 insertions(+), 216 deletions(-) delete mode 100644 server/lib/job-queue/handlers/video-file.ts create mode 100644 server/lib/job-queue/handlers/video-transcoding.ts diff --git a/scripts/create-transcoding-job.ts b/scripts/create-transcoding-job.ts index 7e5b687bb..4a677eacb 100755 --- a/scripts/create-transcoding-job.ts +++ b/scripts/create-transcoding-job.ts @@ -42,6 +42,6 @@ async function run () { } await JobQueue.Instance.init() - await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) console.log('Transcoding job for video %s created.', video.uuid) } diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index db4f4c96f..08bee97d3 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -283,7 +283,7 @@ async function addVideo (req: express.Request, res: express.Response) { isNewVideo: true } - await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) } return res.json({ diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 54cd57619..ff0ade17a 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -100,36 +100,40 @@ const REMOTE_SCHEME = { WS: 'wss' } -const JOB_ATTEMPTS: { [ id in JobType ]: number } = { +// TODO: remove 'video-file' +const JOB_ATTEMPTS: { [ id in (JobType | 'video-file') ]: number } = { 'activitypub-http-broadcast': 5, 'activitypub-http-unicast': 5, 'activitypub-http-fetcher': 5, 'activitypub-follow': 5, 'video-file-import': 1, + 'video-transcoding': 1, 'video-file': 1, 'video-import': 1, 'email': 5, 'videos-views': 1, 'activitypub-refresher': 1 } -const JOB_CONCURRENCY: { [ id in JobType ]: number } = { +const JOB_CONCURRENCY: { [ id in (JobType | 'video-file') ]: number } = { 'activitypub-http-broadcast': 1, 'activitypub-http-unicast': 5, 'activitypub-http-fetcher': 1, 'activitypub-follow': 3, 'video-file-import': 1, + 'video-transcoding': 1, 'video-file': 1, 'video-import': 1, 'email': 5, 'videos-views': 1, 'activitypub-refresher': 1 } -const JOB_TTL: { [ id in JobType ]: number } = { +const JOB_TTL: { [ id in (JobType | 'video-file') ]: number } = { 'activitypub-http-broadcast': 60000 * 10, // 10 minutes 'activitypub-http-unicast': 60000 * 10, // 10 minutes 'activitypub-http-fetcher': 60000 * 10, // 10 minutes 'activitypub-follow': 60000 * 10, // 10 minutes 'video-file-import': 1000 * 3600, // 1 hour + 'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long 'video-file': 1000 * 3600 * 48, // 2 days, transcoding could be long 'video-import': 1000 * 3600 * 2, // hours 'email': 60000 * 10, // 10 minutes diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts deleted file mode 100644 index 3a867b77f..000000000 --- a/server/lib/job-queue/handlers/video-file.ts +++ /dev/null @@ -1,204 +0,0 @@ -import * as Bull from 'bull' -import { VideoResolution, VideoState } from '../../../../shared' -import { logger } from '../../../helpers/logger' -import { VideoModel } from '../../../models/video/video' -import { JobQueue } from '../job-queue' -import { federateVideoIfNeeded } from '../../activitypub' -import { retryTransactionWrapper } from '../../../helpers/database-utils' -import { sequelizeTypescript, CONFIG } from '../../../initializers' -import * as Bluebird from 'bluebird' -import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' -import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' -import { Notifier } from '../../notifier' - -export type VideoFilePayload = { - videoUUID: string - resolution?: VideoResolution - isNewVideo?: boolean - isPortraitMode?: boolean - generateHlsPlaylist?: boolean -} - -export type VideoFileImportPayload = { - videoUUID: string, - filePath: string -} - -async function processVideoFileImport (job: Bull.Job) { - const payload = job.data as VideoFileImportPayload - logger.info('Processing video file import in job %d.', job.id) - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id) - return undefined - } - - await importVideoFile(video, payload.filePath) - - await onVideoFileTranscoderOrImportSuccess(video) - return video -} - -async function processVideoFile (job: Bull.Job) { - const payload = job.data as VideoFilePayload - logger.info('Processing video file in job %d.', job.id) - - const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) - // No video, maybe deleted? - if (!video) { - logger.info('Do not process job %d, video does not exist.', job.id) - return undefined - } - - if (payload.generateHlsPlaylist) { - await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) - - await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) - } else if (payload.resolution) { // Transcoding in other resolution - await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) - - await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload) - } else { - await optimizeVideofile(video) - - await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) - } - - return video -} - -async function onHlsPlaylistGenerationSuccess (video: VideoModel) { - if (video === undefined) return undefined - - await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined - - // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, false, t) - }) -} - -async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoFilePayload) { - if (video === undefined) return undefined - - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined - - let videoPublished = false - - // We transcoded the video file in another format, now we can publish it - if (videoDatabase.state !== VideoState.PUBLISHED) { - videoPublished = true - - videoDatabase.state = VideoState.PUBLISHED - videoDatabase.publishedAt = new Date() - videoDatabase = await videoDatabase.save({ transaction: t }) - } - - // If the video was not published, we consider it is a new one for other instances - await federateVideoIfNeeded(videoDatabase, videoPublished, t) - - return { videoDatabase, videoPublished } - }) - - // don't notify prior to scheduled video update - if (videoPublished && !videoDatabase.ScheduleVideoUpdate) { - Notifier.Instance.notifyOnNewVideo(videoDatabase) - Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) - } - - await createHlsJobIfEnabled(payload) -} - -async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoFilePayload) { - if (videoArg === undefined) return undefined - - // Outside the transaction (IO on disk) - const { videoFileResolution } = await videoArg.getOriginalFileResolution() - - const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { - // Maybe the video changed in database, refresh it - let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) - // Video does not exist anymore - if (!videoDatabase) return undefined - - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) - logger.info( - 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) - - let videoPublished = false - - if (resolutionsEnabled.length !== 0) { - const tasks: (Bluebird> | Promise>)[] = [] - - for (const resolution of resolutionsEnabled) { - const dataInput = { - videoUUID: videoDatabase.uuid, - resolution - } - - const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) - tasks.push(p) - } - - await Promise.all(tasks) - - logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) - } else { - videoPublished = true - - // No transcoding to do, it's now published - videoDatabase.state = VideoState.PUBLISHED - videoDatabase = await videoDatabase.save({ transaction: t }) - - logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) - } - - await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) - - return { videoDatabase, videoPublished } - }) - - // don't notify prior to scheduled video update - if (!videoDatabase.ScheduleVideoUpdate) { - if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) - if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) - } - - await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })) -} - -// --------------------------------------------------------------------------- - -export { - processVideoFile, - processVideoFileImport -} - -// --------------------------------------------------------------------------- - -function createHlsJobIfEnabled (payload?: VideoFilePayload) { - // Generate HLS playlist? - if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload = { - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode, - - generateHlsPlaylist: true - } - - return JobQueue.Instance.createJob({ type: 'video-file', payload: hlsTranscodingPayload }) - } -} diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 12004dcd7..d96bfdf43 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -207,7 +207,7 @@ async function processFile (downloader: () => Promise, videoImport: Vide isNewVideo: true } - await JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) + await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) } } catch (err) { diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts new file mode 100644 index 000000000..ceee83f13 --- /dev/null +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -0,0 +1,204 @@ +import * as Bull from 'bull' +import { VideoResolution, VideoState } from '../../../../shared' +import { logger } from '../../../helpers/logger' +import { VideoModel } from '../../../models/video/video' +import { JobQueue } from '../job-queue' +import { federateVideoIfNeeded } from '../../activitypub' +import { retryTransactionWrapper } from '../../../helpers/database-utils' +import { sequelizeTypescript, CONFIG } from '../../../initializers' +import * as Bluebird from 'bluebird' +import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' +import { generateHlsPlaylist, importVideoFile, optimizeVideofile, transcodeOriginalVideofile } from '../../video-transcoding' +import { Notifier } from '../../notifier' + +export type VideoTranscodingPayload = { + videoUUID: string + resolution?: VideoResolution + isNewVideo?: boolean + isPortraitMode?: boolean + generateHlsPlaylist?: boolean +} + +export type VideoFileImportPayload = { + videoUUID: string, + filePath: string +} + +async function processVideoFileImport (job: Bull.Job) { + const payload = job.data as VideoFileImportPayload + logger.info('Processing video file import in job %d.', job.id) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', job.id) + return undefined + } + + await importVideoFile(video, payload.filePath) + + await onVideoFileTranscoderOrImportSuccess(video) + return video +} + +async function processVideoTranscoding (job: Bull.Job) { + const payload = job.data as VideoTranscodingPayload + logger.info('Processing video file in job %d.', job.id) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoUUID) + // No video, maybe deleted? + if (!video) { + logger.info('Do not process job %d, video does not exist.', job.id) + return undefined + } + + if (payload.generateHlsPlaylist) { + await generateHlsPlaylist(video, payload.resolution, payload.isPortraitMode || false) + + await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video) + } else if (payload.resolution) { // Transcoding in other resolution + await transcodeOriginalVideofile(video, payload.resolution, payload.isPortraitMode || false) + + await retryTransactionWrapper(onVideoFileTranscoderOrImportSuccess, video, payload) + } else { + await optimizeVideofile(video) + + await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload) + } + + return video +} + +async function onHlsPlaylistGenerationSuccess (video: VideoModel) { + if (video === undefined) return undefined + + await sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // If the video was not published, we consider it is a new one for other instances + await federateVideoIfNeeded(videoDatabase, false, t) + }) +} + +async function onVideoFileTranscoderOrImportSuccess (video: VideoModel, payload?: VideoTranscodingPayload) { + if (video === undefined) return undefined + + const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(video.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined + + let videoPublished = false + + // We transcoded the video file in another format, now we can publish it + if (videoDatabase.state !== VideoState.PUBLISHED) { + videoPublished = true + + videoDatabase.state = VideoState.PUBLISHED + videoDatabase.publishedAt = new Date() + videoDatabase = await videoDatabase.save({ transaction: t }) + } + + // If the video was not published, we consider it is a new one for other instances + await federateVideoIfNeeded(videoDatabase, videoPublished, t) + + return { videoDatabase, videoPublished } + }) + + // don't notify prior to scheduled video update + if (videoPublished && !videoDatabase.ScheduleVideoUpdate) { + Notifier.Instance.notifyOnNewVideo(videoDatabase) + Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) + } + + await createHlsJobIfEnabled(payload) +} + +async function onVideoFileOptimizerSuccess (videoArg: VideoModel, payload: VideoTranscodingPayload) { + if (videoArg === undefined) return undefined + + // Outside the transaction (IO on disk) + const { videoFileResolution } = await videoArg.getOriginalFileResolution() + + const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { + // Maybe the video changed in database, refresh it + let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) + // Video does not exist anymore + if (!videoDatabase) return undefined + + // Create transcoding jobs if there are enabled resolutions + const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution) + logger.info( + 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, videoFileResolution, + { resolutions: resolutionsEnabled } + ) + + let videoPublished = false + + if (resolutionsEnabled.length !== 0) { + const tasks: (Bluebird> | Promise>)[] = [] + + for (const resolution of resolutionsEnabled) { + const dataInput = { + videoUUID: videoDatabase.uuid, + resolution + } + + const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) + tasks.push(p) + } + + await Promise.all(tasks) + + logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) + } else { + videoPublished = true + + // No transcoding to do, it's now published + videoDatabase.state = VideoState.PUBLISHED + videoDatabase = await videoDatabase.save({ transaction: t }) + + logger.info('No transcoding jobs created for video %s (no resolutions).', videoDatabase.uuid, { privacy: videoDatabase.privacy }) + } + + await federateVideoIfNeeded(videoDatabase, payload.isNewVideo, t) + + return { videoDatabase, videoPublished } + }) + + // don't notify prior to scheduled video update + if (!videoDatabase.ScheduleVideoUpdate) { + if (payload.isNewVideo) Notifier.Instance.notifyOnNewVideo(videoDatabase) + if (videoPublished) Notifier.Instance.notifyOnPendingVideoPublished(videoDatabase) + } + + await createHlsJobIfEnabled(Object.assign({}, payload, { resolution: videoDatabase.getOriginalFile().resolution })) +} + +// --------------------------------------------------------------------------- + +export { + processVideoTranscoding, + processVideoFileImport +} + +// --------------------------------------------------------------------------- + +function createHlsJobIfEnabled (payload?: VideoTranscodingPayload) { + // Generate HLS playlist? + if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { + const hlsTranscodingPayload = { + videoUUID: payload.videoUUID, + resolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + + generateHlsPlaylist: true + } + + return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) + } +} diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ba9cbe0d9..e73042163 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -7,7 +7,12 @@ import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' import { EmailPayload, processEmail } from './handlers/email' -import { processVideoFile, processVideoFileImport, VideoFileImportPayload, VideoFilePayload } from './handlers/video-file' +import { + processVideoFileImport, + processVideoTranscoding, + VideoFileImportPayload, + VideoTranscodingPayload +} from './handlers/video-transcoding' import { ActivitypubFollowPayload, processActivityPubFollow } from './handlers/activitypub-follow' import { processVideoImport, VideoImportPayload } from './handlers/video-import' import { processVideosViews } from './handlers/video-views' @@ -19,19 +24,20 @@ type CreateJobArgument = { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | { type: 'video-file-import', payload: VideoFileImportPayload } | - { type: 'video-file', payload: VideoFilePayload } | + { type: 'video-transcoding', payload: VideoTranscodingPayload } | { type: 'email', payload: EmailPayload } | { type: 'video-import', payload: VideoImportPayload } | { type: 'activitypub-refresher', payload: RefreshPayload } | { type: 'videos-views', payload: {} } -const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise} = { +const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise} = { 'activitypub-http-broadcast': processActivityPubHttpBroadcast, 'activitypub-http-unicast': processActivityPubHttpUnicast, 'activitypub-http-fetcher': processActivityPubHttpFetcher, 'activitypub-follow': processActivityPubFollow, 'video-file-import': processVideoFileImport, - 'video-file': processVideoFile, + 'video-transcoding': processVideoTranscoding, + 'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3) 'email': processEmail, 'video-import': processVideoImport, 'videos-views': processVideosViews, @@ -44,7 +50,7 @@ const jobTypes: JobType[] = [ 'activitypub-http-fetcher', 'activitypub-http-unicast', 'email', - 'video-file', + 'video-transcoding', 'video-file-import', 'video-import', 'videos-views', diff --git a/shared/models/server/job.model.ts b/shared/models/server/job.model.ts index 85bc9541b..1b9aa8a07 100644 --- a/shared/models/server/job.model.ts +++ b/shared/models/server/job.model.ts @@ -5,7 +5,7 @@ export type JobType = 'activitypub-http-unicast' | 'activitypub-http-fetcher' | 'activitypub-follow' | 'video-file-import' | - 'video-file' | + 'video-transcoding' | 'email' | 'video-import' | 'videos-views' | -- cgit v1.2.3