aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue/job-queue.ts
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r--server/lib/job-queue/job-queue.ts30
1 files changed, 22 insertions, 8 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index fb3cc8f66..b0ccbd59c 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,6 +2,7 @@ import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models' 2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { Redis } from '../redis'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
@@ -29,6 +30,7 @@ class JobQueue {
29 30
30 private jobQueue: kue.Queue 31 private jobQueue: kue.Queue
31 private initialized = false 32 private initialized = false
33 private jobRedisPrefix: string
32 34
33 private constructor () {} 35 private constructor () {}
34 36
@@ -37,8 +39,10 @@ class JobQueue {
37 if (this.initialized === true) return 39 if (this.initialized === true) return
38 this.initialized = true 40 this.initialized = true
39 41
42 this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST
43
40 this.jobQueue = kue.createQueue({ 44 this.jobQueue = kue.createQueue({
41 prefix: 'q-' + CONFIG.WEBSERVER.HOST, 45 prefix: this.jobRedisPrefix,
42 redis: { 46 redis: {
43 host: CONFIG.REDIS.HOSTNAME, 47 host: CONFIG.REDIS.HOSTNAME,
44 port: CONFIG.REDIS.PORT, 48 port: CONFIG.REDIS.PORT,
@@ -83,14 +87,14 @@ class JobQueue {
83 }) 87 })
84 } 88 }
85 89
86 listForApi (state: JobState, start: number, count: number, sort: string) { 90 async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC') {
87 return new Promise<kue.Job[]>((res, rej) => { 91 const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)
88 kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => {
89 if (err) return rej(err)
90 92
91 return res(jobs) 93 const jobPromises = jobStrings
92 }) 94 .map(s => s.split('|'))
93 }) 95 .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))
96
97 return Promise.all(jobPromises)
94 } 98 }
95 99
96 count (state: JobState) { 100 count (state: JobState) {
@@ -144,6 +148,16 @@ class JobQueue {
144 return Promise.all(promises) 148 return Promise.all(promises)
145 } 149 }
146 150
151 private getJob (id: number) {
152 return new Promise((res, rej) => {
153 kue.Job.get(id, (err, job) => {
154 if (err) return rej(err)
155
156 return res(job)
157 })
158 })
159 }
160
147 static get Instance () { 161 static get Instance () {
148 return this.instance || (this.instance = new this()) 162 return this.instance || (this.instance = new this())
149 } 163 }