]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Server: finish old jobs at startup
authorChocobozzz <florian.bigard@gmail.com>
Fri, 5 May 2017 15:24:16 +0000 (17:24 +0200)
committerChocobozzz <florian.bigard@gmail.com>
Fri, 5 May 2017 15:24:16 +0000 (17:24 +0200)
server.js
server/lib/jobs/job-scheduler.js
server/models/job.js

index 54fa914bfbf5ad24dda7737d788adcfc9a681f32..b2487b767c36faa614a40233ac0e0340bfd24833 100644 (file)
--- a/server.js
+++ b/server.js
@@ -29,6 +29,11 @@ const missed = checker.checkMissedConfig()
 if (missed.length !== 0) {
   throw new Error('Miss some configurations keys : ' + missed)
 }
+checker.checkFFmpeg(function (err) {
+  if (err) {
+    throw err
+  }
+})
 
 const errorMessage = checker.checkConfig()
 if (errorMessage !== null) {
index c59bf9262c399a2bcbaa8fb30178c370f4f64885..7b239577f8935d4e16b73d78695aaecbd4f53a10 100644 (file)
@@ -15,31 +15,40 @@ const jobScheduler = {
 }
 
 function activate () {
+  const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
+
   logger.info('Jobs scheduler activated.')
 
   const jobsQueue = queue(processJob)
 
-  forever(
-    function (next) {
-      if (jobsQueue.length() !== 0) {
-        // Finish processing the queue first
-        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
-      }
+  // Finish processing jobs from a previous start
+  const state = constants.JOB_STATES.PROCESSING
+  db.Job.listWithLimit(limit, state, function (err, jobs) {
+    enqueueJobs(err, jobsQueue, jobs)
 
-      db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
-        if (err) {
-          logger.error('Cannot list pending jobs.', { error: err })
-        } else {
-          jobs.forEach(function (job) {
-            jobsQueue.push(job)
-          })
+    forever(
+      function (next) {
+        if (jobsQueue.length() !== 0) {
+          // Finish processing the queue first
+          return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
         }
 
-        // Optimization: we could use "drain" from queue object
-        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
-      })
-    }
-  )
+        const state = constants.JOB_STATES.PENDING
+        db.Job.listWithLimit(limit, state, function (err, jobs) {
+          if (err) {
+            logger.error('Cannot list pending jobs.', { error: err })
+          } else {
+            jobs.forEach(function (job) {
+              jobsQueue.push(job)
+            })
+          }
+
+          // Optimization: we could use "drain" from queue object
+          return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+        })
+      }
+    )
+  })
 }
 
 // ---------------------------------------------------------------------------
@@ -48,6 +57,16 @@ module.exports = jobScheduler
 
 // ---------------------------------------------------------------------------
 
+function enqueueJobs (err, jobsQueue, jobs) {
+  if (err) {
+    logger.error('Cannot list pending jobs.', { error: err })
+  } else {
+    jobs.forEach(function (job) {
+      jobsQueue.push(job)
+    })
+  }
+}
+
 function createJob (transaction, handlerName, handlerInputData, callback) {
   const createQuery = {
     state: constants.JOB_STATES.PENDING,
index eeb50e16d6f8c433e787db6b048bce73b677d124..949f88d44eb33ec33a13763c2bc40d816ab54ec5 100644 (file)
@@ -39,14 +39,14 @@ module.exports = function (sequelize, DataTypes) {
 
 // ---------------------------------------------------------------------------
 
-function listWithLimit (limit, callback) {
+function listWithLimit (limit, state, callback) {
   const query = {
     order: [
       [ 'id', 'ASC' ]
     ],
     limit: limit,
     where: {
-      state: constants.JOB_STATES.PENDING
+      state
     }
   }