]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/jobs/job-scheduler.js
Server: finish old jobs at startup
[github/Chocobozzz/PeerTube.git] / server / lib / jobs / job-scheduler.js
index c59bf9262c399a2bcbaa8fb30178c370f4f64885..7b239577f8935d4e16b73d78695aaecbd4f53a10 100644 (file)
@@ -15,31 +15,40 @@ const jobScheduler = {
 }
 
 function activate () {
+  const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE
+
   logger.info('Jobs scheduler activated.')
 
   const jobsQueue = queue(processJob)
 
-  forever(
-    function (next) {
-      if (jobsQueue.length() !== 0) {
-        // Finish processing the queue first
-        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
-      }
+  // Finish processing jobs from a previous start
+  const state = constants.JOB_STATES.PROCESSING
+  db.Job.listWithLimit(limit, state, function (err, jobs) {
+    enqueueJobs(err, jobsQueue, jobs)
 
-      db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
-        if (err) {
-          logger.error('Cannot list pending jobs.', { error: err })
-        } else {
-          jobs.forEach(function (job) {
-            jobsQueue.push(job)
-          })
+    forever(
+      function (next) {
+        if (jobsQueue.length() !== 0) {
+          // Finish processing the queue first
+          return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
         }
 
-        // Optimization: we could use "drain" from queue object
-        return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
-      })
-    }
-  )
+        const state = constants.JOB_STATES.PENDING
+        db.Job.listWithLimit(limit, state, function (err, jobs) {
+          if (err) {
+            logger.error('Cannot list pending jobs.', { error: err })
+          } else {
+            jobs.forEach(function (job) {
+              jobsQueue.push(job)
+            })
+          }
+
+          // Optimization: we could use "drain" from queue object
+          return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
+        })
+      }
+    )
+  })
 }
 
 // ---------------------------------------------------------------------------
@@ -48,6 +57,16 @@ module.exports = jobScheduler
 
 // ---------------------------------------------------------------------------
 
+function enqueueJobs (err, jobsQueue, jobs) {
+  if (err) {
+    logger.error('Cannot list pending jobs.', { error: err })
+  } else {
+    jobs.forEach(function (job) {
+      jobsQueue.push(job)
+    })
+  }
+}
+
 function createJob (transaction, handlerName, handlerInputData, callback) {
   const createQuery = {
     state: constants.JOB_STATES.PENDING,