diff options
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts new file mode 100644 index 000000000..7b8c6faf9 --- /dev/null +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -0,0 +1,137 @@ | |||
1 | import { forever, queue } from 'async' | ||
2 | |||
3 | const db = require('../../initializers/database') | ||
4 | import { | ||
5 | JOBS_FETCHING_INTERVAL, | ||
6 | JOBS_FETCH_LIMIT_PER_CYCLE, | ||
7 | JOB_STATES | ||
8 | } from '../../initializers' | ||
9 | import { logger } from '../../helpers' | ||
10 | import { jobHandlers } from './handlers' | ||
11 | |||
12 | class JobScheduler { | ||
13 | |||
14 | private static instance: JobScheduler | ||
15 | |||
16 | private constructor () { } | ||
17 | |||
18 | static get Instance () { | ||
19 | return this.instance || (this.instance = new this()) | ||
20 | } | ||
21 | |||
22 | activate () { | ||
23 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | ||
24 | |||
25 | logger.info('Jobs scheduler activated.') | ||
26 | |||
27 | const jobsQueue = queue(this.processJob) | ||
28 | |||
29 | // Finish processing jobs from a previous start | ||
30 | const state = JOB_STATES.PROCESSING | ||
31 | db.Job.listWithLimit(limit, state, (err, jobs) => { | ||
32 | this.enqueueJobs(err, jobsQueue, jobs) | ||
33 | |||
34 | forever( | ||
35 | next => { | ||
36 | if (jobsQueue.length() !== 0) { | ||
37 | // Finish processing the queue first | ||
38 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
39 | } | ||
40 | |||
41 | const state = JOB_STATES.PENDING | ||
42 | db.Job.listWithLimit(limit, state, (err, jobs) => { | ||
43 | if (err) { | ||
44 | logger.error('Cannot list pending jobs.', { error: err }) | ||
45 | } else { | ||
46 | jobs.forEach(job => { | ||
47 | jobsQueue.push(job) | ||
48 | }) | ||
49 | } | ||
50 | |||
51 | // Optimization: we could use "drain" from queue object | ||
52 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
53 | }) | ||
54 | }, | ||
55 | |||
56 | err => { logger.error('Error in job scheduler queue.', { error: err }) } | ||
57 | ) | ||
58 | }) | ||
59 | } | ||
60 | |||
61 | createJob (transaction, handlerName, handlerInputData, callback) { | ||
62 | const createQuery = { | ||
63 | state: JOB_STATES.PENDING, | ||
64 | handlerName, | ||
65 | handlerInputData | ||
66 | } | ||
67 | const options = { transaction } | ||
68 | |||
69 | db.Job.create(createQuery, options).asCallback(callback) | ||
70 | } | ||
71 | |||
72 | private enqueueJobs (err, jobsQueue, jobs) { | ||
73 | if (err) { | ||
74 | logger.error('Cannot list pending jobs.', { error: err }) | ||
75 | } else { | ||
76 | jobs.forEach(job => { | ||
77 | jobsQueue.push(job) | ||
78 | }) | ||
79 | } | ||
80 | } | ||
81 | |||
82 | private processJob (job, callback) { | ||
83 | const jobHandler = jobHandlers[job.handlerName] | ||
84 | |||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
86 | |||
87 | job.state = JOB_STATES.PROCESSING | ||
88 | job.save().asCallback(err => { | ||
89 | if (err) return this.cannotSaveJobError(err, callback) | ||
90 | |||
91 | if (jobHandler === undefined) { | ||
92 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | ||
93 | return callback() | ||
94 | } | ||
95 | |||
96 | return jobHandler.process(job.handlerInputData, (err, result) => { | ||
97 | if (err) { | ||
98 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | ||
99 | return this.onJobError(jobHandler, job, result, callback) | ||
100 | } | ||
101 | |||
102 | return this.onJobSuccess(jobHandler, job, result, callback) | ||
103 | }) | ||
104 | }) | ||
105 | } | ||
106 | |||
107 | private onJobError (jobHandler, job, jobResult, callback) { | ||
108 | job.state = JOB_STATES.ERROR | ||
109 | |||
110 | job.save().asCallback(err => { | ||
111 | if (err) return this.cannotSaveJobError(err, callback) | ||
112 | |||
113 | return jobHandler.onError(err, job.id, jobResult, callback) | ||
114 | }) | ||
115 | } | ||
116 | |||
117 | private onJobSuccess (jobHandler, job, jobResult, callback) { | ||
118 | job.state = JOB_STATES.SUCCESS | ||
119 | |||
120 | job.save().asCallback(err => { | ||
121 | if (err) return this.cannotSaveJobError(err, callback) | ||
122 | |||
123 | return jobHandler.onSuccess(err, job.id, jobResult, callback) | ||
124 | }) | ||
125 | } | ||
126 | |||
127 | private cannotSaveJobError (err, callback) { | ||
128 | logger.error('Cannot save new job state.', { error: err }) | ||
129 | return callback(err) | ||
130 | } | ||
131 | } | ||
132 | |||
133 | // --------------------------------------------------------------------------- | ||
134 | |||
135 | export { | ||
136 | JobScheduler | ||
137 | } | ||