3 const forever
= require('async/forever')
4 const queue
= require('async/queue')
6 const constants
= require('../../initializers/constants')
7 const db
= require('../../initializers/database')
8 const logger
= require('../../helpers/logger')
10 const jobHandlers
= require('./handlers')
// Object that this module exports (it is assigned to module.exports further down).
12 const jobScheduler
= {
// Starts the jobs scheduler: logs the activation, builds a queue whose worker
// is processJob, lists jobs through db.Job.listWithLimit and re-arms itself
// with setTimeout(next, constants.JOBS_FETCHING_INTERVAL).
// Takes no arguments.
// NOTE(review): `next` is presumably the iteration callback supplied by
// `forever` (required at the top of the file) — confirm.
17 function activate () {
18 logger
.info('Jobs scheduler activated.')
// Every job pushed onto this queue is handed to processJob.
20 const jobsQueue
= queue(processJob
)
// A non-zero length means the queue still holds jobs: skip this fetch and
// retry after the interval.
24 if (jobsQueue
.length() !== 0) {
25 // Finish processing the queue first
26 return setTimeout(next
, constants
.JOBS_FETCHING_INTERVAL
)
// List jobs, passing constants.JOBS_FETCH_LIMIT_PER_CYCLE as the limit argument.
29 db
.Job
.listWithLimit(constants
.JOBS_FETCH_LIMIT_PER_CYCLE
, function (err
, jobs
) {
// NOTE(review): presumably only reached when `err` is set — confirm the guard.
31 logger
.error('Cannot list pending jobs.', { error: err
})
// Visit every listed job.
// NOTE(review): presumably each one is pushed onto jobsQueue — confirm.
33 jobs
.forEach(function (job
) {
38 // Optimization: we could use "drain" from queue object
// Schedule the next cycle after the same interval.
39 return setTimeout(next
, constants
.JOBS_FETCHING_INTERVAL
)
45 // ---------------------------------------------------------------------------
// The jobScheduler object is the only thing this module exports.
47 module
.exports
= jobScheduler
49 // ---------------------------------------------------------------------------
// Creates a job through db.Job.create and reports the outcome to `callback`.
// - transaction: forwarded to db.Job.create as `options.transaction`
// - handlerName, handlerInputData: NOTE(review): presumably stored on the
//   created job as fields of `createQuery` — confirm
// - callback: handed to .asCallback() on the value returned by db.Job.create
51 function createJob (transaction
, handlerName
, handlerInputData
, callback
) {
// The state is set to PENDING.
// NOTE(review): presumably this is a field of `createQuery` — confirm.
53 state: constants
.JOB_STATES
.PENDING
,
57 const options
= { transaction
}
59 db
.Job
.create(createQuery
, options
).asCallback(callback
)
// Queue worker: runs one job through the handler registered under its name.
//
// The job is first flagged as PROCESSING and saved. If the save fails,
// cannotSaveJobError(err, callback) takes over. Otherwise job.handlerInputData
// is given to the handler's process() function and the outcome is routed to
// onJobError or onJobSuccess, both of which receive `callback`.
//
// - job: object exposing id, handlerName, handlerInputData, state and save()
// - callback: completion function for this job, forwarded to the helper that
//   ends up handling the outcome
function processJob (job, callback) {
  const jobHandler = jobHandlers[job.handlerName]

  logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

  job.state = constants.JOB_STATES.PROCESSING
  job.save().asCallback(function (err) {
    if (err) return cannotSaveJobError(err, callback)

    if (jobHandler === undefined) {
      // The name has to come from the job: jobHandler is undefined in this
      // branch, so reading a property from it would throw instead of logging.
      logger.error('Unknown job handler for job %s.', job.handlerName)
      // Nothing can process this job; finish it so the caller is not left waiting.
      return callback()
    }

    return jobHandler.process(job.handlerInputData, function (err, result) {
      if (err) {
        logger.error('Error in job handler %s.', job.handlerName, { error: err })
        return onJobError(jobHandler, job, result, callback)
      }

      return onJobSuccess(jobHandler, job, result, callback)
    })
  })
}
// Flags `job` as ERROR, saves it, then notifies the handler through
// jobHandler.onError(err, job.id, jobResult, callback).
// If saving fails, cannotSaveJobError(err, callback) is returned instead and
// onError is not reached.
// - jobHandler: handler object exposing onError
// - job: object exposing id, state and save()
// - jobResult: forwarded untouched to onError
// - callback: forwarded to onError or to cannotSaveJobError
87 function onJobError (jobHandler
, job
, jobResult
, callback
) {
88 job
.state
= constants
.JOB_STATES
.ERROR
90 job
.save().asCallback(function (err
) {
91 if (err
) return cannotSaveJobError(err
, callback
)
// NOTE(review): `err` here is the save() error, which is falsy once the early
// return above has been passed; the error that caused the job to fail is not
// a parameter of this function and is therefore not forwarded — confirm intended.
93 return jobHandler
.onError(err
, job
.id
, jobResult
, callback
)
// Flags `job` as SUCCESS, saves it, then notifies the handler through
// jobHandler.onSuccess(err, job.id, jobResult, callback).
// If saving fails, cannotSaveJobError(err, callback) is returned instead and
// onSuccess is not reached.
// - jobHandler: handler object exposing onSuccess
// - job: object exposing id, state and save()
// - jobResult: forwarded untouched to onSuccess
// - callback: forwarded to onSuccess or to cannotSaveJobError
97 function onJobSuccess (jobHandler
, job
, jobResult
, callback
) {
98 job
.state
= constants
.JOB_STATES
.SUCCESS
100 job
.save().asCallback(function (err
) {
101 if (err
) return cannotSaveJobError(err
, callback
)
// `err` is falsy here (the early return above handled the failing case), so
// onSuccess receives a falsy first argument.
103 return jobHandler
.onSuccess(err
, job
.id
, jobResult
, callback
)
// Shared failure path used when job.save() fails: logs the error.
// - err: the error reported by job.save()
// - callback: completion function handed over by the caller
// NOTE(review): callers return this function's result and pass `callback`
// along; presumably `callback` is invoked with `err` after logging — confirm.
107 function cannotSaveJobError (err
, callback
) {
108 logger
.error('Cannot save new job state.', { error: err
})