]>
Commit | Line | Data |
---|---|---|
bcd1c9e1 | 1 | import { AsyncQueue, forever, queue } from 'async' |
69818c93 | 2 | import * as Sequelize from 'sequelize' |
65fcc311 | 3 | |
e02643f3 | 4 | import { database as db } from '../../initializers/database' |
65fcc311 C |
5 | import { |
6 | JOBS_FETCHING_INTERVAL, | |
7 | JOBS_FETCH_LIMIT_PER_CYCLE, | |
8 | JOB_STATES | |
9 | } from '../../initializers' | |
10 | import { logger } from '../../helpers' | |
69818c93 C |
11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | |
13 | ||
14 | type JobQueueCallback = (err: Error) => void | |
65fcc311 C |
15 | |
/**
 * Polls the database for jobs and runs them through an in-memory `async` queue.
 *
 * Only one scheduler exists per process: the constructor is private and the
 * instance is created lazily by the static `Instance` accessor.
 */
class JobScheduler {

  private static instance: JobScheduler

  private constructor () { }

  /** Returns the shared scheduler, creating it on first access. */
  static get Instance () {
    return this.instance || (this.instance = new this())
  }

  /**
   * Starts the scheduler.
   *
   * First enqueues jobs found in the PROCESSING state (jobs left over from a
   * previous start), then loops forever: every JOBS_FETCHING_INTERVAL
   * milliseconds, and only once the queue has no waiting jobs, it fetches up to
   * JOBS_FETCH_LIMIT_PER_CYCLE PENDING jobs and enqueues them.
   *
   * Listing failures are logged and do not stop the loop.
   */
  async activate () {
    const limit = JOBS_FETCH_LIMIT_PER_CYCLE

    logger.info('Jobs scheduler activated.')

    // No concurrency argument is given, so the queue uses the library default.
    const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))

    // Finish processing jobs from a previous start
    const processingState = JOB_STATES.PROCESSING
    try {
      const jobs = await db.Job.listWithLimit(limit, processingState)

      this.enqueueJobs(jobsQueue, jobs)
    } catch (err) {
      logger.error('Cannot list pending jobs.', err)
    }

    forever(
      async next => {
        if (jobsQueue.length() !== 0) {
          // Finish processing the queue first
          return setTimeout(next, JOBS_FETCHING_INTERVAL)
        }

        const pendingState = JOB_STATES.PENDING
        try {
          const jobs = await db.Job.listWithLimit(limit, pendingState)

          this.enqueueJobs(jobsQueue, jobs)
        } catch (err) {
          logger.error('Cannot list pending jobs.', err)
        }

        // Optimization: we could use "drain" from queue object
        return setTimeout(next, JOBS_FETCHING_INTERVAL)
      },

      err => logger.error('Error in job scheduler queue.', err)
    )
  }

  /**
   * Inserts a new job row in the PENDING state.
   *
   * @param transaction Sequelize transaction the insert runs in.
   * @param handlerName Name used to look up the handler in `jobHandlers` when the job is processed.
   * @param handlerInputData Payload stored with the job and passed to the handler's `process`.
   * @returns Whatever `db.Job.create` returns.
   */
  createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
    const createQuery = {
      state: JOB_STATES.PENDING,
      handlerName,
      handlerInputData
    }
    const options = { transaction }

    return db.Job.create(createQuery, options)
  }

  /** Pushes every job of `jobs` onto `jobsQueue`, in order. */
  private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
    jobs.forEach(job => jobsQueue.push(job))
  }

  /**
   * Queue worker: runs a single job and reports completion through `callback`.
   *
   * `callback` is invoked exactly once on every path so the queue can move on
   * to the next job:
   *  - with `null` when the handler is unknown (the job is only logged),
   *  - with the save error when the job cannot be marked as PROCESSING
   *    (the handler is not run in that case),
   *  - with `null` after the handler ran and the success/error hook finished.
   */
  private async processJob (job: JobInstance, callback: (err: Error) => void) {
    const jobHandler = jobHandlers[job.handlerName]
    if (jobHandler === undefined) {
      logger.error('Unknown job handler for job %s.', job.handlerName)
      return callback(null)
    }

    logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

    job.state = JOB_STATES.PROCESSING
    try {
      await job.save()
    } catch (err) {
      // Without this guard a failed save would reject the worker without ever
      // calling the queue callback, leaving the queue stuck on this job.
      this.cannotSaveJobError(err)
      return callback(err)
    }

    try {
      const result = await jobHandler.process(job.handlerInputData, job.id)
      await this.onJobSuccess(jobHandler, job, result)
    } catch (err) {
      logger.error('Error in job handler %s.', job.handlerName, err)

      try {
        await this.onJobError(jobHandler, job, err)
      } catch (innerErr) {
        this.cannotSaveJobError(innerErr)
        return callback(innerErr)
      }
    }

    callback(null)
  }

  /**
   * Marks the job as ERROR, persists it, then runs the handler's `onError` hook.
   * Failures of either step are logged and not rethrown.
   */
  private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
    job.state = JOB_STATES.ERROR

    try {
      await job.save()
      await jobHandler.onError(err, job.id)
    } catch (err) {
      this.cannotSaveJobError(err)
    }
  }

  /**
   * Marks the job as SUCCESS, persists it, then runs the handler's `onSuccess` hook.
   * Failures of either step are logged and not rethrown.
   */
  private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
    job.state = JOB_STATES.SUCCESS

    try {
      await job.save()
      // Awaited (like onError) so a rejected hook is caught here instead of
      // becoming an unhandled rejection, and the job only completes once the
      // hook has finished.
      await jobHandler.onSuccess(job.id, jobResult)
    } catch (err) {
      this.cannotSaveJobError(err)
    }
  }

  /** Logs a failure raised while persisting a job state or running a completion hook. */
  private cannotSaveJobError (err: Error) {
    logger.error('Cannot save new job state.', err)
  }
}
137 | ||
138 | // --------------------------------------------------------------------------- | |
139 | ||
140 | export { | |
141 | JobScheduler | |
142 | } |