diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 34 |
1 files changed, 20 insertions, 14 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 61f07c487..14acace7d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -28,7 +28,7 @@ type CreateJobArgument = | |||
28 | { type: 'videos-views', payload: {} } | | 28 | { type: 'videos-views', payload: {} } | |
29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | 29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
30 | 30 | ||
31 | const handlers: { [ id in JobType ]: (job: Bull.Job) => Promise<any>} = { | 31 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
@@ -60,13 +60,14 @@ class JobQueue { | |||
60 | 60 | ||
61 | private static instance: JobQueue | 61 | private static instance: JobQueue |
62 | 62 | ||
63 | private queues: { [ id in JobType ]?: Bull.Queue } = {} | 63 | private queues: { [id in JobType]?: Bull.Queue } = {} |
64 | private initialized = false | 64 | private initialized = false |
65 | private jobRedisPrefix: string | 65 | private jobRedisPrefix: string |
66 | 66 | ||
67 | private constructor () {} | 67 | private constructor () { |
68 | } | ||
68 | 69 | ||
69 | async init () { | 70 | init () { |
70 | // Already initialized | 71 | // Already initialized |
71 | if (this.initialized === true) return | 72 | if (this.initialized === true) return |
72 | this.initialized = true | 73 | this.initialized = true |
@@ -108,11 +109,16 @@ class JobQueue { | |||
108 | } | 109 | } |
109 | } | 110 | } |
110 | 111 | ||
111 | createJob (obj: CreateJobArgument) { | 112 | createJob (obj: CreateJobArgument): void { |
113 | this.createJobWithPromise(obj) | ||
114 | .catch(err => logger.error('Cannot create job.', { err, obj })) | ||
115 | } | ||
116 | |||
117 | createJobWithPromise (obj: CreateJobArgument) { | ||
112 | const queue = this.queues[obj.type] | 118 | const queue = this.queues[obj.type] |
113 | if (queue === undefined) { | 119 | if (queue === undefined) { |
114 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 120 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
115 | throw Error('Unknown queue, cannot create job') | 121 | return |
116 | } | 122 | } |
117 | 123 | ||
118 | const jobArgs: Bull.JobOptions = { | 124 | const jobArgs: Bull.JobOptions = { |
@@ -125,10 +131,10 @@ class JobQueue { | |||
125 | } | 131 | } |
126 | 132 | ||
127 | async listForApi (options: { | 133 | async listForApi (options: { |
128 | state: JobState, | 134 | state: JobState |
129 | start: number, | 135 | start: number |
130 | count: number, | 136 | count: number |
131 | asc?: boolean, | 137 | asc?: boolean |
132 | jobType: JobType | 138 | jobType: JobType |
133 | }): Promise<Bull.Job[]> { | 139 | }): Promise<Bull.Job[]> { |
134 | const { state, start, count, asc, jobType } = options | 140 | const { state, start, count, asc, jobType } = options |
@@ -137,7 +143,7 @@ class JobQueue { | |||
137 | const filteredJobTypes = this.filterJobTypes(jobType) | 143 | const filteredJobTypes = this.filterJobTypes(jobType) |
138 | 144 | ||
139 | for (const jobType of filteredJobTypes) { | 145 | for (const jobType of filteredJobTypes) { |
140 | const queue = this.queues[ jobType ] | 146 | const queue = this.queues[jobType] |
141 | if (queue === undefined) { | 147 | if (queue === undefined) { |
142 | logger.error('Unknown queue %s to list jobs.', jobType) | 148 | logger.error('Unknown queue %s to list jobs.', jobType) |
143 | continue | 149 | continue |
@@ -165,7 +171,7 @@ class JobQueue { | |||
165 | const filteredJobTypes = this.filterJobTypes(jobType) | 171 | const filteredJobTypes = this.filterJobTypes(jobType) |
166 | 172 | ||
167 | for (const type of filteredJobTypes) { | 173 | for (const type of filteredJobTypes) { |
168 | const queue = this.queues[ type ] | 174 | const queue = this.queues[type] |
169 | if (queue === undefined) { | 175 | if (queue === undefined) { |
170 | logger.error('Unknown queue %s to count jobs.', type) | 176 | logger.error('Unknown queue %s to count jobs.', type) |
171 | continue | 177 | continue |
@@ -173,7 +179,7 @@ class JobQueue { | |||
173 | 179 | ||
174 | const counts = await queue.getJobCounts() | 180 | const counts = await queue.getJobCounts() |
175 | 181 | ||
176 | total += counts[ state ] | 182 | total += counts[state] |
177 | } | 183 | } |
178 | 184 | ||
179 | return total | 185 | return total |
@@ -189,7 +195,7 @@ class JobQueue { | |||
189 | private addRepeatableJobs () { | 195 | private addRepeatableJobs () { |
190 | this.queues['videos-views'].add({}, { | 196 | this.queues['videos-views'].add({}, { |
191 | repeat: REPEAT_JOBS['videos-views'] | 197 | repeat: REPEAT_JOBS['videos-views'] |
192 | }) | 198 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
193 | } | 199 | } |
194 | 200 | ||
195 | private filterJobTypes (jobType?: JobType) { | 201 | private filterJobTypes (jobType?: JobType) { |