From 227d02feadbc9b1fc916a12528ccc0623fb3069e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 May 2017 22:02:27 +0200 Subject: Server: add job scheduler to transcode video files --- server/lib/jobs/handlers/index.js | 7 ++ server/lib/jobs/handlers/video-transcoder.js | 34 +++++++++ server/lib/jobs/job-scheduler.js | 110 +++++++++++++++++++++++++++ 3 files changed, 151 insertions(+) create mode 100644 server/lib/jobs/handlers/index.js create mode 100644 server/lib/jobs/handlers/video-transcoder.js create mode 100644 server/lib/jobs/job-scheduler.js (limited to 'server/lib/jobs') diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js new file mode 100644 index 000000000..59c1ccce5 --- /dev/null +++ b/server/lib/jobs/handlers/index.js @@ -0,0 +1,7 @@ +'use strict' + +const videoTranscoder = require('./video-transcoder') + +module.exports = { + videoTranscoder +} diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js new file mode 100644 index 000000000..8524df3aa --- /dev/null +++ b/server/lib/jobs/handlers/video-transcoder.js @@ -0,0 +1,34 @@ +'use strict' + +const db = require('../../../initializers/database') +const logger = require('../../../helpers/logger') + +const VideoTranscoderHandler = { + process, + onError, + onSuccess +} + +// --------------------------------------------------------------------------- + +function process (data, callback) { + db.Video.load(data.id, function (err, video) { + if (err) return callback(err) + + video.transcodeVideofile(callback) + }) +} + +function onError (err, jobId, callback) { + logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) + return callback() +} + +function onSuccess (data, jobId, callback) { + logger.info('Job %d is a success.', jobId) + return callback() +} + +// --------------------------------------------------------------------------- + +module.exports = VideoTranscoderHandler diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js new file mode 100644 index 000000000..589a30630 --- /dev/null +++ b/server/lib/jobs/job-scheduler.js @@ -0,0 +1,110 @@ +'use strict' + +const forever = require('async/forever') +const queue = require('async/queue') + +const constants = require('../../initializers/constants') +const db = require('../../initializers/database') +const logger = require('../../helpers/logger') + +const jobHandlers = require('./handlers') + +const jobScheduler = { + activate, + createJob +} + +function activate () { + logger.info('Jobs scheduler activated.') + + const jobsQueue = queue(processJob) + + forever( + function (next) { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + } + + db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + }) + } + ) +} + +// --------------------------------------------------------------------------- + +module.exports = jobScheduler + +// --------------------------------------------------------------------------- + +function createJob (transaction, handlerName, handlerInputData, callback) { + const createQuery = { + state: constants.JOB_STATES.PENDING, + handlerName, + handlerInputData + } + const options = { transaction } + + db.Job.create(createQuery, options).asCallback(callback) +} + +function processJob (job, callback) { + const jobHandler = jobHandlers[job.handlerName] + + logger.info('Processing job %d with handler %s.', job.id, job.handlerName) + + job.state = constants.JOB_STATES.PROCESSING + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + if (jobHandler === undefined) { + logger.error('Unknown job handler for job %s.', jobHandler.handlerName) + return callback() + } + + return jobHandler.process(job.handlerInputData, function (err, result) { + if (err) { + logger.error('Error in job handler %s.', job.handlerName, { error: err }) + return onJobError(jobHandler, job, callback) + } + + return onJobSuccess(jobHandler, job, callback) + }) + }) +} + +function onJobError (jobHandler, job, callback) { + job.state = constants.JOB_STATES.ERROR + + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + return jobHandler.onError(err, job.id, callback) + }) +} + +function onJobSuccess (jobHandler, job, callback) { + job.state = constants.JOB_STATES.SUCCESS + + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + return jobHandler.onSuccess(err, job.id, callback) + }) +} + +function cannotSaveJobError (err, callback) { + logger.error('Cannot save new job state.', { error: err }) + return callback(err) +} -- cgit v1.2.3