3 const forever
= require('async/forever')
4 const queue
= require('async/queue')
6 const constants
= require('../../initializers/constants')
7 const db
= require('../../initializers/database')
8 const logger
= require('../../helpers/logger')
10 const jobHandlers
= require('./handlers')
// Object exported as this module's interface (assigned to `module.exports` further down).
// NOTE(review): its members are not visible in this excerpt — confirm against the full file.
12 const jobScheduler
= {
/**
 * Starts the jobs scheduler: builds an async queue whose worker is
 * `processJob`, lists jobs that are in the PROCESSING state and passes them to
 * `enqueueJobs`, then lists PENDING jobs and re-schedules itself with
 * `setTimeout(next, constants.JOBS_FETCHING_INTERVAL)`.
 *
 * NOTE(review): several lines are missing from this excerpt (whatever supplies
 * `next`, the error guard, the body of the `forEach` and the closing braces),
 * so the exact control flow must be confirmed against the full file.
 */
17 function activate () {
// Upper bound handed to every `listWithLimit` call below.
18 const limit
= constants
.JOBS_FETCH_LIMIT_PER_CYCLE
20 logger
.info('Jobs scheduler activated.')
// Each task placed on this queue is handled by `processJob`.
22 const jobsQueue
= queue(processJob
)
24 // Finish processing jobs from a previous start
25 const state
= constants
.JOB_STATES
.PROCESSING
26 db
.Job
.listWithLimit(limit
, state
, function (err
, jobs
) {
27 enqueueJobs(err
, jobsQueue
, jobs
)
// While the queue still reports a non-zero length, skip fetching and try again after the interval.
31 if (jobsQueue
.length() !== 0) {
32 // Finish processing the queue first
33 return setTimeout(next
, constants
.JOBS_FETCHING_INTERVAL
)
// This `state` shadows the outer one: from here on only PENDING jobs are listed.
36 const state
= constants
.JOB_STATES
.PENDING
37 db
.Job
.listWithLimit(limit
, state
, function (err
, jobs
) {
// NOTE(review): presumably guarded by an `if (err)` that is not visible here — confirm.
39 logger
.error('Cannot list pending jobs.', { error: err
})
// NOTE(review): the body of this loop is not visible; presumably each job is added to `jobsQueue` — confirm.
41 jobs
.forEach(function (job
) {
46 // Optimization: we could use "drain" from queue object
47 return setTimeout(next
, constants
.JOBS_FETCHING_INTERVAL
)
54 // ---------------------------------------------------------------------------
// The scheduler object is the only value this module exports.
56 module
.exports
= jobScheduler
58 // ---------------------------------------------------------------------------
/**
 * Handles the outcome of a job listing: logs 'Cannot list pending jobs.' with
 * the error, and iterates over the listed jobs.
 *
 * @param err - error reported by the listing (logged under the `error` key)
 * @param jobsQueue - queue passed in by `activate`
 * @param jobs - listed jobs, iterated with `forEach`
 *
 * NOTE(review): the guard around the log call and the body of the loop are not
 * visible in this excerpt; presumably the log only happens when `err` is set
 * and each job is otherwise added to `jobsQueue` — confirm against the full file.
 */
60 function enqueueJobs (err
, jobsQueue
, jobs
) {
62 logger
.error('Cannot list pending jobs.', { error: err
})
64 jobs
.forEach(function (job
) {
/**
 * Creates a job record through `db.Job.create`, with its state set to
 * PENDING, inside the given transaction.
 *
 * @param transaction - forwarded to `db.Job.create` as `options.transaction`
 * @param handlerName - NOTE(review): not referenced in the visible lines;
 *   presumably stored on the created record — confirm
 * @param handlerInputData - NOTE(review): same remark as `handlerName`
 * @param callback - node-style callback attached with `.asCallback`
 *
 * NOTE(review): the declaration of `createQuery` is not visible in this
 * excerpt; the `state:` line below appears to be one of its properties.
 */
70 function createJob (transaction
, handlerName
, handlerInputData
, callback
) {
72 state: constants
.JOB_STATES
.PENDING
,
76 const options
= { transaction
}
78 db
.Job
.create(createQuery
, options
).asCallback(callback
)
/**
 * Queue worker: marks the job as PROCESSING, persists that state, then runs
 * the handler registered under `job.handlerName`.
 *
 * On handler failure the job goes through `onJobError`, otherwise through
 * `onJobSuccess`. A failure to save the job state is routed to
 * `cannotSaveJobError`.
 *
 * @param {Object} job - job record; `id`, `handlerName` and `handlerInputData`
 *   are read, `state` is written and `save()` is called
 * @param {Function} callback - completion callback of the queue task
 */
function processJob (job, callback) {
  const jobHandler = jobHandlers[job.handlerName]

  logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

  job.state = constants.JOB_STATES.PROCESSING
  job.save().asCallback(function (saveErr) {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    if (jobHandler === undefined) {
      // Take the name from the job: `jobHandler` is undefined in this branch,
      // so reading a property from it would throw instead of logging.
      logger.error('Unknown job handler for job %s.', job.handlerName)
      // NOTE(review): the statement ending this branch was not visible in the
      // reviewed excerpt; completing the task without an error is assumed.
      return callback()
    }

    return jobHandler.process(job.handlerInputData, function (processErr, result) {
      if (processErr) {
        logger.error('Error in job handler %s.', job.handlerName, { error: processErr })
        // Hand the failure over so the handler's onError hook can see it.
        return onJobError(jobHandler, job, result, callback, processErr)
      }

      return onJobSuccess(jobHandler, job, result, callback)
    })
  })
}

/**
 * Marks a job as failed, persists that state and then calls the handler's
 * `onError` hook.
 *
 * @param {Object} jobHandler - handler exposing `onError(err, jobId, jobResult, callback)`
 * @param {Object} job - job record; `state` is written, `save()` is called, `id` is read
 * @param {*} jobResult - result reported by the handler's `process` step
 * @param {Function} callback - completion callback of the queue task
 * @param {*} [jobError] - error that made the job fail; when omitted, the hook
 *   receives the (falsy) error value of the save callback, as it did before
 *   this parameter existed
 */
function onJobError (jobHandler, job, jobResult, callback, jobError) {
  job.state = constants.JOB_STATES.ERROR

  job.save().asCallback(function (saveErr) {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    // Past the guard above `saveErr` is always falsy, so forwarding it would
    // hide the real failure from the hook; prefer the job's own error.
    const reportedError = jobError === undefined ? saveErr : jobError

    return jobHandler.onError(reportedError, job.id, jobResult, callback)
  })
}
/**
 * Marks a job as successfully processed, persists that state and then calls
 * the handler's `onSuccess` hook.
 *
 * @param {Object} jobHandler - handler exposing `onSuccess(err, jobId, jobResult, callback)`
 * @param {Object} job - job record; `state` is written, `save()` is called, `id` is read
 * @param {*} jobResult - result reported by the handler's `process` step
 * @param {Function} callback - completion callback of the queue task
 */
function onJobSuccess (jobHandler, job, jobResult, callback) {
  const successState = constants.JOB_STATES.SUCCESS
  job.state = successState

  const pendingSave = job.save()
  pendingSave.asCallback(function afterSave (saveErr) {
    if (!saveErr) {
      // The falsy error value of the save is still forwarded as the hook's first argument.
      return jobHandler.onSuccess(saveErr, job.id, jobResult, callback)
    }

    // The new state could not be persisted: report that instead of calling the hook.
    return cannotSaveJobError(saveErr, callback)
  })
}
126 function cannotSaveJobError (err
, callback
) {
127 logger
.error('Cannot save new job state.', { error: err
})