aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.js
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-05-05 17:24:16 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-05-05 17:24:16 +0200
commit4e284e97b95d1fe93378b90061272e3abc875078 (patch)
treec5ad3b70046725dc7491b0d0a228bd1f76e4cab4 /server/lib/jobs/job-scheduler.js
parente5b885390557dd61d64ca3e0db5f33dd602518f2 (diff)
downloadPeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.gz
PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.zst
PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.zip
Server: finish old jobs at startup
Diffstat (limited to 'server/lib/jobs/job-scheduler.js')
-rw-r--r--server/lib/jobs/job-scheduler.js55
1 files changed, 37 insertions, 18 deletions
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
index c59bf9262..7b239577f 100644
--- a/server/lib/jobs/job-scheduler.js
+++ b/server/lib/jobs/job-scheduler.js
@@ -15,31 +15,40 @@ const jobScheduler = {
15} 15}
16 16
17function activate () { 17function activate () {
18 const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
19
18 logger.info('Jobs scheduler activated.') 20 logger.info('Jobs scheduler activated.')
19 21
20 const jobsQueue = queue(processJob) 22 const jobsQueue = queue(processJob)
21 23
22 forever( 24 // Finish processing jobs from a previous start
23 function (next) { 25 const state = constants.JOB_STATES.PROCESSING
24 if (jobsQueue.length() !== 0) { 26 db.Job.listWithLimit(limit, state, function (err, jobs) {
25 // Finish processing the queue first 27 enqueueJobs(err, jobsQueue, jobs)
26 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
27 }
28 28
29 db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { 29 forever(
30 if (err) { 30 function (next) {
31 logger.error('Cannot list pending jobs.', { error: err }) 31 if (jobsQueue.length() !== 0) {
32 } else { 32 // Finish processing the queue first
33 jobs.forEach(function (job) { 33 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
34 jobsQueue.push(job)
35 })
36 } 34 }
37 35
38 // Optimization: we could use "drain" from queue object 36 const state = constants.JOB_STATES.PENDING
39 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) 37 db.Job.listWithLimit(limit, state, function (err, jobs) {
40 }) 38 if (err) {
41 } 39 logger.error('Cannot list pending jobs.', { error: err })
42 ) 40 } else {
41 jobs.forEach(function (job) {
42 jobsQueue.push(job)
43 })
44 }
45
46 // Optimization: we could use "drain" from queue object
47 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
48 })
49 }
50 )
51 })
43} 52}
44 53
45// --------------------------------------------------------------------------- 54// ---------------------------------------------------------------------------
@@ -48,6 +57,16 @@ module.exports = jobScheduler
48 57
49// --------------------------------------------------------------------------- 58// ---------------------------------------------------------------------------
50 59
60function enqueueJobs (err, jobsQueue, jobs) {
61 if (err) {
62 logger.error('Cannot list pending jobs.', { error: err })
63 } else {
64 jobs.forEach(function (job) {
65 jobsQueue.push(job)
66 })
67 }
68}
69
51function createJob (transaction, handlerName, handlerInputData, callback) { 70function createJob (transaction, handlerName, handlerInputData, callback) {
52 const createQuery = { 71 const createQuery = {
53 state: constants.JOB_STATES.PENDING, 72 state: constants.JOB_STATES.PENDING,