From 4e284e97b95d1fe93378b90061272e3abc875078 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 5 May 2017 17:24:16 +0200 Subject: Server: finish old jobs at startup --- server/lib/jobs/job-scheduler.js | 55 +++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 18 deletions(-) (limited to 'server/lib/jobs') diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js index c59bf9262..7b239577f 100644 --- a/server/lib/jobs/job-scheduler.js +++ b/server/lib/jobs/job-scheduler.js @@ -15,31 +15,40 @@ const jobScheduler = { } function activate () { + const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE + logger.info('Jobs scheduler activated.') const jobsQueue = queue(processJob) - forever( - function (next) { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - } + // Finish processing jobs from a previous start + const state = constants.JOB_STATES.PROCESSING + db.Job.listWithLimit(limit, state, function (err, jobs) { + enqueueJobs(err, jobsQueue, jobs) - db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) + forever( + function (next) { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) } - // Optimization: we could use "drain" from queue object - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - }) - } - ) + const state = constants.JOB_STATES.PENDING + db.Job.listWithLimit(limit, state, function (err, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + }) + } + ) + }) } // --------------------------------------------------------------------------- @@ -48,6 +57,16 @@ module.exports = jobScheduler // --------------------------------------------------------------------------- +function enqueueJobs (err, jobsQueue, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } +} + function createJob (transaction, handlerName, handlerInputData, callback) { const createQuery = { state: constants.JOB_STATES.PENDING, -- cgit v1.2.3