diff options
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file-import.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-import.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-transcoding.ts | 17 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 34 |
5 files changed, 30 insertions, 27 deletions
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index d3bde6e6a..a28f3596f 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -42,7 +42,7 @@ async function buildSignedRequestOptions (payload: Payload) { | |||
42 | 42 | ||
43 | function buildGlobalHeaders (body: any) { | 43 | function buildGlobalHeaders (body: any) { |
44 | return { | 44 | return { |
45 | 'Digest': buildDigest(body) | 45 | Digest: buildDigest(body) |
46 | } | 46 | } |
47 | } | 47 | } |
48 | 48 | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 99c991e72..be9e7d181 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -11,7 +11,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
11 | import { getVideoFilePath } from '@server/lib/video-paths' | 11 | import { getVideoFilePath } from '@server/lib/video-paths' |
12 | 12 | ||
13 | export type VideoFileImportPayload = { | 13 | export type VideoFileImportPayload = { |
14 | videoUUID: string, | 14 | videoUUID: string |
15 | filePath: string | 15 | filePath: string |
16 | } | 16 | } |
17 | 17 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 1fca17584..09f225cec 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -221,7 +221,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
221 | isNewVideo: true | 221 | isNewVideo: true |
222 | } | 222 | } |
223 | 223 | ||
224 | await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 224 | await JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }) |
225 | } | 225 | } |
226 | 226 | ||
227 | } catch (err) { | 227 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 39b9fac98..c020057c9 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -6,7 +6,6 @@ import { JobQueue } from '../job-queue' | |||
6 | import { federateVideoIfNeeded } from '../../activitypub' | 6 | import { federateVideoIfNeeded } from '../../activitypub' |
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
8 | import { sequelizeTypescript } from '../../../initializers' | 8 | import { sequelizeTypescript } from '../../../initializers' |
9 | import * as Bluebird from 'bluebird' | ||
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 9 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' | 10 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | 11 | import { Notifier } from '../../notifier' |
@@ -40,8 +39,11 @@ interface OptimizeTranscodingPayload extends BaseTranscodingPayload { | |||
40 | type: 'optimize' | 39 | type: 'optimize' |
41 | } | 40 | } |
42 | 41 | ||
43 | export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload | 42 | export type VideoTranscodingPayload = |
44 | | OptimizeTranscodingPayload | MergeAudioTranscodingPayload | 43 | HLSTranscodingPayload |
44 | | NewResolutionTranscodingPayload | ||
45 | | OptimizeTranscodingPayload | ||
46 | | MergeAudioTranscodingPayload | ||
45 | 47 | ||
46 | async function processVideoTranscoding (job: Bull.Job) { | 48 | async function processVideoTranscoding (job: Bull.Job) { |
47 | const payload = job.data as VideoTranscodingPayload | 49 | const payload = job.data as VideoTranscodingPayload |
@@ -105,7 +107,7 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
105 | 107 | ||
106 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | 108 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
107 | // Maybe the video changed in database, refresh it | 109 | // Maybe the video changed in database, refresh it |
108 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) | 110 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
109 | // Video does not exist anymore | 111 | // Video does not exist anymore |
110 | if (!videoDatabase) return undefined | 112 | if (!videoDatabase) return undefined |
111 | 113 | ||
@@ -122,8 +124,6 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
122 | await createHlsJobIfEnabled(hlsPayload) | 124 | await createHlsJobIfEnabled(hlsPayload) |
123 | 125 | ||
124 | if (resolutionsEnabled.length !== 0) { | 126 | if (resolutionsEnabled.length !== 0) { |
125 | const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] | ||
126 | |||
127 | for (const resolution of resolutionsEnabled) { | 127 | for (const resolution of resolutionsEnabled) { |
128 | let dataInput: VideoTranscodingPayload | 128 | let dataInput: VideoTranscodingPayload |
129 | 129 | ||
@@ -143,12 +143,9 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
143 | } | 143 | } |
144 | } | 144 | } |
145 | 145 | ||
146 | const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 146 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) |
147 | tasks.push(p) | ||
148 | } | 147 | } |
149 | 148 | ||
150 | await Promise.all(tasks) | ||
151 | |||
152 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 149 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
153 | } else { | 150 | } else { |
154 | // No transcoding to do, it's now published | 151 | // No transcoding to do, it's now published |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 61f07c487..14acace7d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -28,7 +28,7 @@ type CreateJobArgument = | |||
28 | { type: 'videos-views', payload: {} } | | 28 | { type: 'videos-views', payload: {} } | |
29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | 29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
30 | 30 | ||
31 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 31 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -60,13 +60,14 @@ class JobQueue { | |||
60 | 60 | ||
61 | private static instance: JobQueue | 61 | private static instance: JobQueue |
62 | 62 | ||
63 | private queues: { [ id in JobType ]?: Bull.Queue } = {} | 63 | private queues: { [id in JobType]?: Bull.Queue } = {} |
64 | private initialized = false | 64 | private initialized = false |
65 | private jobRedisPrefix: string | 65 | private jobRedisPrefix: string |
66 | 66 | ||
67 | private constructor () {} | 67 | private constructor () { |
68 | } | ||
68 | 69 | ||
69 | async init () { | 70 | init () { |
70 | // Already initialized | 71 | // Already initialized |
71 | if (this.initialized === true) return | 72 | if (this.initialized === true) return |
72 | this.initialized = true | 73 | this.initialized = true |
@@ -108,11 +109,16 @@ class JobQueue { | |||
108 | } | 109 | } |
109 | } | 110 | } |
110 | 111 | ||
111 | createJob (obj: CreateJobArgument) { | 112 | createJob (obj: CreateJobArgument): void { |
113 | this.createJobWithPromise(obj) | ||
114 | .catch(err => logger.error('Cannot create job.', { err, obj })) | ||
115 | } | ||
116 | |||
117 | createJobWithPromise (obj: CreateJobArgument) { | ||
112 | const queue = this.queues[obj.type] | 118 | const queue = this.queues[obj.type] |
113 | if (queue === undefined) { | 119 | if (queue === undefined) { |
114 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 120 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
115 | throw Error('Unknown queue, cannot create job') | 121 | return |
116 | } | 122 | } |
117 | 123 | ||
118 | const jobArgs: Bull.JobOptions = { | 124 | const jobArgs: Bull.JobOptions = { |
@@ -125,10 +131,10 @@ class JobQueue { | |||
125 | } | 131 | } |
126 | 132 | ||
127 | async listForApi (options: { | 133 | async listForApi (options: { |
128 | state: JobState, | 134 | state: JobState |
129 | start: number, | 135 | start: number |
130 | count: number, | 136 | count: number |
131 | asc?: boolean, | 137 | asc?: boolean |
132 | jobType: JobType | 138 | jobType: JobType |
133 | }): Promise<Bull.Job[]> { | 139 | }): Promise<Bull.Job[]> { |
134 | const { state, start, count, asc, jobType } = options | 140 | const { state, start, count, asc, jobType } = options |
@@ -137,7 +143,7 @@ class JobQueue { | |||
137 | const filteredJobTypes = this.filterJobTypes(jobType) | 143 | const filteredJobTypes = this.filterJobTypes(jobType) |
138 | 144 | ||
139 | for (const jobType of filteredJobTypes) { | 145 | for (const jobType of filteredJobTypes) { |
140 | const queue = this.queues[ jobType ] | 146 | const queue = this.queues[jobType] |
141 | if (queue === undefined) { | 147 | if (queue === undefined) { |
142 | logger.error('Unknown queue %s to list jobs.', jobType) | 148 | logger.error('Unknown queue %s to list jobs.', jobType) |
143 | continue | 149 | continue |
@@ -165,7 +171,7 @@ class JobQueue { | |||
165 | const filteredJobTypes = this.filterJobTypes(jobType) | 171 | const filteredJobTypes = this.filterJobTypes(jobType) |
166 | 172 | ||
167 | for (const type of filteredJobTypes) { | 173 | for (const type of filteredJobTypes) { |
168 | const queue = this.queues[ type ] | 174 | const queue = this.queues[type] |
169 | if (queue === undefined) { | 175 | if (queue === undefined) { |
170 | logger.error('Unknown queue %s to count jobs.', type) | 176 | logger.error('Unknown queue %s to count jobs.', type) |
171 | continue | 177 | continue |
@@ -173,7 +179,7 @@ class JobQueue { | |||
173 | 179 | ||
174 | const counts = await queue.getJobCounts() | 180 | const counts = await queue.getJobCounts() |
175 | 181 | ||
176 | total += counts[ state ] | 182 | total += counts[state] |
177 | } | 183 | } |
178 | 184 | ||
179 | return total | 185 | return total |
@@ -189,7 +195,7 @@ class JobQueue { | |||
189 | private addRepeatableJobs () { | 195 | private addRepeatableJobs () { |
190 | this.queues['videos-views'].add({}, { | 196 | this.queues['videos-views'].add({}, { |
191 | repeat: REPEAT_JOBS['videos-views'] | 197 | repeat: REPEAT_JOBS['videos-views'] |
192 | }) | 198 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
193 | } | 199 | } |
194 | 200 | ||
195 | private filterJobTypes (jobType?: JobType) { | 201 | private filterJobTypes (jobType?: JobType) { |