aboutsummaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-05-05 17:24:16 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-05-05 17:24:16 +0200
commit4e284e97b95d1fe93378b90061272e3abc875078 (patch)
treec5ad3b70046725dc7491b0d0a228bd1f76e4cab4
parente5b885390557dd61d64ca3e0db5f33dd602518f2 (diff)
downloadPeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.gz
PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.tar.zst
PeerTube-4e284e97b95d1fe93378b90061272e3abc875078.zip
Server: finish old jobs at startup
-rw-r--r--server.js5
-rw-r--r--server/lib/jobs/job-scheduler.js55
-rw-r--r--server/models/job.js4
3 files changed, 44 insertions, 20 deletions
diff --git a/server.js b/server.js
index 54fa914bf..b2487b767 100644
--- a/server.js
+++ b/server.js
@@ -29,6 +29,11 @@ const missed = checker.checkMissedConfig()
29if (missed.length !== 0) { 29if (missed.length !== 0) {
30 throw new Error('Miss some configurations keys : ' + missed) 30 throw new Error('Miss some configurations keys : ' + missed)
31} 31}
32checker.checkFFmpeg(function (err) {
33 if (err) {
34 throw err
35 }
36})
32 37
33const errorMessage = checker.checkConfig() 38const errorMessage = checker.checkConfig()
34if (errorMessage !== null) { 39if (errorMessage !== null) {
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
index c59bf9262..7b239577f 100644
--- a/server/lib/jobs/job-scheduler.js
+++ b/server/lib/jobs/job-scheduler.js
@@ -15,31 +15,40 @@ const jobScheduler = {
15} 15}
16 16
17function activate () { 17function activate () {
18 const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
19
18 logger.info('Jobs scheduler activated.') 20 logger.info('Jobs scheduler activated.')
19 21
20 const jobsQueue = queue(processJob) 22 const jobsQueue = queue(processJob)
21 23
22 forever( 24 // Finish processing jobs from a previous start
23 function (next) { 25 const state = constants.JOB_STATES.PROCESSING
24 if (jobsQueue.length() !== 0) { 26 db.Job.listWithLimit(limit, state, function (err, jobs) {
25 // Finish processing the queue first 27 enqueueJobs(err, jobsQueue, jobs)
26 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
27 }
28 28
29 db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { 29 forever(
30 if (err) { 30 function (next) {
31 logger.error('Cannot list pending jobs.', { error: err }) 31 if (jobsQueue.length() !== 0) {
32 } else { 32 // Finish processing the queue first
33 jobs.forEach(function (job) { 33 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
34 jobsQueue.push(job)
35 })
36 } 34 }
37 35
38 // Optimization: we could use "drain" from queue object 36 const state = constants.JOB_STATES.PENDING
39 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) 37 db.Job.listWithLimit(limit, state, function (err, jobs) {
40 }) 38 if (err) {
41 } 39 logger.error('Cannot list pending jobs.', { error: err })
42 ) 40 } else {
41 jobs.forEach(function (job) {
42 jobsQueue.push(job)
43 })
44 }
45
46 // Optimization: we could use "drain" from queue object
47 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
48 })
49 }
50 )
51 })
43} 52}
44 53
45// --------------------------------------------------------------------------- 54// ---------------------------------------------------------------------------
@@ -48,6 +57,16 @@ module.exports = jobScheduler
48 57
49// --------------------------------------------------------------------------- 58// ---------------------------------------------------------------------------
50 59
60function enqueueJobs (err, jobsQueue, jobs) {
61 if (err) {
62 logger.error('Cannot list pending jobs.', { error: err })
63 } else {
64 jobs.forEach(function (job) {
65 jobsQueue.push(job)
66 })
67 }
68}
69
51function createJob (transaction, handlerName, handlerInputData, callback) { 70function createJob (transaction, handlerName, handlerInputData, callback) {
52 const createQuery = { 71 const createQuery = {
53 state: constants.JOB_STATES.PENDING, 72 state: constants.JOB_STATES.PENDING,
diff --git a/server/models/job.js b/server/models/job.js
index eeb50e16d..949f88d44 100644
--- a/server/models/job.js
+++ b/server/models/job.js
@@ -39,14 +39,14 @@ module.exports = function (sequelize, DataTypes) {
39 39
40// --------------------------------------------------------------------------- 40// ---------------------------------------------------------------------------
41 41
42function listWithLimit (limit, callback) { 42function listWithLimit (limit, state, callback) {
43 const query = { 43 const query = {
44 order: [ 44 order: [
45 [ 'id', 'ASC' ] 45 [ 'id', 'ASC' ]
46 ], 46 ],
47 limit: limit, 47 limit: limit,
48 where: { 48 where: {
49 state: constants.JOB_STATES.PENDING 49 state
50 } 50 }
51 } 51 }
52 52