diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 49 |
1 files changed, 28 insertions, 21 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ec601e9ea..14acace7d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -13,6 +13,7 @@ import { processVideoImport, VideoImportPayload } from './handlers/video-import' | |||
13 | import { processVideosViews } from './handlers/video-views' | 13 | import { processVideosViews } from './handlers/video-views' |
14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' | 14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' |
15 | import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' | 15 | import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' |
16 | import { processVideoRedundancy, VideoRedundancyPayload } from '@server/lib/job-queue/handlers/video-redundancy' | ||
16 | 17 | ||
17 | type CreateJobArgument = | 18 | type CreateJobArgument = |
18 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 19 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -24,20 +25,21 @@ type CreateJobArgument = | |||
24 | { type: 'email', payload: EmailPayload } | | 25 | { type: 'email', payload: EmailPayload } | |
25 | { type: 'video-import', payload: VideoImportPayload } | | 26 | { type: 'video-import', payload: VideoImportPayload } | |
26 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 27 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
27 | { type: 'videos-views', payload: {} } | 28 | { type: 'videos-views', payload: {} } | |
29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | ||
28 | 30 | ||
29 | const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise<any>} = { | 31 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
30 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
31 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
32 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
33 | 'activitypub-follow': processActivityPubFollow, | 35 | 'activitypub-follow': processActivityPubFollow, |
34 | 'video-file-import': processVideoFileImport, | 36 | 'video-file-import': processVideoFileImport, |
35 | 'video-transcoding': processVideoTranscoding, | 37 | 'video-transcoding': processVideoTranscoding, |
36 | 'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3) | ||
37 | 'email': processEmail, | 38 | 'email': processEmail, |
38 | 'video-import': processVideoImport, | 39 | 'video-import': processVideoImport, |
39 | 'videos-views': processVideosViews, | 40 | 'videos-views': processVideosViews, |
40 | 'activitypub-refresher': refreshAPObject | 41 | 'activitypub-refresher': refreshAPObject, |
42 | 'video-redundancy': processVideoRedundancy | ||
41 | } | 43 | } |
42 | 44 | ||
43 | const jobTypes: JobType[] = [ | 45 | const jobTypes: JobType[] = [ |
@@ -50,20 +52,22 @@ const jobTypes: JobType[] = [ | |||
50 | 'video-file-import', | 52 | 'video-file-import', |
51 | 'video-import', | 53 | 'video-import', |
52 | 'videos-views', | 54 | 'videos-views', |
53 | 'activitypub-refresher' | 55 | 'activitypub-refresher', |
56 | 'video-redundancy' | ||
54 | ] | 57 | ] |
55 | 58 | ||
56 | class JobQueue { | 59 | class JobQueue { |
57 | 60 | ||
58 | private static instance: JobQueue | 61 | private static instance: JobQueue |
59 | 62 | ||
60 | private queues: { [ id in JobType ]?: Bull.Queue } = {} | 63 | private queues: { [id in JobType]?: Bull.Queue } = {} |
61 | private initialized = false | 64 | private initialized = false |
62 | private jobRedisPrefix: string | 65 | private jobRedisPrefix: string |
63 | 66 | ||
64 | private constructor () {} | 67 | private constructor () { |
68 | } | ||
65 | 69 | ||
66 | async init () { | 70 | init () { |
67 | // Already initialized | 71 | // Already initialized |
68 | if (this.initialized === true) return | 72 | if (this.initialized === true) return |
69 | this.initialized = true | 73 | this.initialized = true |
@@ -105,11 +109,16 @@ class JobQueue { | |||
105 | } | 109 | } |
106 | } | 110 | } |
107 | 111 | ||
108 | createJob (obj: CreateJobArgument) { | 112 | createJob (obj: CreateJobArgument): void { |
113 | this.createJobWithPromise(obj) | ||
114 | .catch(err => logger.error('Cannot create job.', { err, obj })) | ||
115 | } | ||
116 | |||
117 | createJobWithPromise (obj: CreateJobArgument) { | ||
109 | const queue = this.queues[obj.type] | 118 | const queue = this.queues[obj.type] |
110 | if (queue === undefined) { | 119 | if (queue === undefined) { |
111 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 120 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
112 | throw Error('Unknown queue, cannot create job') | 121 | return |
113 | } | 122 | } |
114 | 123 | ||
115 | const jobArgs: Bull.JobOptions = { | 124 | const jobArgs: Bull.JobOptions = { |
@@ -122,10 +131,10 @@ class JobQueue { | |||
122 | } | 131 | } |
123 | 132 | ||
124 | async listForApi (options: { | 133 | async listForApi (options: { |
125 | state: JobState, | 134 | state: JobState |
126 | start: number, | 135 | start: number |
127 | count: number, | 136 | count: number |
128 | asc?: boolean, | 137 | asc?: boolean |
129 | jobType: JobType | 138 | jobType: JobType |
130 | }): Promise<Bull.Job[]> { | 139 | }): Promise<Bull.Job[]> { |
131 | const { state, start, count, asc, jobType } = options | 140 | const { state, start, count, asc, jobType } = options |
@@ -133,16 +142,14 @@ class JobQueue { | |||
133 | 142 | ||
134 | const filteredJobTypes = this.filterJobTypes(jobType) | 143 | const filteredJobTypes = this.filterJobTypes(jobType) |
135 | 144 | ||
136 | // TODO: optimize | ||
137 | for (const jobType of filteredJobTypes) { | 145 | for (const jobType of filteredJobTypes) { |
138 | const queue = this.queues[ jobType ] | 146 | const queue = this.queues[jobType] |
139 | if (queue === undefined) { | 147 | if (queue === undefined) { |
140 | logger.error('Unknown queue %s to list jobs.', jobType) | 148 | logger.error('Unknown queue %s to list jobs.', jobType) |
141 | continue | 149 | continue |
142 | } | 150 | } |
143 | 151 | ||
144 | // FIXME: Bull queue typings does not have getJobs method | 152 | const jobs = await queue.getJobs([ state ], 0, start + count, asc) |
145 | const jobs = await (queue as any).getJobs(state, 0, start + count, asc) | ||
146 | results = results.concat(jobs) | 153 | results = results.concat(jobs) |
147 | } | 154 | } |
148 | 155 | ||
@@ -164,7 +171,7 @@ class JobQueue { | |||
164 | const filteredJobTypes = this.filterJobTypes(jobType) | 171 | const filteredJobTypes = this.filterJobTypes(jobType) |
165 | 172 | ||
166 | for (const type of filteredJobTypes) { | 173 | for (const type of filteredJobTypes) { |
167 | const queue = this.queues[ type ] | 174 | const queue = this.queues[type] |
168 | if (queue === undefined) { | 175 | if (queue === undefined) { |
169 | logger.error('Unknown queue %s to count jobs.', type) | 176 | logger.error('Unknown queue %s to count jobs.', type) |
170 | continue | 177 | continue |
@@ -172,7 +179,7 @@ class JobQueue { | |||
172 | 179 | ||
173 | const counts = await queue.getJobCounts() | 180 | const counts = await queue.getJobCounts() |
174 | 181 | ||
175 | total += counts[ state ] | 182 | total += counts[state] |
176 | } | 183 | } |
177 | 184 | ||
178 | return total | 185 | return total |
@@ -188,7 +195,7 @@ class JobQueue { | |||
188 | private addRepeatableJobs () { | 195 | private addRepeatableJobs () { |
189 | this.queues['videos-views'].add({}, { | 196 | this.queues['videos-views'].add({}, { |
190 | repeat: REPEAT_JOBS['videos-views'] | 197 | repeat: REPEAT_JOBS['videos-views'] |
191 | }) | 198 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
192 | } | 199 | } |
193 | 200 | ||
194 | private filterJobTypes (jobType?: JobType) { | 201 | private filterJobTypes (jobType?: JobType) { |