aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--server/controllers/api/jobs.ts2
-rw-r--r--server/lib/job-queue/job-queue.ts30
-rw-r--r--server/lib/redis.ts12
3 files changed, 35 insertions, 9 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts
index 132d110ad..aa58a9144 100644
--- a/server/controllers/api/jobs.ts
+++ b/server/controllers/api/jobs.ts
@@ -36,7 +36,7 @@ export {
36// --------------------------------------------------------------------------- 36// ---------------------------------------------------------------------------
37 37
38async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { 38async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) {
39 const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc' 39 const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC'
40 40
41 const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) 41 const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort)
42 const total = await JobQueue.Instance.count(req.params.state) 42 const total = await JobQueue.Instance.count(req.params.state)
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index fb3cc8f66..b0ccbd59c 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -2,6 +2,7 @@ import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models' 2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' 4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { Redis } from '../redis'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' 6import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' 7import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' 8import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
@@ -29,6 +30,7 @@ class JobQueue {
29 30
30 private jobQueue: kue.Queue 31 private jobQueue: kue.Queue
31 private initialized = false 32 private initialized = false
33 private jobRedisPrefix: string
32 34
33 private constructor () {} 35 private constructor () {}
34 36
@@ -37,8 +39,10 @@ class JobQueue {
37 if (this.initialized === true) return 39 if (this.initialized === true) return
38 this.initialized = true 40 this.initialized = true
39 41
42 this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST
43
40 this.jobQueue = kue.createQueue({ 44 this.jobQueue = kue.createQueue({
41 prefix: 'q-' + CONFIG.WEBSERVER.HOST, 45 prefix: this.jobRedisPrefix,
42 redis: { 46 redis: {
43 host: CONFIG.REDIS.HOSTNAME, 47 host: CONFIG.REDIS.HOSTNAME,
44 port: CONFIG.REDIS.PORT, 48 port: CONFIG.REDIS.PORT,
@@ -83,14 +87,14 @@ class JobQueue {
83 }) 87 })
84 } 88 }
85 89
86 listForApi (state: JobState, start: number, count: number, sort: string) { 90 async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC') {
87 return new Promise<kue.Job[]>((res, rej) => { 91 const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count)
88 kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => {
89 if (err) return rej(err)
90 92
91 return res(jobs) 93 const jobPromises = jobStrings
92 }) 94 .map(s => s.split('|'))
93 }) 95 .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10)))
96
97 return Promise.all(jobPromises)
94 } 98 }
95 99
96 count (state: JobState) { 100 count (state: JobState) {
@@ -144,6 +148,16 @@ class JobQueue {
144 return Promise.all(promises) 148 return Promise.all(promises)
145 } 149 }
146 150
151 private getJob (id: number) {
152 return new Promise((res, rej) => {
153 kue.Job.get(id, (err, job) => {
154 if (err) return rej(err)
155
156 return res(job)
157 })
158 })
159 }
160
147 static get Instance () { 161 static get Instance () {
148 return this.instance || (this.instance = new this()) 162 return this.instance || (this.instance = new this())
149 } 163 }
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index b284cab8f..2ecff939e 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -54,6 +54,18 @@ class Redis {
54 return this.exists(this.buildViewKey(ip, videoUUID)) 54 return this.exists(this.buildViewKey(ip, videoUUID))
55 } 55 }
56 56
57 listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) {
58 return new Promise<string[]>((res, rej) => {
59 this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => {
60 if (err) return rej(err)
61
62
63
64 return res(values)
65 })
66 })
67 }
68
57 private getValue (key: string) { 69 private getValue (key: string) {
58 return new Promise<string>((res, rej) => { 70 return new Promise<string>((res, rej) => {
59 this.client.get(this.prefix + key, (err, value) => { 71 this.client.get(this.prefix + key, (err, value) => {