From 4e284e97b95d1fe93378b90061272e3abc875078 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Fri, 5 May 2017 17:24:16 +0200 Subject: Server: finish old jobs at startup --- server/lib/jobs/job-scheduler.js | 55 +++++++++++++++++++++++++++------------- server/models/job.js | 4 +-- 2 files changed, 39 insertions(+), 20 deletions(-) (limited to 'server') diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js index c59bf9262..7b239577f 100644 --- a/server/lib/jobs/job-scheduler.js +++ b/server/lib/jobs/job-scheduler.js @@ -15,31 +15,40 @@ const jobScheduler = { } function activate () { + const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE + logger.info('Jobs scheduler activated.') const jobsQueue = queue(processJob) - forever( - function (next) { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - } + // Finish processing jobs from a previous start + const state = constants.JOB_STATES.PROCESSING + db.Job.listWithLimit(limit, state, function (err, jobs) { + enqueueJobs(err, jobsQueue, jobs) - db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) + forever( + function (next) { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) } - // Optimization: we could use "drain" from queue object - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - }) - } - ) + const state = constants.JOB_STATES.PENDING + db.Job.listWithLimit(limit, state, function (err, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + }) + } + ) + }) } // --------------------------------------------------------------------------- @@ -48,6 +57,16 @@ module.exports = jobScheduler // --------------------------------------------------------------------------- +function enqueueJobs (err, jobsQueue, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } +} + function createJob (transaction, handlerName, handlerInputData, callback) { const createQuery = { state: constants.JOB_STATES.PENDING, diff --git a/server/models/job.js b/server/models/job.js index eeb50e16d..949f88d44 100644 --- a/server/models/job.js +++ b/server/models/job.js @@ -39,14 +39,14 @@ module.exports = function (sequelize, DataTypes) { // --------------------------------------------------------------------------- -function listWithLimit (limit, callback) { +function listWithLimit (limit, state, callback) { const query = { order: [ [ 'id', 'ASC' ] ], limit: limit, where: { - state: constants.JOB_STATES.PENDING + state } } -- cgit v1.2.3