From 77d7e851dccf17dcc89e8fcc2db3f655d1e63f95 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 21 Jan 2021 16:57:21 +0100 Subject: Add priority to transcoding jobs (1 = highest priority) 100 for new resolutions 10 for original file optimization Add a malus for transcoding jobs depending on how many uploads the user did in the last 7 days --- server/lib/job-queue/handlers/video-file-import.ts | 5 +- server/lib/job-queue/handlers/video-import.ts | 4 +- server/lib/job-queue/handlers/video-transcoding.ts | 91 ++++++++++++++-------- server/lib/job-queue/job-queue.ts | 2 + server/lib/video.ts | 44 ++++++++++- 5 files changed, 106 insertions(+), 40 deletions(-) (limited to 'server/lib') diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 22e4d0cf1..582efea3a 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -3,6 +3,7 @@ import { copy, stat } from 'fs-extra' import { extname } from 'path' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { getVideoFilePath } from '@server/lib/video-paths' +import { UserModel } from '@server/models/account/user' import { MVideoFile, MVideoWithFile } from '@server/types/models' import { VideoFileImportPayload } from '@shared/models' import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' @@ -24,7 +25,9 @@ async function processVideoFileImport (job: Bull.Job) { await updateVideoFile(video, payload.filePath) - await onNewWebTorrentFileResolution(video) + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) + await onNewWebTorrentFileResolution(video, user) + return video } diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index c1d1866b0..f61fd773a 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -1,10 +1,10 @@ import * as Bull from 'bull' import { move, remove, stat } from 'fs-extra' import { extname } from 'path' -import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' import { isPostImportVideoAccepted } from '@server/lib/moderation' import { Hooks } from '@server/lib/plugins/hooks' import { isAbleToUploadVideo } from '@server/lib/user' +import { addOptimizeOrMergeAudioJob } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' import { @@ -224,7 +224,7 @@ async function processFile (downloader: () => Promise, videoImport: MVid // Create transcoding jobs? if (video.state === VideoState.TO_TRANSCODE) { - await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) + await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) } } catch (err) { diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 0f6b3f753..ee241ad03 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,8 +1,9 @@ import * as Bull from 'bull' import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' -import { publishAndFederateIfNeeded } from '@server/lib/video' +import { JOB_PRIORITY } from '@server/initializers/constants' +import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' -import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' +import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, @@ -25,8 +26,11 @@ import { transcodeNewWebTorrentResolution } from '../../video-transcoding' import { JobQueue } from '../job-queue' +import { UserModel } from '@server/models/account/user' -const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise } = { +type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise + +const handlers: { [ id: string ]: HandlerFunction } = { // Deprecated, introduced in 3.1 'hls': handleHLSJob, 'new-resolution-to-hls': handleHLSJob, @@ -55,13 +59,15 @@ async function processVideoTranscoding (job: Bull.Job) { return undefined } + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) + const handler = handlers[payload.type] if (!handler) { throw new Error('Cannot find transcoding handler for ' + payload.type) } - await handler(job, payload, video) + await handler(job, payload, video, user) return video } @@ -90,22 +96,27 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide await retryTransactionWrapper(onHlsPlaylistGeneration, video) } -async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { +async function handleNewWebTorrentResolutionJob ( + job: Bull.Job, + payload: NewResolutionTranscodingPayload, + video: MVideoFullLight, + user: MUserId +) { await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) } -async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { +async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) } -async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { +async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) - await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) } // --------------------------------------------------------------------------- @@ -129,7 +140,8 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight) { async function onVideoFileOptimizer ( videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload, - transcodeType: TranscodeOptionsType + transcodeType: TranscodeOptionsType, + user: MUserId ) { if (videoArg === undefined) return undefined @@ -142,13 +154,6 @@ async function onVideoFileOptimizer ( // Video does not exist anymore if (!videoDatabase) return undefined - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') - logger.info( - 'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) - let videoPublished = false // Generate HLS version of the original file @@ -158,9 +163,9 @@ async function onVideoFileOptimizer ( // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues copyCodecs: transcodeType !== 'quick-transcode' }) - createHlsJobIfEnabled(originalFileHLSPayload) + await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) + const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) if (!hasNewResolutions) { // No transcoding to do, it's now published @@ -178,11 +183,12 @@ async function onVideoFileOptimizer ( async function onNewWebTorrentFileResolution ( video: MVideoUUID, + user: MUserId, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload ) { await publishAndFederateIfNeeded(video) - createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) + await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) } // --------------------------------------------------------------------------- @@ -194,22 +200,35 @@ export { // --------------------------------------------------------------------------- -function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { - // Generate HLS playlist? - if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload: HLSTranscodingPayload = { - type: 'new-resolution-to-hls', - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs - } +async function createHlsJobIfEnabled (user: MUserId, payload: { + videoUUID: string + resolution: number + isPortraitMode?: boolean + copyCodecs: boolean +}) { + if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return + + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + } - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) + const hlsTranscodingPayload: HLSTranscodingPayload = { + type: 'new-resolution-to-hls', + videoUUID: payload.videoUUID, + resolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + copyCodecs: payload.copyCodecs } + + return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) } -function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { +async function createLowerResolutionsJobs ( + video: MVideoFullLight, + user: MUserId, + videoFileResolution: number, + isPortraitMode: boolean +) { // Create transcoding jobs if there are enabled resolutions const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') logger.info( @@ -244,7 +263,11 @@ function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution } } - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + } + + JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) } logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 5d0b797b0..38b1d6f1f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -47,6 +47,7 @@ type CreateJobArgument = type CreateJobOptions = { delay?: number + priority?: number } const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { @@ -148,6 +149,7 @@ class JobQueue { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], timeout: JOB_TTL[obj.type], + priority: options.priority, delay: options.delay } diff --git a/server/lib/video.ts b/server/lib/video.ts index d03ab0452..6b75fadb0 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -1,11 +1,13 @@ import { Transaction } from 'sequelize/types' +import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY } from '@server/initializers/constants' import { sequelizeTypescript } from '@server/initializers/database' import { TagModel } from '@server/models/video/tag' import { VideoModel } from '@server/models/video/video' import { FilteredModelAttributes } from '@server/types' -import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' -import { ThumbnailType, VideoCreate, VideoPrivacy } from '@shared/models' +import { MTag, MThumbnail, MUserId, MVideo, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' +import { ThumbnailType, VideoCreate, VideoPrivacy, VideoTranscodingPayload } from '@shared/models' import { federateVideoIfNeeded } from './activitypub/videos' +import { JobQueue } from './job-queue/job-queue' import { Notifier } from './notifier' import { createVideoMiniatureFromExisting } from './thumbnail' @@ -104,11 +106,47 @@ async function publishAndFederateIfNeeded (video: MVideoUUID, wasLive = false) { } } +async function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile, user: MUserId) { + let dataInput: VideoTranscodingPayload + + if (videoFile.isAudio()) { + dataInput = { + type: 'merge-audio-to-webtorrent', + resolution: DEFAULT_AUDIO_RESOLUTION, + videoUUID: video.uuid, + isNewVideo: true + } + } else { + dataInput = { + type: 'optimize-to-webtorrent', + videoUUID: video.uuid, + isNewVideo: true + } + } + + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.OPTIMIZER + await getJobTranscodingPriorityMalus(user) + } + + return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }, jobOptions) +} + +async function getJobTranscodingPriorityMalus (user: MUserId) { + const now = new Date() + const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) + + const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) + + return videoUploadedByUser +} + // --------------------------------------------------------------------------- export { buildLocalVideoFromReq, publishAndFederateIfNeeded, buildVideoThumbnailsFromReq, - setVideoTags + setVideoTags, + addOptimizeOrMergeAudioJob, + getJobTranscodingPriorityMalus } -- cgit v1.2.3