diff options
author | Chocobozzz <me@florianbigard.com> | 2018-01-25 15:05:18 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-01-25 18:41:17 +0100 |
commit | 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 (patch) | |
tree | 32a9148e0e4567f0c4ffae0412cbed20b84e8873 /server/lib/job-queue/job-queue.ts | |
parent | d765fafc3faf0db9818eb1a07161df1cb1bc0efa (diff) | |
download | PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.gz PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.zst PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.zip |
Move job queue to redis
We'll use it as cache in the future.
/!\ You'll loose your old jobs (pending jobs too) so upgrade only when
you don't have pending job anymore.
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 124 |
1 files changed, 124 insertions, 0 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -0,0 +1,124 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { JobType, JobState } from '../../../shared/models' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | ||
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | ||
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | ||
8 | import { processVideoFile, VideoFilePayload } from './handlers/video-file' | ||
9 | |||
10 | type CreateJobArgument = | ||
11 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | ||
12 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | ||
13 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | ||
14 | { type: 'video-file', payload: VideoFilePayload } | ||
15 | |||
16 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | ||
17 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | ||
18 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
19 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
20 | 'video-file': processVideoFile | ||
21 | } | ||
22 | |||
23 | class JobQueue { | ||
24 | |||
25 | private static instance: JobQueue | ||
26 | |||
27 | private jobQueue: kue.Queue | ||
28 | private initialized = false | ||
29 | |||
30 | private constructor () {} | ||
31 | |||
32 | init () { | ||
33 | // Already initialized | ||
34 | if (this.initialized === true) return | ||
35 | this.initialized = true | ||
36 | |||
37 | this.jobQueue = kue.createQueue({ | ||
38 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | ||
39 | redis: { | ||
40 | host: CONFIG.REDIS.HOSTNAME, | ||
41 | port: CONFIG.REDIS.PORT, | ||
42 | auth: CONFIG.REDIS.AUTH | ||
43 | } | ||
44 | }) | ||
45 | |||
46 | this.jobQueue.on('error', err => { | ||
47 | logger.error('Error in job queue.', err) | ||
48 | process.exit(-1) | ||
49 | }) | ||
50 | this.jobQueue.watchStuckJobs(5000) | ||
51 | |||
52 | for (const handlerName of Object.keys(handlers)) { | ||
53 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | ||
54 | try { | ||
55 | const res = await handlers[ handlerName ](job) | ||
56 | return done(null, res) | ||
57 | } catch (err) { | ||
58 | return done(err) | ||
59 | } | ||
60 | }) | ||
61 | } | ||
62 | } | ||
63 | |||
64 | createJob (obj: CreateJobArgument, priority = 'normal') { | ||
65 | return new Promise((res, rej) => { | ||
66 | this.jobQueue | ||
67 | .create(obj.type, obj.payload) | ||
68 | .priority(priority) | ||
69 | .attempts(JOB_ATTEMPTS[obj.type]) | ||
70 | .backoff({ type: 'exponential' }) | ||
71 | .save(err => { | ||
72 | if (err) return rej(err) | ||
73 | |||
74 | return res() | ||
75 | }) | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | listForApi (state: JobState, start: number, count: number, sort: string) { | ||
80 | return new Promise<kue.Job[]>((res, rej) => { | ||
81 | kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { | ||
82 | if (err) return rej(err) | ||
83 | |||
84 | return res(jobs) | ||
85 | }) | ||
86 | }) | ||
87 | } | ||
88 | |||
89 | count (state: JobState) { | ||
90 | return new Promise<number>((res, rej) => { | ||
91 | this.jobQueue[state + 'Count']((err, total) => { | ||
92 | if (err) return rej(err) | ||
93 | |||
94 | return res(total) | ||
95 | }) | ||
96 | }) | ||
97 | } | ||
98 | |||
99 | removeOldJobs () { | ||
100 | const now = new Date().getTime() | ||
101 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
102 | if (err) { | ||
103 | logger.error('Cannot get jobs when removing old jobs.', err) | ||
104 | return | ||
105 | } | ||
106 | |||
107 | for (const job of jobs) { | ||
108 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
109 | job.remove() | ||
110 | } | ||
111 | } | ||
112 | }) | ||
113 | } | ||
114 | |||
115 | static get Instance () { | ||
116 | return this.instance || (this.instance = new this()) | ||
117 | } | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | export { | ||
123 | JobQueue | ||
124 | } | ||