aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.js
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs/job-scheduler.js')
-rw-r--r--server/lib/jobs/job-scheduler.js110
1 files changed, 110 insertions, 0 deletions
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
new file mode 100644
index 000000000..589a30630
--- /dev/null
+++ b/server/lib/jobs/job-scheduler.js
@@ -0,0 +1,110 @@
1'use strict'
2
3const forever = require('async/forever')
4const queue = require('async/queue')
5
6const constants = require('../../initializers/constants')
7const db = require('../../initializers/database')
8const logger = require('../../helpers/logger')
9
10const jobHandlers = require('./handlers')
11
12const jobScheduler = {
13 activate,
14 createJob
15}
16
17function activate () {
18 logger.info('Jobs scheduler activated.')
19
20 const jobsQueue = queue(processJob)
21
22 forever(
23 function (next) {
24 if (jobsQueue.length() !== 0) {
25 // Finish processing the queue first
26 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
27 }
28
29 db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
30 if (err) {
31 logger.error('Cannot list pending jobs.', { error: err })
32 } else {
33 jobs.forEach(function (job) {
34 jobsQueue.push(job)
35 })
36 }
37
38 // Optimization: we could use "drain" from queue object
39 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
40 })
41 }
42 )
43}
44
45// ---------------------------------------------------------------------------
46
47module.exports = jobScheduler
48
49// ---------------------------------------------------------------------------
50
51function createJob (transaction, handlerName, handlerInputData, callback) {
52 const createQuery = {
53 state: constants.JOB_STATES.PENDING,
54 handlerName,
55 handlerInputData
56 }
57 const options = { transaction }
58
59 db.Job.create(createQuery, options).asCallback(callback)
60}
61
62function processJob (job, callback) {
63 const jobHandler = jobHandlers[job.handlerName]
64
65 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
66
67 job.state = constants.JOB_STATES.PROCESSING
68 job.save().asCallback(function (err) {
69 if (err) return cannotSaveJobError(err, callback)
70
71 if (jobHandler === undefined) {
72 logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
73 return callback()
74 }
75
76 return jobHandler.process(job.handlerInputData, function (err, result) {
77 if (err) {
78 logger.error('Error in job handler %s.', job.handlerName, { error: err })
79 return onJobError(jobHandler, job, callback)
80 }
81
82 return onJobSuccess(jobHandler, job, callback)
83 })
84 })
85}
86
87function onJobError (jobHandler, job, callback) {
88 job.state = constants.JOB_STATES.ERROR
89
90 job.save().asCallback(function (err) {
91 if (err) return cannotSaveJobError(err, callback)
92
93 return jobHandler.onError(err, job.id, callback)
94 })
95}
96
97function onJobSuccess (jobHandler, job, callback) {
98 job.state = constants.JOB_STATES.SUCCESS
99
100 job.save().asCallback(function (err) {
101 if (err) return cannotSaveJobError(err, callback)
102
103 return jobHandler.onSuccess(err, job.id, callback)
104 })
105}
106
107function cannotSaveJobError (err, callback) {
108 logger.error('Cannot save new job state.', { error: err })
109 return callback(err)
110}