aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.js
blob: c59bf9262c399a2bcbaa8fb30178c370f4f64885 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
'use strict'

const forever = require('async/forever')
const queue = require('async/queue')

const constants = require('../../initializers/constants')
const db = require('../../initializers/database')
const logger = require('../../helpers/logger')

const jobHandlers = require('./handlers')

// Public API of this module: the two functions other modules may call.
const jobScheduler = {
  activate: activate,
  createJob: createJob
}

// Starts the jobs scheduler.
//
// Builds a queue whose worker is processJob, then loops forever: each cycle
// lists up to constants.JOBS_FETCH_LIMIT_PER_CYCLE jobs from the database and
// pushes them onto the queue. Cycles are spaced by
// constants.JOBS_FETCHING_INTERVAL.
function activate () {
  logger.info('Jobs scheduler activated.')

  const pendingQueue = queue(processJob)

  // Arms the timer that will trigger the next cycle
  const scheduleNextCycle = (next) => setTimeout(next, constants.JOBS_FETCHING_INTERVAL)

  forever((next) => {
    // The queue still reports items: let it finish before fetching more jobs
    if (pendingQueue.length() !== 0) return scheduleNextCycle(next)

    db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, (err, jobs) => {
      if (err) {
        // A failed listing is only logged, the loop keeps running
        logger.error('Cannot list pending jobs.', { error: err })
      } else {
        for (const job of jobs) {
          pendingQueue.push(job)
        }
      }

      // Possible optimization: rely on the "drain" event of the queue object
      return scheduleNextCycle(next)
    })
  })
}

// ---------------------------------------------------------------------------

// Expose the scheduler API object (activate, createJob). createJob is declared
// further down as a function declaration, so it is hoisted and already usable here.
module.exports = jobScheduler

// ---------------------------------------------------------------------------

// Inserts a new job in the PENDING state.
//
// - transaction: forwarded to db.Job.create as the `transaction` option
// - handlerName: stored on the job; processJob later uses it to look up the handler
// - handlerInputData: stored on the job; later given to the handler's process function
// - callback: forwarded to asCallback on the result of db.Job.create
function createJob (transaction, handlerName, handlerInputData, callback) {
  const pendingJob = {
    state: constants.JOB_STATES.PENDING,
    handlerName: handlerName,
    handlerInputData: handlerInputData
  }

  db.Job.create(pendingJob, { transaction: transaction }).asCallback(callback)
}

// Queue worker: runs a single job.
//
// The job is first saved in the PROCESSING state. Then job.handlerInputData is
// given to the handler registered in jobHandlers under job.handlerName, and the
// outcome is dispatched to onJobError or onJobSuccess.
//
// - job: job instance; must expose id, handlerName, handlerInputData and save()
// - callback: called once the job is finished. It receives the error when the
//   new state cannot be saved, and no argument when the handler is unknown.
function processJob (job, callback) {
  const jobHandler = jobHandlers[job.handlerName]

  logger.info('Processing job %d with handler %s.', job.id, job.handlerName)

  job.state = constants.JOB_STATES.PROCESSING
  job.save().asCallback(function (err) {
    if (err) return cannotSaveJobError(err, callback)

    if (jobHandler === undefined) {
      // jobHandler is undefined in this branch, so the name must be read from
      // the job itself (reading it from jobHandler would throw a TypeError and
      // the callback would never be called)
      logger.error('Unknown job handler for job %s.', job.handlerName)
      return callback()
    }

    return jobHandler.process(job.handlerInputData, function (err, result) {
      if (err) {
        logger.error('Error in job handler %s.', job.handlerName, { error: err })
        return onJobError(jobHandler, job, result, callback)
      }

      return onJobSuccess(jobHandler, job, result, callback)
    })
  })
}

// Marks the job as ERROR, saves it, then notifies the handler through onError.
//
// - jobHandler: handler whose onError hook is called once the state is saved
// - job: job instance (state is mutated, then save() is called)
// - jobResult: forwarded untouched to jobHandler.onError
// - callback: given to jobHandler.onError, or called with the error by
//   cannotSaveJobError if the save fails
//
// NOTE(review): the first argument given to onError is the save error, which
// is always falsy at that point; the failure that led here is not a parameter
// of this function, so it is not forwarded.
function onJobError (jobHandler, job, jobResult, callback) {
  job.state = constants.JOB_STATES.ERROR

  const afterSave = (saveErr) => {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    return jobHandler.onError(saveErr, job.id, jobResult, callback)
  }

  job.save().asCallback(afterSave)
}

// Marks the job as SUCCESS, saves it, then notifies the handler through onSuccess.
//
// - jobHandler: handler whose onSuccess hook is called once the state is saved
// - job: job instance (state is mutated, then save() is called)
// - jobResult: forwarded untouched to jobHandler.onSuccess
// - callback: given to jobHandler.onSuccess, or called with the error by
//   cannotSaveJobError if the save fails
//
// The first argument given to onSuccess is the save error, which is always
// falsy at that point.
function onJobSuccess (jobHandler, job, jobResult, callback) {
  job.state = constants.JOB_STATES.SUCCESS

  const afterSave = (saveErr) => {
    if (saveErr) return cannotSaveJobError(saveErr, callback)

    return jobHandler.onSuccess(saveErr, job.id, jobResult, callback)
  }

  job.save().asCallback(afterSave)
}

function cannotSaveJobError (err, callback) {
  logger.error('Cannot save new job state.', { error: err })
  return callback(err)
}