diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index fb3cc8f66..b0ccbd59c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,6 +2,7 @@ import * as kue from 'kue' | |||
2 | import { JobType, JobState } from '../../../shared/models' | 2 | import { JobType, JobState } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | 4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' |
5 | import { Redis } from '../redis' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
@@ -29,6 +30,7 @@ class JobQueue { | |||
29 | 30 | ||
30 | private jobQueue: kue.Queue | 31 | private jobQueue: kue.Queue |
31 | private initialized = false | 32 | private initialized = false |
33 | private jobRedisPrefix: string | ||
32 | 34 | ||
33 | private constructor () {} | 35 | private constructor () {} |
34 | 36 | ||
@@ -37,8 +39,10 @@ class JobQueue { | |||
37 | if (this.initialized === true) return | 39 | if (this.initialized === true) return |
38 | this.initialized = true | 40 | this.initialized = true |
39 | 41 | ||
42 | this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST | ||
43 | |||
40 | this.jobQueue = kue.createQueue({ | 44 | this.jobQueue = kue.createQueue({ |
41 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | 45 | prefix: this.jobRedisPrefix, |
42 | redis: { | 46 | redis: { |
43 | host: CONFIG.REDIS.HOSTNAME, | 47 | host: CONFIG.REDIS.HOSTNAME, |
44 | port: CONFIG.REDIS.PORT, | 48 | port: CONFIG.REDIS.PORT, |
@@ -83,14 +87,14 @@ class JobQueue { | |||
83 | }) | 87 | }) |
84 | } | 88 | } |
85 | 89 | ||
86 | listForApi (state: JobState, start: number, count: number, sort: string) { | 90 | async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC') { |
87 | return new Promise<kue.Job[]>((res, rej) => { | 91 | const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) |
88 | kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => { | ||
89 | if (err) return rej(err) | ||
90 | 92 | ||
91 | return res(jobs) | 93 | const jobPromises = jobStrings |
92 | }) | 94 | .map(s => s.split('|')) |
93 | }) | 95 | .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) |
96 | |||
97 | return Promise.all(jobPromises) | ||
94 | } | 98 | } |
95 | 99 | ||
96 | count (state: JobState) { | 100 | count (state: JobState) { |
@@ -144,6 +148,16 @@ class JobQueue { | |||
144 | return Promise.all(promises) | 148 | return Promise.all(promises) |
145 | } | 149 | } |
146 | 150 | ||
151 | private getJob (id: number) { | ||
152 | return new Promise((res, rej) => { | ||
153 | kue.Job.get(id, (err, job) => { | ||
154 | if (err) return rej(err) | ||
155 | |||
156 | return res(job) | ||
157 | }) | ||
158 | }) | ||
159 | } | ||
160 | |||
147 | static get Instance () { | 161 | static get Instance () { |
148 | return this.instance || (this.instance = new this()) | 162 | return this.instance || (this.instance = new this()) |
149 | } | 163 | } |