]>
Commit | Line | Data |
---|---|---|
227d02fe C |
1 | 'use strict' |
2 | ||
3 | const forever = require('async/forever') | |
4 | const queue = require('async/queue') | |
5 | ||
6 | const constants = require('../../initializers/constants') | |
7 | const db = require('../../initializers/database') | |
8 | const logger = require('../../helpers/logger') | |
9 | ||
10 | const jobHandlers = require('./handlers') | |
11 | ||
// Public API of this module: scheduler start-up and job creation.
const jobScheduler = { activate, createJob }
16 | ||
/**
 * Start the jobs scheduler.
 *
 * Builds a queue whose worker is processJob, first re-enqueues the jobs listed
 * in the PROCESSING state, then polls the PENDING jobs in an endless loop.
 * Each cycle fetches at most constants.JOBS_FETCH_LIMIT_PER_CYCLE jobs and the
 * next cycle is scheduled constants.JOBS_FETCHING_INTERVAL later.
 */
function activate () {
  const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE

  logger.info('Jobs scheduler activated.')

  const jobsQueue = queue(processJob)

  // Arm the timer that triggers the next polling cycle
  const scheduleNextCycle = next => setTimeout(next, constants.JOBS_FETCHING_INTERVAL)

  // One polling cycle: only fetch new pending jobs once the queue is empty
  const fetchPendingJobs = next => {
    // Finish processing the queue first
    if (jobsQueue.length() !== 0) return scheduleNextCycle(next)

    db.Job.listWithLimit(limit, constants.JOB_STATES.PENDING, (err, jobs) => {
      enqueueJobs(err, jobsQueue, jobs)

      // Optimization: we could use "drain" from queue object
      return scheduleNextCycle(next)
    })
  }

  // Finish processing jobs from a previous start
  db.Job.listWithLimit(limit, constants.JOB_STATES.PROCESSING, (err, jobs) => {
    enqueueJobs(err, jobsQueue, jobs)

    forever(fetchPendingJobs)
  })
}
53 | ||
54 | // --------------------------------------------------------------------------- | |
55 | ||
// Expose the scheduler object (activate, createJob) to the rest of the application
module.exports = jobScheduler
57 | ||
58 | // --------------------------------------------------------------------------- | |
59 | ||
4e284e97 C |
/**
 * Push every fetched job into the processing queue.
 *
 * When the listing failed, the error is logged and nothing is enqueued.
 *
 * @param {*} err - error reported by the job listing, falsy on success
 * @param {Object} jobsQueue - queue object exposing push()
 * @param {Array} jobs - jobs returned by the listing
 */
function enqueueJobs (err, jobsQueue, jobs) {
  if (err) {
    logger.error('Cannot list pending jobs.', { error: err })
    return
  }

  jobs.forEach(job => { jobsQueue.push(job) })
}
69 | ||
227d02fe C |
/**
 * Persist a new job in the PENDING state.
 *
 * @param {Object} transaction - passed to db.Job.create as the `transaction` option
 * @param {string} handlerName - stored on the job; processJob uses it to look up the handler
 * @param {*} handlerInputData - stored on the job and later given to the handler's process()
 * @param {Function} callback - handed to asCallback() on the value returned by db.Job.create
 */
function createJob (transaction, handlerName, handlerInputData, callback) {
  const creation = db.Job.create(
    { state: constants.JOB_STATES.PENDING, handlerName, handlerInputData },
    { transaction }
  )

  creation.asCallback(callback)
}
80 | ||
/**
 * Queue worker: run a single job through its handler.
 *
 * The job is first saved in the PROCESSING state. Its handler (looked up in
 * jobHandlers by job.handlerName) is then called with job.handlerInputData and
 * the outcome is dispatched to onJobError or onJobSuccess.
 *
 * @param {Object} job - job instance (id, handlerName, handlerInputData, state, save())
 * @param {Function} callback - queue callback; receives the save error when the
 *   state cannot be persisted, and is called without argument when the handler
 *   is unknown
 */
function processJob (job, callback) {
  const jobHandler = jobHandlers[job.handlerName]

  logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

  job.state = constants.JOB_STATES.PROCESSING
  job.save().asCallback(function (saveErr) {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    if (jobHandler === undefined) {
      // The name must be read from the job: jobHandler is undefined in this
      // branch, so dereferencing it would throw and the queue callback would
      // never be called
      logger.error('Unknown job handler for job %s.', job.handlerName)
      return callback()
    }

    return jobHandler.process(job.handlerInputData, function (handlerErr, result) {
      if (handlerErr) {
        logger.error('Error in job handler %s.', job.handlerName, { error: handlerErr })
        return onJobError(jobHandler, job, result, callback)
      }

      return onJobSuccess(jobHandler, job, result, callback)
    })
  })
}
105 | ||
/**
 * Save the job in the ERROR state, then notify its handler.
 *
 * @param {Object} jobHandler - handler exposing onError()
 * @param {Object} job - job instance to update
 * @param {*} jobResult - result value given by the handler's process() callback
 * @param {Function} callback - receives the save error if the state cannot be
 *   persisted, otherwise it is handed over to jobHandler.onError
 */
function onJobError (jobHandler, job, jobResult, callback) {
  job.state = constants.JOB_STATES.ERROR

  job.save().asCallback(saveErr => {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    // NOTE(review): saveErr is falsy here, so the handler never receives the
    // error that made the job fail - confirm this is what onError expects
    return jobHandler.onError(saveErr, job.id, jobResult, callback)
  })
}
115 | ||
/**
 * Save the job in the SUCCESS state, then notify its handler.
 *
 * @param {Object} jobHandler - handler exposing onSuccess()
 * @param {Object} job - job instance to update
 * @param {*} jobResult - result value given by the handler's process() callback
 * @param {Function} callback - receives the save error if the state cannot be
 *   persisted, otherwise it is handed over to jobHandler.onSuccess
 */
function onJobSuccess (jobHandler, job, jobResult, callback) {
  job.state = constants.JOB_STATES.SUCCESS

  job.save().asCallback(saveErr => {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    // saveErr is falsy at this point; it is forwarded as the first argument
    return jobHandler.onSuccess(saveErr, job.id, jobResult, callback)
  })
}
125 | ||
/**
 * Log a failure to persist a job state and forward the error.
 *
 * @param {*} err - error reported while saving the job
 * @param {Function} callback - called with err
 * @returns {*} whatever callback returns
 */
function cannotSaveJobError (err, callback) {
  const meta = { error: err }
  logger.error('Cannot save new job state.', meta)

  return callback(err)
}