diff options
-rw-r--r-- | server/controllers/api/jobs.ts | 2 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 30 | ||||
-rw-r--r-- | server/lib/redis.ts | 12 |
3 files changed, 35 insertions, 9 deletions
diff --git a/server/controllers/api/jobs.ts b/server/controllers/api/jobs.ts index 132d110ad..aa58a9144 100644 --- a/server/controllers/api/jobs.ts +++ b/server/controllers/api/jobs.ts | |||
@@ -36,7 +36,7 @@ export { | |||
36 | // --------------------------------------------------------------------------- | 36 | // --------------------------------------------------------------------------- |
37 | 37 | ||
38 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { | 38 | async function listJobs (req: express.Request, res: express.Response, next: express.NextFunction) { |
39 | const sort = req.query.sort === 'createdAt' ? 'asc' : 'desc' | 39 | const sort = req.query.sort === 'createdAt' ? 'ASC' : 'DESC' |
40 | 40 | ||
41 | const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) | 41 | const jobs = await JobQueue.Instance.listForApi(req.params.state, req.query.start, req.query.count, sort) |
42 | const total = await JobQueue.Instance.count(req.params.state) | 42 | const total = await JobQueue.Instance.count(req.params.state) |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index fb3cc8f66..b0ccbd59c 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -2,6 +2,7 @@ import * as kue from 'kue' | |||
2 | import { JobType, JobState } from '../../../shared/models' | 2 | import { JobType, JobState } from '../../../shared/models' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | 4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' |
5 | import { Redis } from '../redis' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | 6 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' |
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | 7 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' |
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | 8 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' |
@@ -29,6 +30,7 @@ class JobQueue { | |||
29 | 30 | ||
30 | private jobQueue: kue.Queue | 31 | private jobQueue: kue.Queue |
31 | private initialized = false | 32 | private initialized = false |
33 | private jobRedisPrefix: string | ||
32 | 34 | ||
33 | private constructor () {} | 35 | private constructor () {} |
34 | 36 | ||
@@ -37,8 +39,10 @@ class JobQueue { | |||
37 | if (this.initialized === true) return | 39 | if (this.initialized === true) return |
38 | this.initialized = true | 40 | this.initialized = true |
39 | 41 | ||
42 | this.jobRedisPrefix = 'q-' + CONFIG.WEBSERVER.HOST | ||
43 | |||
40 | this.jobQueue = kue.createQueue({ | 44 | this.jobQueue = kue.createQueue({ |
41 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | 45 | prefix: this.jobRedisPrefix, |
42 | redis: { | 46 | redis: { |
43 | host: CONFIG.REDIS.HOSTNAME, | 47 | host: CONFIG.REDIS.HOSTNAME, |
44 | port: CONFIG.REDIS.PORT, | 48 | port: CONFIG.REDIS.PORT, |
@@ -83,14 +87,14 @@ class JobQueue { | |||
83 | }) | 87 | }) |
84 | } | 88 | } |
85 | 89 | ||
86 | listForApi (state: JobState, start: number, count: number, sort: string) { | 90 | async listForApi (state: JobState, start: number, count: number, sort: 'ASC' | 'DESC') { |
87 | return new Promise<kue.Job[]>((res, rej) => { | 91 | const jobStrings = await Redis.Instance.listJobs(this.jobRedisPrefix, state, 'alpha', sort, start, count) |
88 | kue.Job.rangeByState(state, start, start + count - 1, sort, (err, jobs) => { | ||
89 | if (err) return rej(err) | ||
90 | 92 | ||
91 | return res(jobs) | 93 | const jobPromises = jobStrings |
92 | }) | 94 | .map(s => s.split('|')) |
93 | }) | 95 | .map(([ , jobId ]) => this.getJob(parseInt(jobId, 10))) |
96 | |||
97 | return Promise.all(jobPromises) | ||
94 | } | 98 | } |
95 | 99 | ||
96 | count (state: JobState) { | 100 | count (state: JobState) { |
@@ -144,6 +148,16 @@ class JobQueue { | |||
144 | return Promise.all(promises) | 148 | return Promise.all(promises) |
145 | } | 149 | } |
146 | 150 | ||
151 | private getJob (id: number) { | ||
152 | return new Promise((res, rej) => { | ||
153 | kue.Job.get(id, (err, job) => { | ||
154 | if (err) return rej(err) | ||
155 | |||
156 | return res(job) | ||
157 | }) | ||
158 | }) | ||
159 | } | ||
160 | |||
147 | static get Instance () { | 161 | static get Instance () { |
148 | return this.instance || (this.instance = new this()) | 162 | return this.instance || (this.instance = new this()) |
149 | } | 163 | } |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index b284cab8f..2ecff939e 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -54,6 +54,18 @@ class Redis { | |||
54 | return this.exists(this.buildViewKey(ip, videoUUID)) | 54 | return this.exists(this.buildViewKey(ip, videoUUID)) |
55 | } | 55 | } |
56 | 56 | ||
57 | listJobs (jobsPrefix: string, state: string, mode: 'alpha', order: 'ASC' | 'DESC', offset: number, count: number) { | ||
58 | return new Promise<string[]>((res, rej) => { | ||
59 | this.client.sort(jobsPrefix + ':jobs:' + state, 'by', mode, order, 'LIMIT', offset.toString(), count.toString(), (err, values) => { | ||
60 | if (err) return rej(err) | ||
61 | |||
62 | |||
63 | |||
64 | return res(values) | ||
65 | }) | ||
66 | }) | ||
67 | } | ||
68 | |||
57 | private getValue (key: string) { | 69 | private getValue (key: string) { |
58 | return new Promise<string>((res, rej) => { | 70 | return new Promise<string>((res, rej) => { |
59 | this.client.get(this.prefix + key, (err, value) => { | 71 | this.client.get(this.prefix + key, (err, value) => { |