From 77d7e851dccf17dcc89e8fcc2db3f655d1e63f95 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 21 Jan 2021 16:57:21 +0100 Subject: Add priority to transcoding jobs (1 = highest priority) 100 for new resolutions 10 for original file optimization Add a malus for transcoding jobs depending on how many uploads the user did in the last 7 days --- server/controllers/api/jobs.ts | 1 + server/controllers/api/videos/index.ts | 7 +- server/helpers/database-utils.ts | 8 ++ server/helpers/video.ts | 28 +------ server/initializers/constants.ts | 7 ++ server/lib/job-queue/handlers/video-file-import.ts | 5 +- server/lib/job-queue/handlers/video-import.ts | 4 +- server/lib/job-queue/handlers/video-transcoding.ts | 91 ++++++++++++++-------- server/lib/job-queue/job-queue.ts | 2 + server/lib/video.ts | 44 ++++++++++- server/models/video/video.ts | 34 ++++++++ 11 files changed, 160 insertions(+), 71 deletions(-) (limited to 'server') diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index 929140140..861cc22b9 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts @@ -69,6 +69,7 @@ async function formatJob (job: any, state?: JobState): Promise { type: job.queue.name as JobType, data: job.data, progress: await job.progress(), + priority: job.opts.priority, error, createdAt: new Date(job.timestamp), finishedOn: new Date(job.finishedOn), diff --git a/server/controllers/api/videos/index.ts b/server/controllers/api/videos/index.ts index e1c775180..c2c5eb640 100644 --- a/server/controllers/api/videos/index.ts +++ b/server/controllers/api/videos/index.ts @@ -2,16 +2,16 @@ import * as express from 'express' import { move } from 'fs-extra' import { extname } from 'path' import toInt from 'validator/lib/toInt' -import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { changeVideoChannelShare } from '@server/lib/activitypub/share' import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' import { LiveManager } from '@server/lib/live-manager' -import { buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' +import { addOptimizeOrMergeAudioJob, buildLocalVideoFromReq, buildVideoThumbnailsFromReq, setVideoTags } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' import { getServerActor } from '@server/models/application/application' import { MVideoFullLight } from '@server/types/models' import { VideoCreate, VideoState, VideoUpdate } from '../../../../shared' +import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes' import { VideoFilter } from '../../../../shared/models/videos/video-query.type' import { auditLoggerFactory, getAuditIdFromRes, VideoAuditView } from '../../../helpers/audit-logger' import { resetSequelizeInstance } from '../../../helpers/database-utils' @@ -66,7 +66,6 @@ import { liveRouter } from './live' import { ownershipVideoRouter } from './ownership' import { rateVideoRouter } from './rate' import { watchingRouter } from './watching' -import { HttpStatusCode } from '../../../../shared/core-utils/miscs/http-error-codes' const auditLogger = auditLoggerFactory('videos') const videosRouter = express.Router() @@ -267,7 +266,7 @@ async function addVideo (req: express.Request, res: express.Response) { Notifier.Instance.notifyOnNewVideoIfNeeded(videoCreated) if (video.state === VideoState.TO_TRANSCODE) { - await addOptimizeOrMergeAudioJob(videoCreated, videoFile) + await addOptimizeOrMergeAudioJob(videoCreated, videoFile, res.locals.oauth.token.User) } Hooks.runAction('action:api.video.uploaded', { video: videoCreated }) diff --git a/server/helpers/database-utils.ts b/server/helpers/database-utils.ts index 87f10f913..2b916efc2 100644 --- a/server/helpers/database-utils.ts +++ b/server/helpers/database-utils.ts @@ -4,6 +4,14 @@ import { Model } from 'sequelize-typescript' import { logger } from './logger' import { Transaction } from 'sequelize' +function retryTransactionWrapper ( + functionToRetry: (arg1: A, arg2: B, arg3: C, arg4: D) => Promise | Bluebird, + arg1: A, + arg2: B, + arg3: C, + arg4: D, +): Promise + function retryTransactionWrapper ( functionToRetry: (arg1: A, arg2: B, arg3: C) => Promise | Bluebird, arg1: A, diff --git a/server/helpers/video.ts b/server/helpers/video.ts index bfd5a9627..7c510f474 100644 --- a/server/helpers/video.ts +++ b/server/helpers/video.ts @@ -1,20 +1,17 @@ import { Response } from 'express' import { CONFIG } from '@server/initializers/config' -import { DEFAULT_AUDIO_RESOLUTION } from '@server/initializers/constants' -import { JobQueue } from '@server/lib/job-queue' import { isStreamingPlaylist, MStreamingPlaylistVideo, MVideo, MVideoAccountLightBlacklistAllFiles, - MVideoFile, MVideoFullLight, MVideoIdThumbnail, MVideoImmutable, MVideoThumbnail, MVideoWithRights } from '@server/types/models' -import { VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' +import { VideoPrivacy, VideoState } from '@shared/models' import { VideoModel } from '../models/video/video' type VideoFetchType = 'all' | 'only-video' | 'only-video-with-rights' | 'id' | 'none' | 'only-immutable-attributes' @@ -69,27 +66,6 @@ function getVideoWithAttributes (res: Response) { return res.locals.videoAll || res.locals.onlyVideo || res.locals.onlyVideoWithRights } -function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile) { - let dataInput: VideoTranscodingPayload - - if (videoFile.isAudio()) { - dataInput = { - type: 'merge-audio-to-webtorrent', - resolution: DEFAULT_AUDIO_RESOLUTION, - videoUUID: video.uuid, - isNewVideo: true - } - } else { - dataInput = { - type: 'optimize-to-webtorrent', - videoUUID: video.uuid, - isNewVideo: true - } - } - - return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }) -} - function extractVideo (videoOrPlaylist: MVideo | MStreamingPlaylistVideo) { return isStreamingPlaylist(videoOrPlaylist) ? videoOrPlaylist.Video @@ -107,7 +83,6 @@ function isStateForFederation (state: VideoState) { const castedState = parseInt(state + '', 10) return castedState === VideoState.PUBLISHED || castedState === VideoState.WAITING_FOR_LIVE || castedState === VideoState.LIVE_ENDED - } function getPrivaciesForFederation () { @@ -130,7 +105,6 @@ export { fetchVideo, getVideoWithAttributes, fetchVideoByUrl, - addOptimizeOrMergeAudioJob, extractVideo, getExtFromMimetype, isStateForFederation, diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 182bdf9cc..453ca02ed 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -179,6 +179,12 @@ const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour } } +const JOB_PRIORITY = { + TRANSCODING: { + OPTIMIZER: 10, + NEW_RESOLUTION: 100 + } +} const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...) @@ -851,6 +857,7 @@ export { VIDEO_STATES, QUEUE_CONCURRENCY, VIDEO_RATE_TYPES, + JOB_PRIORITY, VIDEO_TRANSCODING_FPS, FFMPEG_NICE, ABUSE_STATES, diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 22e4d0cf1..582efea3a 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts @@ -3,6 +3,7 @@ import { copy, stat } from 'fs-extra' import { extname } from 'path' import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' import { getVideoFilePath } from '@server/lib/video-paths' +import { UserModel } from '@server/models/account/user' import { MVideoFile, MVideoWithFile } from '@server/types/models' import { VideoFileImportPayload } from '@shared/models' import { getVideoFileFPS, getVideoFileResolution } from '../../../helpers/ffprobe-utils' @@ -24,7 +25,9 @@ async function processVideoFileImport (job: Bull.Job) { await updateVideoFile(video, payload.filePath) - await onNewWebTorrentFileResolution(video) + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) + await onNewWebTorrentFileResolution(video, user) + return video } diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index c1d1866b0..f61fd773a 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts @@ -1,10 +1,10 @@ import * as Bull from 'bull' import { move, remove, stat } from 'fs-extra' import { extname } from 'path' -import { addOptimizeOrMergeAudioJob } from '@server/helpers/video' import { isPostImportVideoAccepted } from '@server/lib/moderation' import { Hooks } from '@server/lib/plugins/hooks' import { isAbleToUploadVideo } from '@server/lib/user' +import { addOptimizeOrMergeAudioJob } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' import { MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' import { @@ -224,7 +224,7 @@ async function processFile (downloader: () => Promise, videoImport: MVid // Create transcoding jobs? if (video.state === VideoState.TO_TRANSCODE) { - await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile) + await addOptimizeOrMergeAudioJob(videoImportUpdated.Video, videoFile, videoImport.User) } } catch (err) { diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 0f6b3f753..ee241ad03 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts @@ -1,8 +1,9 @@ import * as Bull from 'bull' import { TranscodeOptionsType } from '@server/helpers/ffmpeg-utils' -import { publishAndFederateIfNeeded } from '@server/lib/video' +import { JOB_PRIORITY } from '@server/initializers/constants' +import { getJobTranscodingPriorityMalus, publishAndFederateIfNeeded } from '@server/lib/video' import { getVideoFilePath } from '@server/lib/video-paths' -import { MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' +import { MUser, MUserId, MVideoFullLight, MVideoUUID, MVideoWithFile } from '@server/types/models' import { HLSTranscodingPayload, MergeAudioTranscodingPayload, @@ -25,8 +26,11 @@ import { transcodeNewWebTorrentResolution } from '../../video-transcoding' import { JobQueue } from '../job-queue' +import { UserModel } from '@server/models/account/user' -const handlers: { [ id: string ]: (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight) => Promise } = { +type HandlerFunction = (job: Bull.Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise + +const handlers: { [ id: string ]: HandlerFunction } = { // Deprecated, introduced in 3.1 'hls': handleHLSJob, 'new-resolution-to-hls': handleHLSJob, @@ -55,13 +59,15 @@ async function processVideoTranscoding (job: Bull.Job) { return undefined } + const user = await UserModel.loadByChannelActorId(video.VideoChannel.actorId) + const handler = handlers[payload.type] if (!handler) { throw new Error('Cannot find transcoding handler for ' + payload.type) } - await handler(job, payload, video) + await handler(job, payload, video, user) return video } @@ -90,22 +96,27 @@ async function handleHLSJob (job: Bull.Job, payload: HLSTranscodingPayload, vide await retryTransactionWrapper(onHlsPlaylistGeneration, video) } -async function handleNewWebTorrentResolutionJob (job: Bull.Job, payload: NewResolutionTranscodingPayload, video: MVideoFullLight) { +async function handleNewWebTorrentResolutionJob ( + job: Bull.Job, + payload: NewResolutionTranscodingPayload, + video: MVideoFullLight, + user: MUserId +) { await transcodeNewWebTorrentResolution(video, payload.resolution, payload.isPortraitMode || false, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) } -async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight) { +async function handleWebTorrentMergeAudioJob (job: Bull.Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { await mergeAudioVideofile(video, payload.resolution, job) - await retryTransactionWrapper(onNewWebTorrentFileResolution, video, payload) + await retryTransactionWrapper(onNewWebTorrentFileResolution, video, user, payload) } -async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight) { +async function handleWebTorrentOptimizeJob (job: Bull.Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job) - await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType) + await retryTransactionWrapper(onVideoFileOptimizer, video, payload, transcodeType, user) } // --------------------------------------------------------------------------- @@ -129,7 +140,8 @@ async function onHlsPlaylistGeneration (video: MVideoFullLight) { async function onVideoFileOptimizer ( videoArg: MVideoWithFile, payload: OptimizeTranscodingPayload, - transcodeType: TranscodeOptionsType + transcodeType: TranscodeOptionsType, + user: MUserId ) { if (videoArg === undefined) return undefined @@ -142,13 +154,6 @@ async function onVideoFileOptimizer ( // Video does not exist anymore if (!videoDatabase) return undefined - // Create transcoding jobs if there are enabled resolutions - const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') - logger.info( - 'Resolutions computed for video %s and origin file resolution of %d.', videoDatabase.uuid, videoFileResolution, - { resolutions: resolutionsEnabled } - ) - let videoPublished = false // Generate HLS version of the original file @@ -158,9 +163,9 @@ async function onVideoFileOptimizer ( // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues copyCodecs: transcodeType !== 'quick-transcode' }) - createHlsJobIfEnabled(originalFileHLSPayload) + await createHlsJobIfEnabled(user, originalFileHLSPayload) - const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, videoFileResolution, isPortraitMode) + const hasNewResolutions = createLowerResolutionsJobs(videoDatabase, user, videoFileResolution, isPortraitMode) if (!hasNewResolutions) { // No transcoding to do, it's now published @@ -178,11 +183,12 @@ async function onVideoFileOptimizer ( async function onNewWebTorrentFileResolution ( video: MVideoUUID, + user: MUserId, payload?: NewResolutionTranscodingPayload | MergeAudioTranscodingPayload ) { await publishAndFederateIfNeeded(video) - createHlsJobIfEnabled(Object.assign({}, payload, { copyCodecs: true })) + await createHlsJobIfEnabled(user, Object.assign({}, payload, { copyCodecs: true })) } // --------------------------------------------------------------------------- @@ -194,22 +200,35 @@ export { // --------------------------------------------------------------------------- -function createHlsJobIfEnabled (payload: { videoUUID: string, resolution: number, isPortraitMode?: boolean, copyCodecs: boolean }) { - // Generate HLS playlist? - if (payload && CONFIG.TRANSCODING.HLS.ENABLED) { - const hlsTranscodingPayload: HLSTranscodingPayload = { - type: 'new-resolution-to-hls', - videoUUID: payload.videoUUID, - resolution: payload.resolution, - isPortraitMode: payload.isPortraitMode, - copyCodecs: payload.copyCodecs - } +async function createHlsJobIfEnabled (user: MUserId, payload: { + videoUUID: string + resolution: number + isPortraitMode?: boolean + copyCodecs: boolean +}) { + if (!payload || CONFIG.TRANSCODING.HLS.ENABLED !== true) return + + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + } - return JobQueue.Instance.createJob({ type: 'video-transcoding', payload: hlsTranscodingPayload }) + const hlsTranscodingPayload: HLSTranscodingPayload = { + type: 'new-resolution-to-hls', + videoUUID: payload.videoUUID, + resolution: payload.resolution, + isPortraitMode: payload.isPortraitMode, + copyCodecs: payload.copyCodecs } + + return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: hlsTranscodingPayload }, jobOptions) } -function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution: number, isPortraitMode: boolean) { +async function createLowerResolutionsJobs ( + video: MVideoFullLight, + user: MUserId, + videoFileResolution: number, + isPortraitMode: boolean +) { // Create transcoding jobs if there are enabled resolutions const resolutionsEnabled = computeResolutionsToTranscode(videoFileResolution, 'vod') logger.info( @@ -244,7 +263,11 @@ function createLowerResolutionsJobs (video: MVideoFullLight, videoFileResolution } } - JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.NEW_RESOLUTION + await getJobTranscodingPriorityMalus(user) + } + + JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }, jobOptions) } logger.info('Transcoding jobs created for uuid %s.', video.uuid, { resolutionsEnabled }) diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 5d0b797b0..38b1d6f1f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -47,6 +47,7 @@ type CreateJobArgument = type CreateJobOptions = { delay?: number + priority?: number } const handlers: { [id in JobType]: (job: Bull.Job) => Promise } = { @@ -148,6 +149,7 @@ class JobQueue { backoff: { delay: 60 * 1000, type: 'exponential' }, attempts: JOB_ATTEMPTS[obj.type], timeout: JOB_TTL[obj.type], + priority: options.priority, delay: options.delay } diff --git a/server/lib/video.ts b/server/lib/video.ts index d03ab0452..6b75fadb0 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts @@ -1,11 +1,13 @@ import { Transaction } from 'sequelize/types' +import { DEFAULT_AUDIO_RESOLUTION, JOB_PRIORITY } from '@server/initializers/constants' import { sequelizeTypescript } from '@server/initializers/database' import { TagModel } from '@server/models/video/tag' import { VideoModel } from '@server/models/video/video' import { FilteredModelAttributes } from '@server/types' -import { MTag, MThumbnail, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' -import { ThumbnailType, VideoCreate, VideoPrivacy } from '@shared/models' +import { MTag, MThumbnail, MUserId, MVideo, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' +import { ThumbnailType, VideoCreate, VideoPrivacy, VideoTranscodingPayload } from '@shared/models' import { federateVideoIfNeeded } from './activitypub/videos' +import { JobQueue } from './job-queue/job-queue' import { Notifier } from './notifier' import { createVideoMiniatureFromExisting } from './thumbnail' @@ -104,11 +106,47 @@ async function publishAndFederateIfNeeded (video: MVideoUUID, wasLive = false) { } } +async function addOptimizeOrMergeAudioJob (video: MVideo, videoFile: MVideoFile, user: MUserId) { + let dataInput: VideoTranscodingPayload + + if (videoFile.isAudio()) { + dataInput = { + type: 'merge-audio-to-webtorrent', + resolution: DEFAULT_AUDIO_RESOLUTION, + videoUUID: video.uuid, + isNewVideo: true + } + } else { + dataInput = { + type: 'optimize-to-webtorrent', + videoUUID: video.uuid, + isNewVideo: true + } + } + + const jobOptions = { + priority: JOB_PRIORITY.TRANSCODING.OPTIMIZER + await getJobTranscodingPriorityMalus(user) + } + + return JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }, jobOptions) +} + +async function getJobTranscodingPriorityMalus (user: MUserId) { + const now = new Date() + const lastWeek = new Date(now.getFullYear(), now.getMonth(), now.getDate() - 7) + + const videoUploadedByUser = await VideoModel.countVideosUploadedByUserSince(user.id, lastWeek) + + return videoUploadedByUser +} + // --------------------------------------------------------------------------- export { buildLocalVideoFromReq, publishAndFederateIfNeeded, buildVideoThumbnailsFromReq, - setVideoTags + setVideoTags, + addOptimizeOrMergeAudioJob, + getJobTranscodingPriorityMalus } diff --git a/server/models/video/video.ts b/server/models/video/video.ts index 2bfa704ec..720bfd829 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts @@ -129,6 +129,7 @@ import { VideoShareModel } from './video-share' import { VideoStreamingPlaylistModel } from './video-streaming-playlist' import { VideoTagModel } from './video-tag' import { VideoViewModel } from './video-view' +import { UserModel } from '../account/user' export enum ScopeNames { AVAILABLE_FOR_LIST_IDS = 'AVAILABLE_FOR_LIST_IDS', @@ -1198,6 +1199,39 @@ export class VideoModel extends Model { return VideoModel.count(options) } + static countVideosUploadedByUserSince (userId: number, since: Date) { + const options = { + include: [ + { + model: VideoChannelModel.unscoped(), + required: true, + include: [ + { + model: AccountModel.unscoped(), + required: true, + include: [ + { + model: UserModel.unscoped(), + required: true, + where: { + id: userId + } + } + ] + } + ] + } + ], + where: { + createdAt: { + [Op.gte]: since + } + } + } + + return VideoModel.unscoped().count(options) + } + static countLivesOfAccount (accountId: number) { const options = { where: { -- cgit v1.2.3