import { isUUIDValid, toCompleteUUID } from '@server/helpers/custom-validators/misc'
import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
import { CONFIG } from '@server/initializers/config'
-import { addTranscodingJob } from '@server/lib/video'
+import { buildTranscodingJob } from '@server/lib/video'
import { VideoState, VideoTranscodingPayload } from '@shared/models'
import { initDatabaseModels } from '../server/initializers/database'
import { JobQueue } from '../server/lib/job-queue'
for (const resolution of resolutionsEnabled) {
dataInput.push({
- type: 'new-resolution-to-hls',
+ type: 'new-resolution-to-hls' as 'new-resolution-to-hls',
videoUUID: video.uuid,
resolution,
} else {
if (options.resolution !== undefined) {
dataInput.push({
- type: 'new-resolution-to-webtorrent',
+ type: 'new-resolution-to-webtorrent' as 'new-resolution-to-webtorrent',
videoUUID: video.uuid,
createHLSIfNeeded: true,
}
dataInput.push({
- type: 'optimize-to-webtorrent',
+ type: 'optimize-to-webtorrent' as 'optimize-to-webtorrent',
videoUUID: video.uuid,
isNewVideo: false
})
await video.save()
for (const d of dataInput) {
- await addTranscodingJob(d, {})
+ await JobQueue.Instance.createJob(await buildTranscodingJob(d))
+
console.log('Transcoding job for video %s created.', video.uuid)
}
}
+import Bluebird from 'bluebird'
import express from 'express'
import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg'
import { logger, loggerTagsFactory } from '@server/helpers/logger'
-import { addTranscodingJob } from '@server/lib/video'
+import { JobQueue } from '@server/lib/job-queue'
+import { Hooks } from '@server/lib/plugins/hooks'
+import { buildTranscodingJob } from '@server/lib/video'
import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models'
import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares'
-import { Hooks } from '@server/lib/plugins/hooks'
const lTags = loggerTagsFactory('api', 'video')
const transcodingRouter = express.Router()
video.state = VideoState.TO_TRANSCODE
await video.save()
- for (const resolution of resolutions) {
+ const hasAudio = !!audioStream
+ const childrenResolutions = resolutions.filter(r => r !== maxResolution)
+
+ const children = await Bluebird.mapSeries(childrenResolutions, resolution => {
if (body.transcodingType === 'hls') {
- await addTranscodingJob({
- type: 'new-resolution-to-hls',
+ return buildHLSJobOption({
videoUUID: video.uuid,
+ hasAudio,
resolution,
- hasAudio: !!audioStream,
- copyCodecs: false,
- isNewVideo: false,
- autoDeleteWebTorrentIfNeeded: false,
- isMaxQuality: maxResolution === resolution
+ isMaxQuality: false
})
- } else if (body.transcodingType === 'webtorrent') {
- await addTranscodingJob({
- type: 'new-resolution-to-webtorrent',
+ }
+
+ if (body.transcodingType === 'webtorrent') {
+ return buildWebTorrentJobOption({
videoUUID: video.uuid,
- isNewVideo: false,
- resolution,
- hasAudio: !!audioStream,
- createHLSIfNeeded: false
+ hasAudio,
+ resolution
})
}
- }
+ })
+
+ const parent = body.transcodingType === 'hls'
+ ? await buildHLSJobOption({
+ videoUUID: video.uuid,
+ hasAudio,
+ resolution: maxResolution,
+ isMaxQuality: false
+ })
+ : await buildWebTorrentJobOption({
+ videoUUID: video.uuid,
+ hasAudio,
+ resolution: maxResolution
+ })
+
+ // Porcess the last resolution after the other ones to prevent concurrency issue
+ // Because low resolutions use the biggest one as ffmpeg input
+ await JobQueue.Instance.createJobWithChildren(parent, children)
return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
}
+
+function buildHLSJobOption (options: {
+ videoUUID: string
+ hasAudio: boolean
+ resolution: number
+ isMaxQuality: boolean
+}) {
+ const { videoUUID, hasAudio, resolution, isMaxQuality } = options
+
+ return buildTranscodingJob({
+ type: 'new-resolution-to-hls',
+ videoUUID,
+ resolution,
+ hasAudio,
+ copyCodecs: false,
+ isNewVideo: false,
+ autoDeleteWebTorrentIfNeeded: false,
+ isMaxQuality
+ })
+}
+
+function buildWebTorrentJobOption (options: {
+ videoUUID: string
+ hasAudio: boolean
+ resolution: number
+}) {
+ const { videoUUID, hasAudio, resolution } = options
+
+ return buildTranscodingJob({
+ type: 'new-resolution-to-webtorrent',
+ videoUUID,
+ isNewVideo: false,
+ resolution,
+ hasAudio,
+ createHLSIfNeeded: false
+ })
+}
return totalItems
}
-function createJob (payload: ActivitypubHttpFetcherPayload) {
- return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
-}
-
function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) {
const uri = fetchedVideo.shares
return crawlCollectionPage<string>(uri, handler, cleaner)
.catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) }))
}
+
+function createJob (payload: ActivitypubHttpFetcherPayload) {
+ return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
+}
import { Job } from 'bullmq'
import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg'
import { Hooks } from '@server/lib/plugins/hooks'
-import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
+import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video'
import { VideoPathManager } from '@server/lib/video-path-manager'
import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state'
import { UserModel } from '@server/models/user/user'
optimizeOriginalVideofile,
transcodeNewWebTorrentResolution
} from '../../transcoding/transcoding'
+import { JobQueue } from '../job-queue'
type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void>
...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ])
}
- await addTranscodingJob(hlsTranscodingPayload, jobOptions)
+ await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions))
return true
}
priority: await getTranscodingJobPriority(user)
}
- await addTranscodingJob(dataInput, jobOptions)
+ await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions))
}
if (resolutionCreated.length === 0) {
if (!job) continue
lastJob = {
- name: 'job',
- data: job.payload,
- queueName: job.type,
- opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])),
+ ...this.buildJobFlowOption(job),
+
children: lastJob
? [ lastJob ]
: []
return this.flowProducer.add(lastJob)
}
+ async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
+ return this.flowProducer.add({
+ ...this.buildJobFlowOption(parent),
+
+ children: children.map(c => this.buildJobFlowOption(c))
+ })
+ }
+
+ private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) {
+ return {
+ name: 'job',
+ data: job.payload,
+ queueName: job.type,
+ opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ]))
+ }
+ }
+
private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
return {
backoff: { delay: 60 * 1000, type: 'exponential' },
}
}
- waitJob (job: Job) {
- return job.waitUntilFinished(this.queueEvents[job.queueName])
- }
-
private addRepeatableJobs () {
this.queues['videos-views-stats'].add('job', {}, {
repeat: REPEAT_JOBS['videos-views-stats']
import { FilteredModelAttributes } from '@server/types'
import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models'
import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models'
-import { CreateJobOptions, JobQueue } from './job-queue/job-queue'
+import { CreateJobOptions } from './job-queue/job-queue'
import { updateVideoMiniatureFromExisting } from './thumbnail'
function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> {
}
}
-async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
+async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) {
await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode')
- return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options })
+ return { type: 'video-transcoding' as 'video-transcoding', payload, ...options }
}
async function getTranscodingJobPriority (user: MUserId) {
buildVideoThumbnailsFromReq,
setVideoTags,
buildOptimizeOrMergeAudioJob,
- addTranscodingJob,
+ buildTranscodingJob,
buildMoveToObjectStorageJob,
getTranscodingJobPriority,
getCachedVideoDuration
}
getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) {
- // We first transcode to WebTorrent format, so try this array first
- if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) {
- const file = fun(this.VideoFiles, file => file.resolution)
+ const files = this.getAllFiles()
+ const file = fun(files, file => file.resolution)
+ if (!file) return undefined
+ if (file.videoId) {
return Object.assign(file, { Video: this })
}
- // No webtorrent files, try with streaming playlist files
- if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) {
+ if (file.videoStreamingPlaylistId) {
const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this })
- const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution)
return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo })
}
- return undefined
+ throw new Error('File is not associated to a video of a playlist')
}
getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo {
const youtubeDLBinary = await YoutubeDLCLI.safeGet()
const output = await youtubeDLBinary.download({
url: videoInfo.url,
- format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+ format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
output: path,
additionalYoutubeDLArgs: command.args,
processOptions
const youtubeDLCLI = await YoutubeDLCLI.safeGet()
const result = await youtubeDLCLI.getInfo({
url,
- format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+ format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
processOptions
})
function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) {
return youtubeDLCLI.getInfo({
url,
- format: YoutubeDLCLI.getYoutubeDLVideoFormat([]),
+ format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false),
additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ],
processOptions
})