diff options
author | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:09:31 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-08-09 09:32:17 +0200 |
commit | b42c2c7e89a64ed730d8140840fe74a13c31f2a4 (patch) | |
tree | 715e7ad31d03881e3f3530dba1fe3d172251249b /server | |
parent | bd911b54b555b11df7e9849cf92d358bccfecf6e (diff) | |
download | PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.gz PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.tar.zst PeerTube-b42c2c7e89a64ed730d8140840fe74a13c31f2a4.zip |
Avoid concurrency issue on transcoding
Diffstat (limited to 'server')
-rw-r--r-- | server/controllers/api/videos/transcoding.ts | 90 | ||||
-rw-r--r-- | server/lib/activitypub/videos/shared/video-sync-attributes.ts | 8 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 7 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 27 | ||||
-rw-r--r-- | server/lib/video.ts | 8 | ||||
-rw-r--r-- | server/models/video/video.ts | 13 | ||||
-rw-r--r-- | server/tools/peertube-import-videos.ts | 6 |
7 files changed, 112 insertions, 47 deletions
diff --git a/server/controllers/api/videos/transcoding.ts b/server/controllers/api/videos/transcoding.ts index b2b71a870..9aca761c1 100644 --- a/server/controllers/api/videos/transcoding.ts +++ b/server/controllers/api/videos/transcoding.ts | |||
@@ -1,10 +1,12 @@ | |||
1 | import Bluebird from 'bluebird' | ||
1 | import express from 'express' | 2 | import express from 'express' |
2 | import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' | 3 | import { computeResolutionsToTranscode } from '@server/helpers/ffmpeg' |
3 | import { logger, loggerTagsFactory } from '@server/helpers/logger' | 4 | import { logger, loggerTagsFactory } from '@server/helpers/logger' |
4 | import { addTranscodingJob } from '@server/lib/video' | 5 | import { JobQueue } from '@server/lib/job-queue' |
6 | import { Hooks } from '@server/lib/plugins/hooks' | ||
7 | import { buildTranscodingJob } from '@server/lib/video' | ||
5 | import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models' | 8 | import { HttpStatusCode, UserRight, VideoState, VideoTranscodingCreate } from '@shared/models' |
6 | import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares' | 9 | import { asyncMiddleware, authenticate, createTranscodingValidator, ensureUserHasRight } from '../../../middlewares' |
7 | import { Hooks } from '@server/lib/plugins/hooks' | ||
8 | 10 | ||
9 | const lTags = loggerTagsFactory('api', 'video') | 11 | const lTags = loggerTagsFactory('api', 'video') |
10 | const transcodingRouter = express.Router() | 12 | const transcodingRouter = express.Router() |
@@ -44,29 +46,81 @@ async function createTranscoding (req: express.Request, res: express.Response) { | |||
44 | video.state = VideoState.TO_TRANSCODE | 46 | video.state = VideoState.TO_TRANSCODE |
45 | await video.save() | 47 | await video.save() |
46 | 48 | ||
47 | for (const resolution of resolutions) { | 49 | const hasAudio = !!audioStream |
50 | const childrenResolutions = resolutions.filter(r => r !== maxResolution) | ||
51 | |||
52 | const children = await Bluebird.mapSeries(childrenResolutions, resolution => { | ||
48 | if (body.transcodingType === 'hls') { | 53 | if (body.transcodingType === 'hls') { |
49 | await addTranscodingJob({ | 54 | return buildHLSJobOption({ |
50 | type: 'new-resolution-to-hls', | ||
51 | videoUUID: video.uuid, | 55 | videoUUID: video.uuid, |
56 | hasAudio, | ||
52 | resolution, | 57 | resolution, |
53 | hasAudio: !!audioStream, | 58 | isMaxQuality: false |
54 | copyCodecs: false, | ||
55 | isNewVideo: false, | ||
56 | autoDeleteWebTorrentIfNeeded: false, | ||
57 | isMaxQuality: maxResolution === resolution | ||
58 | }) | 59 | }) |
59 | } else if (body.transcodingType === 'webtorrent') { | 60 | } |
60 | await addTranscodingJob({ | 61 | |
61 | type: 'new-resolution-to-webtorrent', | 62 | if (body.transcodingType === 'webtorrent') { |
63 | return buildWebTorrentJobOption({ | ||
62 | videoUUID: video.uuid, | 64 | videoUUID: video.uuid, |
63 | isNewVideo: false, | 65 | hasAudio, |
64 | resolution, | 66 | resolution |
65 | hasAudio: !!audioStream, | ||
66 | createHLSIfNeeded: false | ||
67 | }) | 67 | }) |
68 | } | 68 | } |
69 | } | 69 | }) |
70 | |||
71 | const parent = body.transcodingType === 'hls' | ||
72 | ? await buildHLSJobOption({ | ||
73 | videoUUID: video.uuid, | ||
74 | hasAudio, | ||
75 | resolution: maxResolution, | ||
76 | isMaxQuality: false | ||
77 | }) | ||
78 | : await buildWebTorrentJobOption({ | ||
79 | videoUUID: video.uuid, | ||
80 | hasAudio, | ||
81 | resolution: maxResolution | ||
82 | }) | ||
83 | |||
84 | // Porcess the last resolution after the other ones to prevent concurrency issue | ||
85 | // Because low resolutions use the biggest one as ffmpeg input | ||
86 | await JobQueue.Instance.createJobWithChildren(parent, children) | ||
70 | 87 | ||
71 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) | 88 | return res.sendStatus(HttpStatusCode.NO_CONTENT_204) |
72 | } | 89 | } |
90 | |||
91 | function buildHLSJobOption (options: { | ||
92 | videoUUID: string | ||
93 | hasAudio: boolean | ||
94 | resolution: number | ||
95 | isMaxQuality: boolean | ||
96 | }) { | ||
97 | const { videoUUID, hasAudio, resolution, isMaxQuality } = options | ||
98 | |||
99 | return buildTranscodingJob({ | ||
100 | type: 'new-resolution-to-hls', | ||
101 | videoUUID, | ||
102 | resolution, | ||
103 | hasAudio, | ||
104 | copyCodecs: false, | ||
105 | isNewVideo: false, | ||
106 | autoDeleteWebTorrentIfNeeded: false, | ||
107 | isMaxQuality | ||
108 | }) | ||
109 | } | ||
110 | |||
111 | function buildWebTorrentJobOption (options: { | ||
112 | videoUUID: string | ||
113 | hasAudio: boolean | ||
114 | resolution: number | ||
115 | }) { | ||
116 | const { videoUUID, hasAudio, resolution } = options | ||
117 | |||
118 | return buildTranscodingJob({ | ||
119 | type: 'new-resolution-to-webtorrent', | ||
120 | videoUUID, | ||
121 | isNewVideo: false, | ||
122 | resolution, | ||
123 | hasAudio, | ||
124 | createHLSIfNeeded: false | ||
125 | }) | ||
126 | } | ||
diff --git a/server/lib/activitypub/videos/shared/video-sync-attributes.ts b/server/lib/activitypub/videos/shared/video-sync-attributes.ts index 8ed1b6447..e3cb96a62 100644 --- a/server/lib/activitypub/videos/shared/video-sync-attributes.ts +++ b/server/lib/activitypub/videos/shared/video-sync-attributes.ts | |||
@@ -73,10 +73,6 @@ async function getRatesCount (type: 'like' | 'dislike', video: MVideo, fetchedVi | |||
73 | return totalItems | 73 | return totalItems |
74 | } | 74 | } |
75 | 75 | ||
76 | function createJob (payload: ActivitypubHttpFetcherPayload) { | ||
77 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | ||
78 | } | ||
79 | |||
80 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { | 76 | function syncShares (video: MVideo, fetchedVideo: VideoObject, isSync: boolean) { |
81 | const uri = fetchedVideo.shares | 77 | const uri = fetchedVideo.shares |
82 | 78 | ||
@@ -104,3 +100,7 @@ function syncComments (video: MVideo, fetchedVideo: VideoObject, isSync: boolean | |||
104 | return crawlCollectionPage<string>(uri, handler, cleaner) | 100 | return crawlCollectionPage<string>(uri, handler, cleaner) |
105 | .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) | 101 | .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err, rootUrl: uri, ...lTags(video.uuid, video.url) })) |
106 | } | 102 | } |
103 | |||
104 | function createJob (payload: ActivitypubHttpFetcherPayload) { | ||
105 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | ||
106 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 8dbae8c42..cb2978157 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { Hooks } from '@server/lib/plugins/hooks' |
4 | import { addTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' |
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 5 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 7 | import { UserModel } from '@server/models/user/user' |
@@ -27,6 +27,7 @@ import { | |||
27 | optimizeOriginalVideofile, | 27 | optimizeOriginalVideofile, |
28 | transcodeNewWebTorrentResolution | 28 | transcodeNewWebTorrentResolution |
29 | } from '../../transcoding/transcoding' | 29 | } from '../../transcoding/transcoding' |
30 | import { JobQueue } from '../job-queue' | ||
30 | 31 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 32 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 33 | ||
@@ -248,7 +249,7 @@ async function createHlsJobIfEnabled (user: MUserId, payload: { | |||
248 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 249 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) |
249 | } | 250 | } |
250 | 251 | ||
251 | await addTranscodingJob(hlsTranscodingPayload, jobOptions) | 252 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) |
252 | 253 | ||
253 | return true | 254 | return true |
254 | } | 255 | } |
@@ -312,7 +313,7 @@ async function createLowerResolutionsJobs (options: { | |||
312 | priority: await getTranscodingJobPriority(user) | 313 | priority: await getTranscodingJobPriority(user) |
313 | } | 314 | } |
314 | 315 | ||
315 | await addTranscodingJob(dataInput, jobOptions) | 316 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) |
316 | } | 317 | } |
317 | 318 | ||
318 | if (resolutionCreated.length === 0) { | 319 | if (resolutionCreated.length === 0) { |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 50d732beb..386d20103 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -325,10 +325,8 @@ class JobQueue { | |||
325 | if (!job) continue | 325 | if (!job) continue |
326 | 326 | ||
327 | lastJob = { | 327 | lastJob = { |
328 | name: 'job', | 328 | ...this.buildJobFlowOption(job), |
329 | data: job.payload, | 329 | |
330 | queueName: job.type, | ||
331 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])), | ||
332 | children: lastJob | 330 | children: lastJob |
333 | ? [ lastJob ] | 331 | ? [ lastJob ] |
334 | : [] | 332 | : [] |
@@ -338,6 +336,23 @@ class JobQueue { | |||
338 | return this.flowProducer.add(lastJob) | 336 | return this.flowProducer.add(lastJob) |
339 | } | 337 | } |
340 | 338 | ||
339 | async createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) { | ||
340 | return this.flowProducer.add({ | ||
341 | ...this.buildJobFlowOption(parent), | ||
342 | |||
343 | children: children.map(c => this.buildJobFlowOption(c)) | ||
344 | }) | ||
345 | } | ||
346 | |||
347 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { | ||
348 | return { | ||
349 | name: 'job', | ||
350 | data: job.payload, | ||
351 | queueName: job.type, | ||
352 | opts: this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay' ])) | ||
353 | } | ||
354 | } | ||
355 | |||
341 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { | 356 | private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions { |
342 | return { | 357 | return { |
343 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 358 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
@@ -425,10 +440,6 @@ class JobQueue { | |||
425 | } | 440 | } |
426 | } | 441 | } |
427 | 442 | ||
428 | waitJob (job: Job) { | ||
429 | return job.waitUntilFinished(this.queueEvents[job.queueName]) | ||
430 | } | ||
431 | |||
432 | private addRepeatableJobs () { | 443 | private addRepeatableJobs () { |
433 | this.queues['videos-views-stats'].add('job', {}, { | 444 | this.queues['videos-views-stats'].add('job', {}, { |
434 | repeat: REPEAT_JOBS['videos-views-stats'] | 445 | repeat: REPEAT_JOBS['videos-views-stats'] |
diff --git a/server/lib/video.ts b/server/lib/video.ts index f7d7aa186..6c4f3ce7b 100644 --- a/server/lib/video.ts +++ b/server/lib/video.ts | |||
@@ -9,7 +9,7 @@ import { VideoJobInfoModel } from '@server/models/video/video-job-info' | |||
9 | import { FilteredModelAttributes } from '@server/types' | 9 | import { FilteredModelAttributes } from '@server/types' |
10 | import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' | 10 | import { MThumbnail, MUserId, MVideoFile, MVideoTag, MVideoThumbnail, MVideoUUID } from '@server/types/models' |
11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' | 11 | import { ThumbnailType, VideoCreate, VideoPrivacy, VideoState, VideoTranscodingPayload } from '@shared/models' |
12 | import { CreateJobOptions, JobQueue } from './job-queue/job-queue' | 12 | import { CreateJobOptions } from './job-queue/job-queue' |
13 | import { updateVideoMiniatureFromExisting } from './thumbnail' | 13 | import { updateVideoMiniatureFromExisting } from './thumbnail' |
14 | 14 | ||
15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { | 15 | function buildLocalVideoFromReq (videoInfo: VideoCreate, channelId: number): FilteredModelAttributes<VideoModel> { |
@@ -121,10 +121,10 @@ async function buildOptimizeOrMergeAudioJob (options: { | |||
121 | } | 121 | } |
122 | } | 122 | } |
123 | 123 | ||
124 | async function addTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { | 124 | async function buildTranscodingJob (payload: VideoTranscodingPayload, options: CreateJobOptions = {}) { |
125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | 125 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') |
126 | 126 | ||
127 | return JobQueue.Instance.createJob({ type: 'video-transcoding', payload, ...options }) | 127 | return { type: 'video-transcoding' as 'video-transcoding', payload, ...options } |
128 | } | 128 | } |
129 | 129 | ||
130 | async function getTranscodingJobPriority (user: MUserId) { | 130 | async function getTranscodingJobPriority (user: MUserId) { |
@@ -182,7 +182,7 @@ export { | |||
182 | buildVideoThumbnailsFromReq, | 182 | buildVideoThumbnailsFromReq, |
183 | setVideoTags, | 183 | setVideoTags, |
184 | buildOptimizeOrMergeAudioJob, | 184 | buildOptimizeOrMergeAudioJob, |
185 | addTranscodingJob, | 185 | buildTranscodingJob, |
186 | buildMoveToObjectStorageJob, | 186 | buildMoveToObjectStorageJob, |
187 | getTranscodingJobPriority, | 187 | getTranscodingJobPriority, |
188 | getCachedVideoDuration | 188 | getCachedVideoDuration |
diff --git a/server/models/video/video.ts b/server/models/video/video.ts index b8e383502..a8ea67c39 100644 --- a/server/models/video/video.ts +++ b/server/models/video/video.ts | |||
@@ -1592,22 +1592,21 @@ export class VideoModel extends Model<Partial<AttributesOnly<VideoModel>>> { | |||
1592 | } | 1592 | } |
1593 | 1593 | ||
1594 | getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { | 1594 | getQualityFileBy<T extends MVideoWithFile> (this: T, fun: (files: MVideoFile[], it: (file: MVideoFile) => number) => MVideoFile) { |
1595 | // We first transcode to WebTorrent format, so try this array first | 1595 | const files = this.getAllFiles() |
1596 | if (Array.isArray(this.VideoFiles) && this.VideoFiles.length !== 0) { | 1596 | const file = fun(files, file => file.resolution) |
1597 | const file = fun(this.VideoFiles, file => file.resolution) | 1597 | if (!file) return undefined |
1598 | 1598 | ||
1599 | if (file.videoId) { | ||
1599 | return Object.assign(file, { Video: this }) | 1600 | return Object.assign(file, { Video: this }) |
1600 | } | 1601 | } |
1601 | 1602 | ||
1602 | // No webtorrent files, try with streaming playlist files | 1603 | if (file.videoStreamingPlaylistId) { |
1603 | if (Array.isArray(this.VideoStreamingPlaylists) && this.VideoStreamingPlaylists.length !== 0) { | ||
1604 | const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this }) | 1604 | const streamingPlaylistWithVideo = Object.assign(this.VideoStreamingPlaylists[0], { Video: this }) |
1605 | 1605 | ||
1606 | const file = fun(streamingPlaylistWithVideo.VideoFiles, file => file.resolution) | ||
1607 | return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo }) | 1606 | return Object.assign(file, { VideoStreamingPlaylist: streamingPlaylistWithVideo }) |
1608 | } | 1607 | } |
1609 | 1608 | ||
1610 | return undefined | 1609 | throw new Error('File is not associated to a video of a playlist') |
1611 | } | 1610 | } |
1612 | 1611 | ||
1613 | getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo { | 1612 | getMaxQualityFile<T extends MVideoWithFile> (this: T): MVideoFileVideo | MVideoFileStreamingPlaylistVideo { |
diff --git a/server/tools/peertube-import-videos.ts b/server/tools/peertube-import-videos.ts index e2f80c703..76338ea3c 100644 --- a/server/tools/peertube-import-videos.ts +++ b/server/tools/peertube-import-videos.ts | |||
@@ -165,7 +165,7 @@ async function processVideo (parameters: { | |||
165 | const youtubeDLBinary = await YoutubeDLCLI.safeGet() | 165 | const youtubeDLBinary = await YoutubeDLCLI.safeGet() |
166 | const output = await youtubeDLBinary.download({ | 166 | const output = await youtubeDLBinary.download({ |
167 | url: videoInfo.url, | 167 | url: videoInfo.url, |
168 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), | 168 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), |
169 | output: path, | 169 | output: path, |
170 | additionalYoutubeDLArgs: command.args, | 170 | additionalYoutubeDLArgs: command.args, |
171 | processOptions | 171 | processOptions |
@@ -251,7 +251,7 @@ async function fetchObject (info: any) { | |||
251 | const youtubeDLCLI = await YoutubeDLCLI.safeGet() | 251 | const youtubeDLCLI = await YoutubeDLCLI.safeGet() |
252 | const result = await youtubeDLCLI.getInfo({ | 252 | const result = await youtubeDLCLI.getInfo({ |
253 | url, | 253 | url, |
254 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), | 254 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), |
255 | processOptions | 255 | processOptions |
256 | }) | 256 | }) |
257 | 257 | ||
@@ -336,7 +336,7 @@ function exitError (message: string, ...meta: any[]) { | |||
336 | function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) { | 336 | function getYoutubeDLInfo (youtubeDLCLI: YoutubeDLCLI, url: string, args: string[]) { |
337 | return youtubeDLCLI.getInfo({ | 337 | return youtubeDLCLI.getInfo({ |
338 | url, | 338 | url, |
339 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([]), | 339 | format: YoutubeDLCLI.getYoutubeDLVideoFormat([], false), |
340 | additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ], | 340 | additionalYoutubeDLArgs: [ '-j', '--flat-playlist', '--playlist-reverse', ...args ], |
341 | processOptions | 341 | processOptions |
342 | }) | 342 | }) |