diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-05-15 22:22:03 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-05-20 09:57:40 +0200 |
commit | 65fcc3119c334b75dd13bcfdebf186afdc580a8f (patch) | |
tree | 4f2158c61a9b7c3f47cfa233d01413b946ee53c0 /server/lib/jobs/job-scheduler.js | |
parent | d5f345ed4cfac4e1fa84dcb4fce1cda4d32f9c73 (diff) | |
download | PeerTube-65fcc3119c334b75dd13bcfdebf186afdc580a8f.tar.gz PeerTube-65fcc3119c334b75dd13bcfdebf186afdc580a8f.tar.zst PeerTube-65fcc3119c334b75dd13bcfdebf186afdc580a8f.zip |
First typescript iteration
Diffstat (limited to 'server/lib/jobs/job-scheduler.js')
-rw-r--r-- | server/lib/jobs/job-scheduler.js | 129 |
1 files changed, 0 insertions, 129 deletions
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js deleted file mode 100644 index 7b239577f..000000000 --- a/server/lib/jobs/job-scheduler.js +++ /dev/null | |||
@@ -1,129 +0,0 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const forever = require('async/forever') | ||
4 | const queue = require('async/queue') | ||
5 | |||
6 | const constants = require('../../initializers/constants') | ||
7 | const db = require('../../initializers/database') | ||
8 | const logger = require('../../helpers/logger') | ||
9 | |||
10 | const jobHandlers = require('./handlers') | ||
11 | |||
12 | const jobScheduler = { | ||
13 | activate, | ||
14 | createJob | ||
15 | } | ||
16 | |||
17 | function activate () { | ||
18 | const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE | ||
19 | |||
20 | logger.info('Jobs scheduler activated.') | ||
21 | |||
22 | const jobsQueue = queue(processJob) | ||
23 | |||
24 | // Finish processing jobs from a previous start | ||
25 | const state = constants.JOB_STATES.PROCESSING | ||
26 | db.Job.listWithLimit(limit, state, function (err, jobs) { | ||
27 | enqueueJobs(err, jobsQueue, jobs) | ||
28 | |||
29 | forever( | ||
30 | function (next) { | ||
31 | if (jobsQueue.length() !== 0) { | ||
32 | // Finish processing the queue first | ||
33 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
34 | } | ||
35 | |||
36 | const state = constants.JOB_STATES.PENDING | ||
37 | db.Job.listWithLimit(limit, state, function (err, jobs) { | ||
38 | if (err) { | ||
39 | logger.error('Cannot list pending jobs.', { error: err }) | ||
40 | } else { | ||
41 | jobs.forEach(function (job) { | ||
42 | jobsQueue.push(job) | ||
43 | }) | ||
44 | } | ||
45 | |||
46 | // Optimization: we could use "drain" from queue object | ||
47 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
48 | }) | ||
49 | } | ||
50 | ) | ||
51 | }) | ||
52 | } | ||
53 | |||
54 | // --------------------------------------------------------------------------- | ||
55 | |||
56 | module.exports = jobScheduler | ||
57 | |||
58 | // --------------------------------------------------------------------------- | ||
59 | |||
60 | function enqueueJobs (err, jobsQueue, jobs) { | ||
61 | if (err) { | ||
62 | logger.error('Cannot list pending jobs.', { error: err }) | ||
63 | } else { | ||
64 | jobs.forEach(function (job) { | ||
65 | jobsQueue.push(job) | ||
66 | }) | ||
67 | } | ||
68 | } | ||
69 | |||
70 | function createJob (transaction, handlerName, handlerInputData, callback) { | ||
71 | const createQuery = { | ||
72 | state: constants.JOB_STATES.PENDING, | ||
73 | handlerName, | ||
74 | handlerInputData | ||
75 | } | ||
76 | const options = { transaction } | ||
77 | |||
78 | db.Job.create(createQuery, options).asCallback(callback) | ||
79 | } | ||
80 | |||
81 | function processJob (job, callback) { | ||
82 | const jobHandler = jobHandlers[job.handlerName] | ||
83 | |||
84 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
85 | |||
86 | job.state = constants.JOB_STATES.PROCESSING | ||
87 | job.save().asCallback(function (err) { | ||
88 | if (err) return cannotSaveJobError(err, callback) | ||
89 | |||
90 | if (jobHandler === undefined) { | ||
91 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | ||
92 | return callback() | ||
93 | } | ||
94 | |||
95 | return jobHandler.process(job.handlerInputData, function (err, result) { | ||
96 | if (err) { | ||
97 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | ||
98 | return onJobError(jobHandler, job, result, callback) | ||
99 | } | ||
100 | |||
101 | return onJobSuccess(jobHandler, job, result, callback) | ||
102 | }) | ||
103 | }) | ||
104 | } | ||
105 | |||
106 | function onJobError (jobHandler, job, jobResult, callback) { | ||
107 | job.state = constants.JOB_STATES.ERROR | ||
108 | |||
109 | job.save().asCallback(function (err) { | ||
110 | if (err) return cannotSaveJobError(err, callback) | ||
111 | |||
112 | return jobHandler.onError(err, job.id, jobResult, callback) | ||
113 | }) | ||
114 | } | ||
115 | |||
116 | function onJobSuccess (jobHandler, job, jobResult, callback) { | ||
117 | job.state = constants.JOB_STATES.SUCCESS | ||
118 | |||
119 | job.save().asCallback(function (err) { | ||
120 | if (err) return cannotSaveJobError(err, callback) | ||
121 | |||
122 | return jobHandler.onSuccess(err, job.id, jobResult, callback) | ||
123 | }) | ||
124 | } | ||
125 | |||
126 | function cannotSaveJobError (err, callback) { | ||
127 | logger.error('Cannot save new job state.', { error: err }) | ||
128 | return callback(err) | ||
129 | } | ||