X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjobs%2Fjob-scheduler.js;h=7b239577f8935d4e16b73d78695aaecbd4f53a10;hb=d5f345ed4cfac4e1fa84dcb4fce1cda4d32f9c73;hp=c59bf9262c399a2bcbaa8fb30178c370f4f64885;hpb=62326afb151a1062253ac8b08bb62ce3f01e1267;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js index c59bf9262..7b239577f 100644 --- a/server/lib/jobs/job-scheduler.js +++ b/server/lib/jobs/job-scheduler.js @@ -15,31 +15,40 @@ const jobScheduler = { } function activate () { + const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE + logger.info('Jobs scheduler activated.') const jobsQueue = queue(processJob) - forever( - function (next) { - if (jobsQueue.length() !== 0) { - // Finish processing the queue first - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - } + // Finish processing jobs from a previous start + const state = constants.JOB_STATES.PROCESSING + db.Job.listWithLimit(limit, state, function (err, jobs) { + enqueueJobs(err, jobsQueue, jobs) - db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { - if (err) { - logger.error('Cannot list pending jobs.', { error: err }) - } else { - jobs.forEach(function (job) { - jobsQueue.push(job) - }) + forever( + function (next) { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) } - // Optimization: we could use "drain" from queue object - return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) - }) - } - ) + const state = constants.JOB_STATES.PENDING + db.Job.listWithLimit(limit, state, function (err, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + }) + } + ) + }) } // --------------------------------------------------------------------------- @@ -48,6 +57,16 @@ module.exports = jobScheduler // --------------------------------------------------------------------------- +function enqueueJobs (err, jobsQueue, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } +} + function createJob (transaction, handlerName, handlerInputData, callback) { const createQuery = { state: constants.JOB_STATES.PENDING,