diff options
Diffstat (limited to 'server/lib/jobs/job-scheduler.js')
-rw-r--r-- | server/lib/jobs/job-scheduler.js | 55 |
1 files changed, 37 insertions, 18 deletions
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js index c59bf9262..7b239577f 100644 --- a/server/lib/jobs/job-scheduler.js +++ b/server/lib/jobs/job-scheduler.js | |||
@@ -15,31 +15,40 @@ const jobScheduler = { | |||
15 | } | 15 | } |
16 | 16 | ||
17 | function activate () { | 17 | function activate () { |
18 | const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE | ||
19 | |||
18 | logger.info('Jobs scheduler activated.') | 20 | logger.info('Jobs scheduler activated.') |
19 | 21 | ||
20 | const jobsQueue = queue(processJob) | 22 | const jobsQueue = queue(processJob) |
21 | 23 | ||
22 | forever( | 24 | // Finish processing jobs from a previous start |
23 | function (next) { | 25 | const state = constants.JOB_STATES.PROCESSING |
24 | if (jobsQueue.length() !== 0) { | 26 | db.Job.listWithLimit(limit, state, function (err, jobs) { |
25 | // Finish processing the queue first | 27 | enqueueJobs(err, jobsQueue, jobs) |
26 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
27 | } | ||
28 | 28 | ||
29 | db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { | 29 | forever( |
30 | if (err) { | 30 | function (next) { |
31 | logger.error('Cannot list pending jobs.', { error: err }) | 31 | if (jobsQueue.length() !== 0) { |
32 | } else { | 32 | // Finish processing the queue first |
33 | jobs.forEach(function (job) { | 33 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) |
34 | jobsQueue.push(job) | ||
35 | }) | ||
36 | } | 34 | } |
37 | 35 | ||
38 | // Optimization: we could use "drain" from queue object | 36 | const state = constants.JOB_STATES.PENDING |
39 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | 37 | db.Job.listWithLimit(limit, state, function (err, jobs) { |
40 | }) | 38 | if (err) { |
41 | } | 39 | logger.error('Cannot list pending jobs.', { error: err }) |
42 | ) | 40 | } else { |
41 | jobs.forEach(function (job) { | ||
42 | jobsQueue.push(job) | ||
43 | }) | ||
44 | } | ||
45 | |||
46 | // Optimization: we could use "drain" from queue object | ||
47 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
48 | }) | ||
49 | } | ||
50 | ) | ||
51 | }) | ||
43 | } | 52 | } |
44 | 53 | ||
45 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
@@ -48,6 +57,16 @@ module.exports = jobScheduler | |||
48 | 57 | ||
49 | // --------------------------------------------------------------------------- | 58 | // --------------------------------------------------------------------------- |
50 | 59 | ||
60 | function enqueueJobs (err, jobsQueue, jobs) { | ||
61 | if (err) { | ||
62 | logger.error('Cannot list pending jobs.', { error: err }) | ||
63 | } else { | ||
64 | jobs.forEach(function (job) { | ||
65 | jobsQueue.push(job) | ||
66 | }) | ||
67 | } | ||
68 | } | ||
69 | |||
51 | function createJob (transaction, handlerName, handlerInputData, callback) { | 70 | function createJob (transaction, handlerName, handlerInputData, callback) { |
52 | const createQuery = { | 71 | const createQuery = { |
53 | state: constants.JOB_STATES.PENDING, | 72 | state: constants.JOB_STATES.PENDING, |