diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-05-05 17:24:16 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-05-05 17:24:16 +0200 |
commit | 4e284e97b95d1fe93378b90061272e3abc875078 (patch) | |
tree | c5ad3b70046725dc7491b0d0a228bd1f76e4cab4 | |
parent | e5b885390557dd61d64ca3e0db5f33dd602518f2 (diff) | |
download | PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.gz PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.zst PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.zip |
Server: finish old jobs at startup
-rw-r--r-- | server.js | 5 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.js | 55 | ||||
-rw-r--r-- | server/models/job.js | 4 |
3 files changed, 44 insertions, 20 deletions
@@ -29,6 +29,11 @@ const missed = checker.checkMissedConfig() | |||
29 | if (missed.length !== 0) { | 29 | if (missed.length !== 0) { |
30 | throw new Error('Miss some configurations keys : ' + missed) | 30 | throw new Error('Miss some configurations keys : ' + missed) |
31 | } | 31 | } |
32 | checker.checkFFmpeg(function (err) { | ||
33 | if (err) { | ||
34 | throw err | ||
35 | } | ||
36 | }) | ||
32 | 37 | ||
33 | const errorMessage = checker.checkConfig() | 38 | const errorMessage = checker.checkConfig() |
34 | if (errorMessage !== null) { | 39 | if (errorMessage !== null) { |
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js index c59bf9262..7b239577f 100644 --- a/server/lib/jobs/job-scheduler.js +++ b/server/lib/jobs/job-scheduler.js | |||
@@ -15,31 +15,40 @@ const jobScheduler = { | |||
15 | } | 15 | } |
16 | 16 | ||
17 | function activate () { | 17 | function activate () { |
18 | const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE | ||
19 | |||
18 | logger.info('Jobs scheduler activated.') | 20 | logger.info('Jobs scheduler activated.') |
19 | 21 | ||
20 | const jobsQueue = queue(processJob) | 22 | const jobsQueue = queue(processJob) |
21 | 23 | ||
22 | forever( | 24 | // Finish processing jobs from a previous start |
23 | function (next) { | 25 | const state = constants.JOB_STATES.PROCESSING |
24 | if (jobsQueue.length() !== 0) { | 26 | db.Job.listWithLimit(limit, state, function (err, jobs) { |
25 | // Finish processing the queue first | 27 | enqueueJobs(err, jobsQueue, jobs) |
26 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
27 | } | ||
28 | 28 | ||
29 | db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { | 29 | forever( |
30 | if (err) { | 30 | function (next) { |
31 | logger.error('Cannot list pending jobs.', { error: err }) | 31 | if (jobsQueue.length() !== 0) { |
32 | } else { | 32 | // Finish processing the queue first |
33 | jobs.forEach(function (job) { | 33 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) |
34 | jobsQueue.push(job) | ||
35 | }) | ||
36 | } | 34 | } |
37 | 35 | ||
38 | // Optimization: we could use "drain" from queue object | 36 | const state = constants.JOB_STATES.PENDING |
39 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | 37 | db.Job.listWithLimit(limit, state, function (err, jobs) { |
40 | }) | 38 | if (err) { |
41 | } | 39 | logger.error('Cannot list pending jobs.', { error: err }) |
42 | ) | 40 | } else { |
41 | jobs.forEach(function (job) { | ||
42 | jobsQueue.push(job) | ||
43 | }) | ||
44 | } | ||
45 | |||
46 | // Optimization: we could use "drain" from queue object | ||
47 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
48 | }) | ||
49 | } | ||
50 | ) | ||
51 | }) | ||
43 | } | 52 | } |
44 | 53 | ||
45 | // --------------------------------------------------------------------------- | 54 | // --------------------------------------------------------------------------- |
@@ -48,6 +57,16 @@ module.exports = jobScheduler | |||
48 | 57 | ||
49 | // --------------------------------------------------------------------------- | 58 | // --------------------------------------------------------------------------- |
50 | 59 | ||
60 | function enqueueJobs (err, jobsQueue, jobs) { | ||
61 | if (err) { | ||
62 | logger.error('Cannot list pending jobs.', { error: err }) | ||
63 | } else { | ||
64 | jobs.forEach(function (job) { | ||
65 | jobsQueue.push(job) | ||
66 | }) | ||
67 | } | ||
68 | } | ||
69 | |||
51 | function createJob (transaction, handlerName, handlerInputData, callback) { | 70 | function createJob (transaction, handlerName, handlerInputData, callback) { |
52 | const createQuery = { | 71 | const createQuery = { |
53 | state: constants.JOB_STATES.PENDING, | 72 | state: constants.JOB_STATES.PENDING, |
diff --git a/server/models/job.js b/server/models/job.js index eeb50e16d..949f88d44 100644 --- a/server/models/job.js +++ b/server/models/job.js | |||
@@ -39,14 +39,14 @@ module.exports = function (sequelize, DataTypes) { | |||
39 | 39 | ||
40 | // --------------------------------------------------------------------------- | 40 | // --------------------------------------------------------------------------- |
41 | 41 | ||
42 | function listWithLimit (limit, callback) { | 42 | function listWithLimit (limit, state, callback) { |
43 | const query = { | 43 | const query = { |
44 | order: [ | 44 | order: [ |
45 | [ 'id', 'ASC' ] | 45 | [ 'id', 'ASC' ] |
46 | ], | 46 | ], |
47 | limit: limit, | 47 | limit: limit, |
48 | where: { | 48 | where: { |
49 | state: constants.JOB_STATES.PENDING | 49 | state |
50 | } | 50 | } |
51 | } | 51 | } |
52 | 52 | ||