diff options
author | Chocobozzz <me@florianbigard.com> | 2023-04-21 14:55:10 +0200 |
---|---|---|
committer | Chocobozzz <chocobozzz@cpy.re> | 2023-05-09 08:57:34 +0200 |
commit | 0c9668f77901e7540e2c7045eb0f2974a4842a69 (patch) | |
tree | 226d3dd1565b0bb56588897af3b8530e6216e96b /server/lib/job-queue | |
parent | 6bcb854cdea8688a32240bc5719c7d139806e00b (diff) | |
download | PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.gz PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.tar.zst PeerTube-0c9668f77901e7540e2c7045eb0f2974a4842a69.zip |
Implement remote runner jobs in server
Move ffmpeg functions to @shared
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/transcoding-job-builder.ts | 47 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file-import.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 12 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-live-ending.ts | 10 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-studio-edition.ts | 68 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 282 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 63 |
7 files changed, 169 insertions, 315 deletions
diff --git a/server/lib/job-queue/handlers/transcoding-job-builder.ts b/server/lib/job-queue/handlers/transcoding-job-builder.ts new file mode 100644 index 000000000..8b4a877d7 --- /dev/null +++ b/server/lib/job-queue/handlers/transcoding-job-builder.ts | |||
@@ -0,0 +1,47 @@ | |||
1 | import { Job } from 'bullmq' | ||
2 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
3 | import { UserModel } from '@server/models/user/user' | ||
4 | import { VideoModel } from '@server/models/video/video' | ||
5 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | ||
6 | import { pick } from '@shared/core-utils' | ||
7 | import { TranscodingJobBuilderPayload } from '@shared/models' | ||
8 | import { logger } from '../../../helpers/logger' | ||
9 | import { JobQueue } from '../job-queue' | ||
10 | |||
11 | async function processTranscodingJobBuilder (job: Job) { | ||
12 | const payload = job.data as TranscodingJobBuilderPayload | ||
13 | |||
14 | logger.info('Processing transcoding job builder in job %s.', job.id) | ||
15 | |||
16 | if (payload.optimizeJob) { | ||
17 | const video = await VideoModel.loadFull(payload.videoUUID) | ||
18 | const user = await UserModel.loadByVideoId(video.id) | ||
19 | const videoFile = video.getMaxQualityFile() | ||
20 | |||
21 | await createOptimizeOrMergeAudioJobs({ | ||
22 | ...pick(payload.optimizeJob, [ 'isNewVideo' ]), | ||
23 | |||
24 | video, | ||
25 | videoFile, | ||
26 | user | ||
27 | }) | ||
28 | } | ||
29 | |||
30 | for (const job of (payload.jobs || [])) { | ||
31 | await JobQueue.Instance.createJob(job) | ||
32 | |||
33 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode') | ||
34 | } | ||
35 | |||
36 | for (const sequentialJobs of (payload.sequentialJobs || [])) { | ||
37 | await JobQueue.Instance.createSequentialJobFlow(...sequentialJobs) | ||
38 | |||
39 | await VideoJobInfoModel.increaseOrCreate(payload.videoUUID, 'pendingTranscode', sequentialJobs.length) | ||
40 | } | ||
41 | } | ||
42 | |||
43 | // --------------------------------------------------------------------------- | ||
44 | |||
45 | export { | ||
46 | processTranscodingJobBuilder | ||
47 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index d950f6407..9a4550e4d 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -10,8 +10,8 @@ import { VideoModel } from '@server/models/video/video' | |||
10 | import { VideoFileModel } from '@server/models/video/video-file' | 10 | import { VideoFileModel } from '@server/models/video/video-file' |
11 | import { MVideoFullLight } from '@server/types/models' | 11 | import { MVideoFullLight } from '@server/types/models' |
12 | import { getLowercaseExtension } from '@shared/core-utils' | 12 | import { getLowercaseExtension } from '@shared/core-utils' |
13 | import { getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
13 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' | 14 | import { VideoFileImportPayload, VideoStorage } from '@shared/models' |
14 | import { getVideoStreamFPS, getVideoStreamDimensionsInfo } from '../../../helpers/ffmpeg' | ||
15 | import { logger } from '../../../helpers/logger' | 15 | import { logger } from '../../../helpers/logger' |
16 | import { JobQueue } from '../job-queue' | 16 | import { JobQueue } from '../job-queue' |
17 | 17 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 4d361c7b9..2a063282c 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -7,15 +7,16 @@ import { isPostImportVideoAccepted } from '@server/lib/moderation' | |||
7 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 7 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
8 | import { Hooks } from '@server/lib/plugins/hooks' | 8 | import { Hooks } from '@server/lib/plugins/hooks' |
9 | import { ServerConfigManager } from '@server/lib/server-config-manager' | 9 | import { ServerConfigManager } from '@server/lib/server-config-manager' |
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 11 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { buildMoveToObjectStorageJob, buildOptimizeOrMergeAudioJob } from '@server/lib/video' | 12 | import { buildMoveToObjectStorageJob } from '@server/lib/video' |
12 | import { VideoPathManager } from '@server/lib/video-path-manager' | 13 | import { VideoPathManager } from '@server/lib/video-path-manager' |
13 | import { buildNextVideoState } from '@server/lib/video-state' | 14 | import { buildNextVideoState } from '@server/lib/video-state' |
14 | import { ThumbnailModel } from '@server/models/video/thumbnail' | 15 | import { ThumbnailModel } from '@server/models/video/thumbnail' |
15 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' | 16 | import { MUserId, MVideoFile, MVideoFullLight } from '@server/types/models' |
16 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' | 17 | import { MVideoImport, MVideoImportDefault, MVideoImportDefaultFiles, MVideoImportVideo } from '@server/types/models/video/video-import' |
17 | import { getLowercaseExtension } from '@shared/core-utils' | 18 | import { getLowercaseExtension } from '@shared/core-utils' |
18 | import { isAudioFile } from '@shared/extra-utils' | 19 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS, isAudioFile } from '@shared/ffmpeg' |
19 | import { | 20 | import { |
20 | ThumbnailType, | 21 | ThumbnailType, |
21 | VideoImportPayload, | 22 | VideoImportPayload, |
@@ -28,7 +29,6 @@ import { | |||
28 | VideoResolution, | 29 | VideoResolution, |
29 | VideoState | 30 | VideoState |
30 | } from '@shared/models' | 31 | } from '@shared/models' |
31 | import { ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '../../../helpers/ffmpeg' | ||
32 | import { logger } from '../../../helpers/logger' | 32 | import { logger } from '../../../helpers/logger' |
33 | import { getSecureTorrentName } from '../../../helpers/utils' | 33 | import { getSecureTorrentName } from '../../../helpers/utils' |
34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' | 34 | import { createTorrentAndSetInfoHash, downloadWebTorrentVideo } from '../../../helpers/webtorrent' |
@@ -137,7 +137,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
137 | 137 | ||
138 | const { resolution } = await isAudioFile(tempVideoPath, probe) | 138 | const { resolution } = await isAudioFile(tempVideoPath, probe) |
139 | ? { resolution: VideoResolution.H_NOVIDEO } | 139 | ? { resolution: VideoResolution.H_NOVIDEO } |
140 | : await getVideoStreamDimensionsInfo(tempVideoPath) | 140 | : await getVideoStreamDimensionsInfo(tempVideoPath, probe) |
141 | 141 | ||
142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) | 142 | const fps = await getVideoStreamFPS(tempVideoPath, probe) |
143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) | 143 | const duration = await getVideoStreamDuration(tempVideoPath, probe) |
@@ -313,9 +313,7 @@ async function afterImportSuccess (options: { | |||
313 | } | 313 | } |
314 | 314 | ||
315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? | 315 | if (video.state === VideoState.TO_TRANSCODE) { // Create transcoding jobs? |
316 | await JobQueue.Instance.createJob( | 316 | await createOptimizeOrMergeAudioJobs({ video, videoFile, isNewVideo: true, user }) |
317 | await buildOptimizeOrMergeAudioJob({ video, videoFile, user }) | ||
318 | ) | ||
319 | } | 317 | } |
320 | } | 318 | } |
321 | 319 | ||
diff --git a/server/lib/job-queue/handlers/video-live-ending.ts b/server/lib/job-queue/handlers/video-live-ending.ts index 2f3a971bd..1bf43f592 100644 --- a/server/lib/job-queue/handlers/video-live-ending.ts +++ b/server/lib/job-queue/handlers/video-live-ending.ts | |||
@@ -1,25 +1,25 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { readdir, remove } from 'fs-extra' | 2 | import { readdir, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo } from '@server/helpers/ffmpeg' | ||
5 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' | 4 | import { getLocalVideoActivityPubUrl } from '@server/lib/activitypub/url' |
6 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 5 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
7 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' | 6 | import { cleanupAndDestroyPermanentLive, cleanupTMPLiveFiles, cleanupUnsavedNormalLive } from '@server/lib/live' |
8 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' | 7 | import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '@server/lib/paths' |
9 | import { generateVideoMiniature } from '@server/lib/thumbnail' | 8 | import { generateVideoMiniature } from '@server/lib/thumbnail' |
10 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/transcoding' | 9 | import { generateHlsPlaylistResolutionFromTS } from '@server/lib/transcoding/hls-transcoding' |
10 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
11 | import { moveToNextState } from '@server/lib/video-state' | 11 | import { moveToNextState } from '@server/lib/video-state' |
12 | import { VideoModel } from '@server/models/video/video' | 12 | import { VideoModel } from '@server/models/video/video' |
13 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' | 13 | import { VideoBlacklistModel } from '@server/models/video/video-blacklist' |
14 | import { VideoFileModel } from '@server/models/video/video-file' | 14 | import { VideoFileModel } from '@server/models/video/video-file' |
15 | import { VideoLiveModel } from '@server/models/video/video-live' | 15 | import { VideoLiveModel } from '@server/models/video/video-live' |
16 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
16 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' | 17 | import { VideoLiveSessionModel } from '@server/models/video/video-live-session' |
17 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' | 18 | import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist' |
18 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' | 19 | import { MVideo, MVideoLive, MVideoLiveSession, MVideoWithAllFiles } from '@server/types/models' |
20 | import { ffprobePromise, getAudioStream, getVideoStreamDimensionsInfo, getVideoStreamFPS } from '@shared/ffmpeg' | ||
19 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' | 21 | import { ThumbnailType, VideoLiveEndingPayload, VideoState } from '@shared/models' |
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 22 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { VideoPathManager } from '@server/lib/video-path-manager' | ||
22 | import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting' | ||
23 | 23 | ||
24 | const lTags = loggerTagsFactory('live', 'job') | 24 | const lTags = loggerTagsFactory('live', 'job') |
25 | 25 | ||
@@ -224,6 +224,7 @@ async function assignReplayFilesToVideo (options: { | |||
224 | const probe = await ffprobePromise(concatenatedTsFilePath) | 224 | const probe = await ffprobePromise(concatenatedTsFilePath) |
225 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) | 225 | const { audioStream } = await getAudioStream(concatenatedTsFilePath, probe) |
226 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) | 226 | const { resolution } = await getVideoStreamDimensionsInfo(concatenatedTsFilePath, probe) |
227 | const fps = await getVideoStreamFPS(concatenatedTsFilePath, probe) | ||
227 | 228 | ||
228 | try { | 229 | try { |
229 | await generateHlsPlaylistResolutionFromTS({ | 230 | await generateHlsPlaylistResolutionFromTS({ |
@@ -231,6 +232,7 @@ async function assignReplayFilesToVideo (options: { | |||
231 | inputFileMutexReleaser, | 232 | inputFileMutexReleaser, |
232 | concatenatedTsFilePath, | 233 | concatenatedTsFilePath, |
233 | resolution, | 234 | resolution, |
235 | fps, | ||
234 | isAAC: audioStream?.codec_name === 'aac' | 236 | isAAC: audioStream?.codec_name === 'aac' |
235 | }) | 237 | }) |
236 | } catch (err) { | 238 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-studio-edition.ts b/server/lib/job-queue/handlers/video-studio-edition.ts index 3e208d83d..991d11ef1 100644 --- a/server/lib/job-queue/handlers/video-studio-edition.ts +++ b/server/lib/job-queue/handlers/video-studio-edition.ts | |||
@@ -1,15 +1,16 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { move, remove } from 'fs-extra' | 2 | import { move, remove } from 'fs-extra' |
3 | import { join } from 'path' | 3 | import { join } from 'path' |
4 | import { addIntroOutro, addWatermark, cutVideo } from '@server/helpers/ffmpeg' | 4 | import { getFFmpegCommandWrapperOptions } from '@server/helpers/ffmpeg' |
5 | import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' | 5 | import { createTorrentAndSetInfoHashFromPath } from '@server/helpers/webtorrent' |
6 | import { CONFIG } from '@server/initializers/config' | 6 | import { CONFIG } from '@server/initializers/config' |
7 | import { VIDEO_FILTERS } from '@server/initializers/constants' | ||
7 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' | 8 | import { federateVideoIfNeeded } from '@server/lib/activitypub/videos' |
8 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' | 9 | import { generateWebTorrentVideoFilename } from '@server/lib/paths' |
10 | import { createOptimizeOrMergeAudioJobs } from '@server/lib/transcoding/create-transcoding-job' | ||
9 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' | 11 | import { VideoTranscodingProfilesManager } from '@server/lib/transcoding/default-transcoding-profiles' |
10 | import { isAbleToUploadVideo } from '@server/lib/user' | 12 | import { isAbleToUploadVideo } from '@server/lib/user' |
11 | import { buildOptimizeOrMergeAudioJob } from '@server/lib/video' | 13 | import { buildFileMetadata, removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' |
12 | import { removeHLSPlaylist, removeWebTorrentFile } from '@server/lib/video-file' | ||
13 | import { VideoPathManager } from '@server/lib/video-path-manager' | 14 | import { VideoPathManager } from '@server/lib/video-path-manager' |
14 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' | 15 | import { approximateIntroOutroAdditionalSize } from '@server/lib/video-studio' |
15 | import { UserModel } from '@server/models/user/user' | 16 | import { UserModel } from '@server/models/user/user' |
@@ -17,15 +18,8 @@ import { VideoModel } from '@server/models/video/video' | |||
17 | import { VideoFileModel } from '@server/models/video/video-file' | 18 | import { VideoFileModel } from '@server/models/video/video-file' |
18 | import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' | 19 | import { MVideo, MVideoFile, MVideoFullLight, MVideoId, MVideoWithAllFiles } from '@server/types/models' |
19 | import { getLowercaseExtension, pick } from '@shared/core-utils' | 20 | import { getLowercaseExtension, pick } from '@shared/core-utils' |
20 | import { | 21 | import { buildUUID, getFileSize } from '@shared/extra-utils' |
21 | buildFileMetadata, | 22 | import { FFmpegEdition, ffprobePromise, getVideoStreamDimensionsInfo, getVideoStreamDuration, getVideoStreamFPS } from '@shared/ffmpeg' |
22 | buildUUID, | ||
23 | ffprobePromise, | ||
24 | getFileSize, | ||
25 | getVideoStreamDimensionsInfo, | ||
26 | getVideoStreamDuration, | ||
27 | getVideoStreamFPS | ||
28 | } from '@shared/extra-utils' | ||
29 | import { | 23 | import { |
30 | VideoStudioEditionPayload, | 24 | VideoStudioEditionPayload, |
31 | VideoStudioTask, | 25 | VideoStudioTask, |
@@ -36,7 +30,6 @@ import { | |||
36 | VideoStudioTaskWatermarkPayload | 30 | VideoStudioTaskWatermarkPayload |
37 | } from '@shared/models' | 31 | } from '@shared/models' |
38 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 32 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
39 | import { JobQueue } from '../job-queue' | ||
40 | 33 | ||
41 | const lTagsBase = loggerTagsFactory('video-edition') | 34 | const lTagsBase = loggerTagsFactory('video-edition') |
42 | 35 | ||
@@ -102,9 +95,7 @@ async function processVideoStudioEdition (job: Job) { | |||
102 | 95 | ||
103 | const user = await UserModel.loadByVideoId(video.id) | 96 | const user = await UserModel.loadByVideoId(video.id) |
104 | 97 | ||
105 | await JobQueue.Instance.createJob( | 98 | await createOptimizeOrMergeAudioJobs({ video, videoFile: newFile, isNewVideo: false, user }) |
106 | await buildOptimizeOrMergeAudioJob({ video, videoFile: newFile, user, isNewVideo: false }) | ||
107 | ) | ||
108 | } | 99 | } |
109 | 100 | ||
110 | // --------------------------------------------------------------------------- | 101 | // --------------------------------------------------------------------------- |
@@ -131,9 +122,9 @@ const taskProcessors: { [id in VideoStudioTask['name']]: (options: TaskProcessor | |||
131 | } | 122 | } |
132 | 123 | ||
133 | async function processTask (options: TaskProcessorOptions) { | 124 | async function processTask (options: TaskProcessorOptions) { |
134 | const { video, task } = options | 125 | const { video, task, lTags } = options |
135 | 126 | ||
136 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...options.lTags }) | 127 | logger.info('Processing %s task for video %s.', task.name, video.uuid, { task, ...lTags }) |
137 | 128 | ||
138 | const processor = taskProcessors[options.task.name] | 129 | const processor = taskProcessors[options.task.name] |
139 | if (!process) throw new Error('Unknown task ' + task.name) | 130 | if (!process) throw new Error('Unknown task ' + task.name) |
@@ -142,48 +133,53 @@ async function processTask (options: TaskProcessorOptions) { | |||
142 | } | 133 | } |
143 | 134 | ||
144 | function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { | 135 | function processAddIntroOutro (options: TaskProcessorOptions<VideoStudioTaskIntroPayload | VideoStudioTaskOutroPayload>) { |
145 | const { task } = options | 136 | const { task, lTags } = options |
137 | |||
138 | logger.debug('Will add intro/outro to the video.', { options, ...lTags }) | ||
146 | 139 | ||
147 | return addIntroOutro({ | 140 | return buildFFmpegEdition().addIntroOutro({ |
148 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 141 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
149 | 142 | ||
150 | introOutroPath: task.options.file, | 143 | introOutroPath: task.options.file, |
151 | type: task.name === 'add-intro' | 144 | type: task.name === 'add-intro' |
152 | ? 'intro' | 145 | ? 'intro' |
153 | : 'outro', | 146 | : 'outro' |
154 | |||
155 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
156 | profile: CONFIG.TRANSCODING.PROFILE | ||
157 | }) | 147 | }) |
158 | } | 148 | } |
159 | 149 | ||
160 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { | 150 | function processCut (options: TaskProcessorOptions<VideoStudioTaskCutPayload>) { |
161 | const { task } = options | 151 | const { task, lTags } = options |
162 | 152 | ||
163 | return cutVideo({ | 153 | logger.debug('Will cut the video.', { options, ...lTags }) |
154 | |||
155 | return buildFFmpegEdition().cutVideo({ | ||
164 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 156 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
165 | 157 | ||
166 | start: task.options.start, | 158 | start: task.options.start, |
167 | end: task.options.end, | 159 | end: task.options.end |
168 | |||
169 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | ||
170 | profile: CONFIG.TRANSCODING.PROFILE | ||
171 | }) | 160 | }) |
172 | } | 161 | } |
173 | 162 | ||
174 | function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { | 163 | function processAddWatermark (options: TaskProcessorOptions<VideoStudioTaskWatermarkPayload>) { |
175 | const { task } = options | 164 | const { task, lTags } = options |
165 | |||
166 | logger.debug('Will add watermark to the video.', { options, ...lTags }) | ||
176 | 167 | ||
177 | return addWatermark({ | 168 | return buildFFmpegEdition().addWatermark({ |
178 | ...pick(options, [ 'inputPath', 'outputPath' ]), | 169 | ...pick(options, [ 'inputPath', 'outputPath' ]), |
179 | 170 | ||
180 | watermarkPath: task.options.file, | 171 | watermarkPath: task.options.file, |
181 | 172 | ||
182 | availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(), | 173 | videoFilters: { |
183 | profile: CONFIG.TRANSCODING.PROFILE | 174 | watermarkSizeRatio: VIDEO_FILTERS.WATERMARK.SIZE_RATIO, |
175 | horitonzalMarginRatio: VIDEO_FILTERS.WATERMARK.HORIZONTAL_MARGIN_RATIO, | ||
176 | verticalMarginRatio: VIDEO_FILTERS.WATERMARK.VERTICAL_MARGIN_RATIO | ||
177 | } | ||
184 | }) | 178 | }) |
185 | } | 179 | } |
186 | 180 | ||
181 | // --------------------------------------------------------------------------- | ||
182 | |||
187 | async function buildNewFile (video: MVideoId, path: string) { | 183 | async function buildNewFile (video: MVideoId, path: string) { |
188 | const videoFile = new VideoFileModel({ | 184 | const videoFile = new VideoFileModel({ |
189 | extname: getLowercaseExtension(path), | 185 | extname: getLowercaseExtension(path), |
@@ -223,3 +219,7 @@ async function checkUserQuotaOrThrow (video: MVideoFullLight, payload: VideoStud | |||
223 | throw new Error('Quota exceeded for this user to edit the video') | 219 | throw new Error('Quota exceeded for this user to edit the video') |
224 | } | 220 | } |
225 | } | 221 | } |
222 | |||
223 | function buildFFmpegEdition () { | ||
224 | return new FFmpegEdition(getFFmpegCommandWrapperOptions('vod', VideoTranscodingProfilesManager.Instance.getAvailableEncoders())) | ||
225 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 3e6d23363..17b717275 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -1,13 +1,13 @@ | |||
1 | import { Job } from 'bullmq' | 1 | import { Job } from 'bullmq' |
2 | import { TranscodeVODOptionsType } from '@server/helpers/ffmpeg' | 2 | import { onTranscodingEnded } from '@server/lib/transcoding/ended-transcoding' |
3 | import { Hooks } from '@server/lib/plugins/hooks' | 3 | import { generateHlsPlaylistResolution } from '@server/lib/transcoding/hls-transcoding' |
4 | import { buildTranscodingJob, getTranscodingJobPriority } from '@server/lib/video' | 4 | import { mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewWebTorrentResolution } from '@server/lib/transcoding/web-transcoding' |
5 | import { removeAllWebTorrentFiles } from '@server/lib/video-file' | ||
5 | import { VideoPathManager } from '@server/lib/video-path-manager' | 6 | import { VideoPathManager } from '@server/lib/video-path-manager' |
6 | import { moveToFailedTranscodingState, moveToNextState } from '@server/lib/video-state' | 7 | import { moveToFailedTranscodingState } from '@server/lib/video-state' |
7 | import { UserModel } from '@server/models/user/user' | 8 | import { UserModel } from '@server/models/user/user' |
8 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' | 9 | import { VideoJobInfoModel } from '@server/models/video/video-job-info' |
9 | import { MUser, MUserId, MVideo, MVideoFullLight, MVideoWithFile } from '@server/types/models' | 10 | import { MUser, MUserId, MVideoFullLight } from '@server/types/models' |
10 | import { pick } from '@shared/core-utils' | ||
11 | import { | 11 | import { |
12 | HLSTranscodingPayload, | 12 | HLSTranscodingPayload, |
13 | MergeAudioTranscodingPayload, | 13 | MergeAudioTranscodingPayload, |
@@ -15,18 +15,8 @@ import { | |||
15 | OptimizeTranscodingPayload, | 15 | OptimizeTranscodingPayload, |
16 | VideoTranscodingPayload | 16 | VideoTranscodingPayload |
17 | } from '@shared/models' | 17 | } from '@shared/models' |
18 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | ||
19 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg' | ||
20 | import { logger, loggerTagsFactory } from '../../../helpers/logger' | 18 | import { logger, loggerTagsFactory } from '../../../helpers/logger' |
21 | import { CONFIG } from '../../../initializers/config' | ||
22 | import { VideoModel } from '../../../models/video/video' | 19 | import { VideoModel } from '../../../models/video/video' |
23 | import { | ||
24 | generateHlsPlaylistResolution, | ||
25 | mergeAudioVideofile, | ||
26 | optimizeOriginalVideofile, | ||
27 | transcodeNewWebTorrentResolution | ||
28 | } from '../../transcoding/transcoding' | ||
29 | import { JobQueue } from '../job-queue' | ||
30 | 20 | ||
31 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> | 21 | type HandlerFunction = (job: Job, payload: VideoTranscodingPayload, video: MVideoFullLight, user: MUser) => Promise<void> |
32 | 22 | ||
@@ -84,260 +74,72 @@ export { | |||
84 | // Job handlers | 74 | // Job handlers |
85 | // --------------------------------------------------------------------------- | 75 | // --------------------------------------------------------------------------- |
86 | 76 | ||
87 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight, user: MUser) { | ||
88 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
89 | |||
90 | const videoFileInput = payload.copyCodecs | ||
91 | ? video.getWebTorrentFile(payload.resolution) | ||
92 | : video.getMaxQualityFile() | ||
93 | |||
94 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() | ||
95 | |||
96 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) | ||
97 | |||
98 | try { | ||
99 | await videoFileInput.getVideo().reload() | ||
100 | |||
101 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { | ||
102 | return generateHlsPlaylistResolution({ | ||
103 | video, | ||
104 | videoInputPath, | ||
105 | inputFileMutexReleaser, | ||
106 | resolution: payload.resolution, | ||
107 | copyCodecs: payload.copyCodecs, | ||
108 | job | ||
109 | }) | ||
110 | }) | ||
111 | } finally { | ||
112 | inputFileMutexReleaser() | ||
113 | } | ||
114 | |||
115 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
116 | |||
117 | await onHlsPlaylistGeneration(video, user, payload) | ||
118 | } | ||
119 | |||
120 | async function handleNewWebTorrentResolutionJob ( | ||
121 | job: Job, | ||
122 | payload: NewWebTorrentResolutionTranscodingPayload, | ||
123 | video: MVideoFullLight, | ||
124 | user: MUserId | ||
125 | ) { | ||
126 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | ||
127 | |||
128 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, job }) | ||
129 | |||
130 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | ||
131 | |||
132 | await onNewWebTorrentFileResolution(video, user, payload) | ||
133 | } | ||
134 | |||
135 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 77 | async function handleWebTorrentMergeAudioJob (job: Job, payload: MergeAudioTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
136 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) | 78 | logger.info('Handling merge audio transcoding job for %s.', video.uuid, lTags(video.uuid)) |
137 | 79 | ||
138 | await mergeAudioVideofile({ video, resolution: payload.resolution, job }) | 80 | await mergeAudioVideofile({ video, resolution: payload.resolution, fps: payload.fps, job }) |
139 | 81 | ||
140 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 82 | logger.info('Merge audio transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
141 | 83 | ||
142 | await onVideoFirstWebTorrentTranscoding(video, payload, 'video', user) | 84 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
143 | } | 85 | } |
144 | 86 | ||
145 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { | 87 | async function handleWebTorrentOptimizeJob (job: Job, payload: OptimizeTranscodingPayload, video: MVideoFullLight, user: MUserId) { |
146 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) | 88 | logger.info('Handling optimize transcoding job for %s.', video.uuid, lTags(video.uuid)) |
147 | 89 | ||
148 | const { transcodeType } = await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), job }) | 90 | await optimizeOriginalVideofile({ video, inputVideoFile: video.getMaxQualityFile(), quickTranscode: payload.quickTranscode, job }) |
149 | 91 | ||
150 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) | 92 | logger.info('Optimize transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
151 | 93 | ||
152 | await onVideoFirstWebTorrentTranscoding(video, payload, transcodeType, user) | 94 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
153 | } | 95 | } |
154 | 96 | ||
155 | // --------------------------------------------------------------------------- | 97 | async function handleNewWebTorrentResolutionJob (job: Job, payload: NewWebTorrentResolutionTranscodingPayload, video: MVideoFullLight) { |
156 | 98 | logger.info('Handling WebTorrent transcoding job for %s.', video.uuid, lTags(video.uuid)) | |
157 | async function onHlsPlaylistGeneration (video: MVideoFullLight, user: MUser, payload: HLSTranscodingPayload) { | ||
158 | if (payload.isMaxQuality && payload.autoDeleteWebTorrentIfNeeded && CONFIG.TRANSCODING.WEBTORRENT.ENABLED === false) { | ||
159 | // Remove webtorrent files if not enabled | ||
160 | for (const file of video.VideoFiles) { | ||
161 | await video.removeWebTorrentFile(file) | ||
162 | await file.destroy() | ||
163 | } | ||
164 | |||
165 | video.VideoFiles = [] | ||
166 | |||
167 | // Create HLS new resolution jobs | ||
168 | await createLowerResolutionsJobs({ | ||
169 | video, | ||
170 | user, | ||
171 | videoFileResolution: payload.resolution, | ||
172 | hasAudio: payload.hasAudio, | ||
173 | isNewVideo: payload.isNewVideo ?? true, | ||
174 | type: 'hls' | ||
175 | }) | ||
176 | } | ||
177 | |||
178 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
179 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | ||
180 | } | ||
181 | 99 | ||
182 | async function onVideoFirstWebTorrentTranscoding ( | 100 | await transcodeNewWebTorrentResolution({ video, resolution: payload.resolution, fps: payload.fps, job }) |
183 | videoArg: MVideoWithFile, | ||
184 | payload: OptimizeTranscodingPayload | MergeAudioTranscodingPayload, | ||
185 | transcodeType: TranscodeVODOptionsType, | ||
186 | user: MUserId | ||
187 | ) { | ||
188 | const mutexReleaser = await VideoPathManager.Instance.lockFiles(videoArg.uuid) | ||
189 | 101 | ||
190 | try { | 102 | logger.info('WebTorrent transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
191 | // Maybe the video changed in database, refresh it | ||
192 | const videoDatabase = await VideoModel.loadFull(videoArg.uuid) | ||
193 | // Video does not exist anymore | ||
194 | if (!videoDatabase) return undefined | ||
195 | |||
196 | const { resolution, audioStream } = await videoDatabase.probeMaxQualityFile() | ||
197 | |||
198 | // Generate HLS version of the original file | ||
199 | const originalFileHLSPayload = { | ||
200 | ...payload, | ||
201 | |||
202 | hasAudio: !!audioStream, | ||
203 | resolution: videoDatabase.getMaxQualityFile().resolution, | ||
204 | // If we quick transcoded original file, force transcoding for HLS to avoid some weird playback issues | ||
205 | copyCodecs: transcodeType !== 'quick-transcode', | ||
206 | isMaxQuality: true | ||
207 | } | ||
208 | const hasHls = await createHlsJobIfEnabled(user, originalFileHLSPayload) | ||
209 | const hasNewResolutions = await createLowerResolutionsJobs({ | ||
210 | video: videoDatabase, | ||
211 | user, | ||
212 | videoFileResolution: resolution, | ||
213 | hasAudio: !!audioStream, | ||
214 | type: 'webtorrent', | ||
215 | isNewVideo: payload.isNewVideo ?? true | ||
216 | }) | ||
217 | |||
218 | await VideoJobInfoModel.decrease(videoDatabase.uuid, 'pendingTranscode') | ||
219 | 103 | ||
220 | // Move to next state if there are no other resolutions to generate | 104 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
221 | if (!hasHls && !hasNewResolutions) { | ||
222 | await retryTransactionWrapper(moveToNextState, { video: videoDatabase, isNewVideo: payload.isNewVideo }) | ||
223 | } | ||
224 | } finally { | ||
225 | mutexReleaser() | ||
226 | } | ||
227 | } | 105 | } |
228 | 106 | ||
229 | async function onNewWebTorrentFileResolution ( | 107 | async function handleHLSJob (job: Job, payload: HLSTranscodingPayload, video: MVideoFullLight) { |
230 | video: MVideo, | 108 | logger.info('Handling HLS transcoding job for %s.', video.uuid, lTags(video.uuid)) |
231 | user: MUserId, | ||
232 | payload: NewWebTorrentResolutionTranscodingPayload | MergeAudioTranscodingPayload | ||
233 | ) { | ||
234 | if (payload.createHLSIfNeeded) { | ||
235 | await createHlsJobIfEnabled(user, { hasAudio: true, copyCodecs: true, isMaxQuality: false, ...payload }) | ||
236 | } | ||
237 | |||
238 | await VideoJobInfoModel.decrease(video.uuid, 'pendingTranscode') | ||
239 | 109 | ||
240 | await retryTransactionWrapper(moveToNextState, { video, isNewVideo: payload.isNewVideo }) | 110 | const videoFileInput = payload.copyCodecs |
241 | } | 111 | ? video.getWebTorrentFile(payload.resolution) |
112 | : video.getMaxQualityFile() | ||
242 | 113 | ||
243 | // --------------------------------------------------------------------------- | 114 | const videoOrStreamingPlaylist = videoFileInput.getVideoOrStreamingPlaylist() |
244 | 115 | ||
245 | async function createHlsJobIfEnabled (user: MUserId, payload: { | 116 | const inputFileMutexReleaser = await VideoPathManager.Instance.lockFiles(video.uuid) |
246 | videoUUID: string | ||
247 | resolution: number | ||
248 | hasAudio: boolean | ||
249 | copyCodecs: boolean | ||
250 | isMaxQuality: boolean | ||
251 | isNewVideo?: boolean | ||
252 | }) { | ||
253 | if (!payload || CONFIG.TRANSCODING.ENABLED !== true || CONFIG.TRANSCODING.HLS.ENABLED !== true) return false | ||
254 | |||
255 | const jobOptions = { | ||
256 | priority: await getTranscodingJobPriority(user) | ||
257 | } | ||
258 | 117 | ||
259 | const hlsTranscodingPayload: HLSTranscodingPayload = { | 118 | try { |
260 | type: 'new-resolution-to-hls', | 119 | await videoFileInput.getVideo().reload() |
261 | autoDeleteWebTorrentIfNeeded: true, | ||
262 | 120 | ||
263 | ...pick(payload, [ 'videoUUID', 'resolution', 'copyCodecs', 'isMaxQuality', 'isNewVideo', 'hasAudio' ]) | 121 | await VideoPathManager.Instance.makeAvailableVideoFile(videoFileInput.withVideoOrPlaylist(videoOrStreamingPlaylist), videoInputPath => { |
122 | return generateHlsPlaylistResolution({ | ||
123 | video, | ||
124 | videoInputPath, | ||
125 | inputFileMutexReleaser, | ||
126 | resolution: payload.resolution, | ||
127 | fps: payload.fps, | ||
128 | copyCodecs: payload.copyCodecs, | ||
129 | job | ||
130 | }) | ||
131 | }) | ||
132 | } finally { | ||
133 | inputFileMutexReleaser() | ||
264 | } | 134 | } |
265 | 135 | ||
266 | await JobQueue.Instance.createJob(await buildTranscodingJob(hlsTranscodingPayload, jobOptions)) | 136 | logger.info('HLS transcoding job for %s ended.', video.uuid, lTags(video.uuid)) |
267 | |||
268 | return true | ||
269 | } | ||
270 | |||
271 | async function createLowerResolutionsJobs (options: { | ||
272 | video: MVideoFullLight | ||
273 | user: MUserId | ||
274 | videoFileResolution: number | ||
275 | hasAudio: boolean | ||
276 | isNewVideo: boolean | ||
277 | type: 'hls' | 'webtorrent' | ||
278 | }) { | ||
279 | const { video, user, videoFileResolution, isNewVideo, hasAudio, type } = options | ||
280 | |||
281 | // Create transcoding jobs if there are enabled resolutions | ||
282 | const resolutionsEnabled = await Hooks.wrapObject( | ||
283 | computeResolutionsToTranscode({ input: videoFileResolution, type: 'vod', includeInput: false, strictLower: true, hasAudio }), | ||
284 | 'filter:transcoding.auto.resolutions-to-transcode.result', | ||
285 | options | ||
286 | ) | ||
287 | |||
288 | const resolutionCreated: string[] = [] | ||
289 | |||
290 | for (const resolution of resolutionsEnabled) { | ||
291 | let dataInput: VideoTranscodingPayload | ||
292 | |||
293 | if (CONFIG.TRANSCODING.WEBTORRENT.ENABLED && type === 'webtorrent') { | ||
294 | // WebTorrent will create subsequent HLS job | ||
295 | dataInput = { | ||
296 | type: 'new-resolution-to-webtorrent', | ||
297 | videoUUID: video.uuid, | ||
298 | resolution, | ||
299 | hasAudio, | ||
300 | createHLSIfNeeded: true, | ||
301 | isNewVideo | ||
302 | } | ||
303 | |||
304 | resolutionCreated.push('webtorrent-' + resolution) | ||
305 | } | ||
306 | |||
307 | if (CONFIG.TRANSCODING.HLS.ENABLED && type === 'hls') { | ||
308 | dataInput = { | ||
309 | type: 'new-resolution-to-hls', | ||
310 | videoUUID: video.uuid, | ||
311 | resolution, | ||
312 | hasAudio, | ||
313 | copyCodecs: false, | ||
314 | isMaxQuality: false, | ||
315 | autoDeleteWebTorrentIfNeeded: true, | ||
316 | isNewVideo | ||
317 | } | ||
318 | |||
319 | resolutionCreated.push('hls-' + resolution) | ||
320 | } | ||
321 | |||
322 | if (!dataInput) continue | ||
323 | |||
324 | const jobOptions = { | ||
325 | priority: await getTranscodingJobPriority(user) | ||
326 | } | ||
327 | |||
328 | await JobQueue.Instance.createJob(await buildTranscodingJob(dataInput, jobOptions)) | ||
329 | } | ||
330 | 137 | ||
331 | if (resolutionCreated.length === 0) { | 138 | if (payload.deleteWebTorrentFiles === true) { |
332 | logger.info('No transcoding jobs created for video %s (no resolutions).', video.uuid, lTags(video.uuid)) | 139 | logger.info('Removing WebTorrent files of %s now we have a HLS version of it.', video.uuid, lTags(video.uuid)) |
333 | 140 | ||
334 | return false | 141 | await removeAllWebTorrentFiles(video) |
335 | } | 142 | } |
336 | 143 | ||
337 | logger.info( | 144 | await onTranscodingEnded({ isNewVideo: payload.isNewVideo, moveVideoToNextState: true, video }) |
338 | 'New resolutions %s transcoding jobs created for video %s and origin file resolution of %d.', type, video.uuid, videoFileResolution, | ||
339 | { resolutionCreated, ...lTags(video.uuid) } | ||
340 | ) | ||
341 | |||
342 | return true | ||
343 | } | 145 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index cc6be0bd8..21bf0f226 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -31,6 +31,7 @@ import { | |||
31 | MoveObjectStoragePayload, | 31 | MoveObjectStoragePayload, |
32 | NotifyPayload, | 32 | NotifyPayload, |
33 | RefreshPayload, | 33 | RefreshPayload, |
34 | TranscodingJobBuilderPayload, | ||
34 | VideoChannelImportPayload, | 35 | VideoChannelImportPayload, |
35 | VideoFileImportPayload, | 36 | VideoFileImportPayload, |
36 | VideoImportPayload, | 37 | VideoImportPayload, |
@@ -56,6 +57,7 @@ import { processFederateVideo } from './handlers/federate-video' | |||
56 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' | 57 | import { processManageVideoTorrent } from './handlers/manage-video-torrent' |
57 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' | 58 | import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage' |
58 | import { processNotify } from './handlers/notify' | 59 | import { processNotify } from './handlers/notify' |
60 | import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder' | ||
59 | import { processVideoChannelImport } from './handlers/video-channel-import' | 61 | import { processVideoChannelImport } from './handlers/video-channel-import' |
60 | import { processVideoFileImport } from './handlers/video-file-import' | 62 | import { processVideoFileImport } from './handlers/video-file-import' |
61 | import { processVideoImport } from './handlers/video-import' | 63 | import { processVideoImport } from './handlers/video-import' |
@@ -69,11 +71,12 @@ export type CreateJobArgument = | |||
69 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | | 71 | { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } | |
70 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | 72 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | |
71 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | 73 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | |
72 | { type: 'activitypub-http-cleaner', payload: {} } | | 74 | { type: 'activitypub-cleaner', payload: {} } | |
73 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | | 75 | { type: 'activitypub-follow', payload: ActivitypubFollowPayload } | |
74 | { type: 'video-file-import', payload: VideoFileImportPayload } | | 76 | { type: 'video-file-import', payload: VideoFileImportPayload } | |
75 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | | 77 | { type: 'video-transcoding', payload: VideoTranscodingPayload } | |
76 | { type: 'email', payload: EmailPayload } | | 78 | { type: 'email', payload: EmailPayload } | |
79 | { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } | | ||
77 | { type: 'video-import', payload: VideoImportPayload } | | 80 | { type: 'video-import', payload: VideoImportPayload } | |
78 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 81 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
79 | { type: 'videos-views-stats', payload: {} } | | 82 | { type: 'videos-views-stats', payload: {} } | |
@@ -96,28 +99,29 @@ export type CreateJobOptions = { | |||
96 | } | 99 | } |
97 | 100 | ||
98 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { | 101 | const handlers: { [id in JobType]: (job: Job) => Promise<any> } = { |
99 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, | ||
100 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, | ||
101 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
102 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
103 | 'activitypub-cleaner': processActivityPubCleaner, | 102 | 'activitypub-cleaner': processActivityPubCleaner, |
104 | 'activitypub-follow': processActivityPubFollow, | 103 | 'activitypub-follow': processActivityPubFollow, |
105 | 'video-file-import': processVideoFileImport, | 104 | 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast, |
106 | 'video-transcoding': processVideoTranscoding, | 105 | 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast, |
106 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
107 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
108 | 'activitypub-refresher': refreshAPObject, | ||
109 | 'actor-keys': processActorKeys, | ||
110 | 'after-video-channel-import': processAfterVideoChannelImport, | ||
107 | 'email': processEmail, | 111 | 'email': processEmail, |
112 | 'federate-video': processFederateVideo, | ||
113 | 'transcoding-job-builder': processTranscodingJobBuilder, | ||
114 | 'manage-video-torrent': processManageVideoTorrent, | ||
115 | 'move-to-object-storage': processMoveToObjectStorage, | ||
116 | 'notify': processNotify, | ||
117 | 'video-channel-import': processVideoChannelImport, | ||
118 | 'video-file-import': processVideoFileImport, | ||
108 | 'video-import': processVideoImport, | 119 | 'video-import': processVideoImport, |
109 | 'videos-views-stats': processVideosViewsStats, | ||
110 | 'activitypub-refresher': refreshAPObject, | ||
111 | 'video-live-ending': processVideoLiveEnding, | 120 | 'video-live-ending': processVideoLiveEnding, |
112 | 'actor-keys': processActorKeys, | ||
113 | 'video-redundancy': processVideoRedundancy, | 121 | 'video-redundancy': processVideoRedundancy, |
114 | 'move-to-object-storage': processMoveToObjectStorage, | ||
115 | 'manage-video-torrent': processManageVideoTorrent, | ||
116 | 'video-studio-edition': processVideoStudioEdition, | 122 | 'video-studio-edition': processVideoStudioEdition, |
117 | 'video-channel-import': processVideoChannelImport, | 123 | 'video-transcoding': processVideoTranscoding, |
118 | 'after-video-channel-import': processAfterVideoChannelImport, | 124 | 'videos-views-stats': processVideosViewsStats |
119 | 'notify': processNotify, | ||
120 | 'federate-video': processFederateVideo | ||
121 | } | 125 | } |
122 | 126 | ||
123 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { | 127 | const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = { |
@@ -125,28 +129,29 @@ const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } | |||
125 | } | 129 | } |
126 | 130 | ||
127 | const jobTypes: JobType[] = [ | 131 | const jobTypes: JobType[] = [ |
132 | 'activitypub-cleaner', | ||
128 | 'activitypub-follow', | 133 | 'activitypub-follow', |
129 | 'activitypub-http-broadcast', | ||
130 | 'activitypub-http-broadcast-parallel', | 134 | 'activitypub-http-broadcast-parallel', |
135 | 'activitypub-http-broadcast', | ||
131 | 'activitypub-http-fetcher', | 136 | 'activitypub-http-fetcher', |
132 | 'activitypub-http-unicast', | 137 | 'activitypub-http-unicast', |
133 | 'activitypub-cleaner', | 138 | 'activitypub-refresher', |
139 | 'actor-keys', | ||
140 | 'after-video-channel-import', | ||
134 | 'email', | 141 | 'email', |
135 | 'video-transcoding', | 142 | 'federate-video', |
143 | 'transcoding-job-builder', | ||
144 | 'manage-video-torrent', | ||
145 | 'move-to-object-storage', | ||
146 | 'notify', | ||
147 | 'video-channel-import', | ||
136 | 'video-file-import', | 148 | 'video-file-import', |
137 | 'video-import', | 149 | 'video-import', |
138 | 'videos-views-stats', | ||
139 | 'activitypub-refresher', | ||
140 | 'video-redundancy', | ||
141 | 'actor-keys', | ||
142 | 'video-live-ending', | 150 | 'video-live-ending', |
143 | 'move-to-object-storage', | 151 | 'video-redundancy', |
144 | 'manage-video-torrent', | ||
145 | 'video-studio-edition', | 152 | 'video-studio-edition', |
146 | 'video-channel-import', | 153 | 'video-transcoding', |
147 | 'after-video-channel-import', | 154 | 'videos-views-stats' |
148 | 'notify', | ||
149 | 'federate-video' | ||
150 | ] | 155 | ] |
151 | 156 | ||
152 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) | 157 | const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ]) |