From 227d02feadbc9b1fc916a12528ccc0623fb3069e Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 2 May 2017 22:02:27 +0200 Subject: Server: add job scheduler to transcode video files --- server/initializers/checker.js | 2 +- server/initializers/constants.js | 21 +++++ server/lib/jobs/handlers/index.js | 7 ++ server/lib/jobs/handlers/video-transcoder.js | 34 ++++++++ server/lib/jobs/job-scheduler.js | 110 +++++++++++++++++++++++++ server/models/job.js | 54 +++++++++++++ server/models/video.js | 116 ++++++++++++++++++++++----- 7 files changed, 321 insertions(+), 23 deletions(-) create mode 100644 server/lib/jobs/handlers/index.js create mode 100644 server/lib/jobs/handlers/video-transcoder.js create mode 100644 server/lib/jobs/job-scheduler.js create mode 100644 server/models/job.js (limited to 'server') diff --git a/server/initializers/checker.js b/server/initializers/checker.js index 461a851fe..a3727563a 100644 --- a/server/initializers/checker.js +++ b/server/initializers/checker.js @@ -29,7 +29,7 @@ function checkMissedConfig () { 'webserver.https', 'webserver.hostname', 'webserver.port', 'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password', 'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews', - 'admin.email', 'signup.enabled' + 'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads' ] const miss = [] diff --git a/server/initializers/constants.js b/server/initializers/constants.js index d6da20982..87e9c8002 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js @@ -64,6 +64,10 @@ const CONFIG = { }, SIGNUP: { ENABLED: config.get('signup.enabled') + }, + TRANSCODING: { + ENABLED: config.get('transcoding.enabled'), + THREADS: config.get('transcoding.threads') } } CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT @@ -223,6 +227,18 @@ const REMOTE_SCHEME = { WS: 'wss' } +const JOB_STATES = { + PENDING: 'pending', + PROCESSING: 'processing', + ERROR: 'error', + SUCCESS: 'success' +} +// How many maximum jobs we fetch from the database per cycle +const JOBS_FETCH_LIMIT_PER_CYCLE = 10 +const JOBS_CONCURRENCY = 1 +// 1 minutes +let JOBS_FETCHING_INTERVAL = 60000 + // --------------------------------------------------------------------------- const PRIVATE_CERT_NAME = 'peertube.key.pem' @@ -264,6 +280,7 @@ if (isTestInstance() === true) { CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14 FRIEND_SCORE.BASE = 20 REQUESTS_INTERVAL = 10000 + JOBS_FETCHING_INTERVAL = 10000 REMOTE_SCHEME.HTTP = 'http' REMOTE_SCHEME.WS = 'ws' STATIC_MAX_AGE = 0 @@ -277,6 +294,10 @@ module.exports = { CONFIG, CONSTRAINTS_FIELDS, FRIEND_SCORE, + JOBS_FETCHING_INTERVAL, + JOB_STATES, + JOBS_CONCURRENCY, + JOBS_FETCH_LIMIT_PER_CYCLE, LAST_MIGRATION_VERSION, OAUTH_LIFETIME, PAGINATION_COUNT_DEFAULT, diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js new file mode 100644 index 000000000..59c1ccce5 --- /dev/null +++ b/server/lib/jobs/handlers/index.js @@ -0,0 +1,7 @@ +'use strict' + +const videoTranscoder = require('./video-transcoder') + +module.exports = { + videoTranscoder +} diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js new file mode 100644 index 000000000..8524df3aa --- /dev/null +++ b/server/lib/jobs/handlers/video-transcoder.js @@ -0,0 +1,34 @@ +'use strict' + +const db = require('../../../initializers/database') +const logger = require('../../../helpers/logger') + +const VideoTranscoderHandler = { + process, + onError, + onSuccess +} + +// --------------------------------------------------------------------------- + +function process (data, callback) { + db.Video.load(data.id, function (err, video) { + if (err) return callback(err) + + video.transcodeVideofile(callback) + }) +} + +function onError (err, jobId, callback) { + logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) + return callback() +} + +function onSuccess (data, jobId, callback) { + logger.info('Job %d is a success.', jobId) + return callback() +} + +// --------------------------------------------------------------------------- + +module.exports = VideoTranscoderHandler diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js new file mode 100644 index 000000000..589a30630 --- /dev/null +++ b/server/lib/jobs/job-scheduler.js @@ -0,0 +1,110 @@ +'use strict' + +const forever = require('async/forever') +const queue = require('async/queue') + +const constants = require('../../initializers/constants') +const db = require('../../initializers/database') +const logger = require('../../helpers/logger') + +const jobHandlers = require('./handlers') + +const jobScheduler = { + activate, + createJob +} + +function activate () { + logger.info('Jobs scheduler activated.') + + const jobsQueue = queue(processJob) + + forever( + function (next) { + if (jobsQueue.length() !== 0) { + // Finish processing the queue first + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + } + + db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { + if (err) { + logger.error('Cannot list pending jobs.', { error: err }) + } else { + jobs.forEach(function (job) { + jobsQueue.push(job) + }) + } + + // Optimization: we could use "drain" from queue object + return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) + }) + } + ) +} + +// --------------------------------------------------------------------------- + +module.exports = jobScheduler + +// --------------------------------------------------------------------------- + +function createJob (transaction, handlerName, handlerInputData, callback) { + const createQuery = { + state: constants.JOB_STATES.PENDING, + handlerName, + handlerInputData + } + const options = { transaction } + + db.Job.create(createQuery, options).asCallback(callback) +} + +function processJob (job, callback) { + const jobHandler = jobHandlers[job.handlerName] + + logger.info('Processing job %d with handler %s.', job.id, job.handlerName) + + job.state = constants.JOB_STATES.PROCESSING + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + if (jobHandler === undefined) { + logger.error('Unknown job handler for job %s.', jobHandler.handlerName) + return callback() + } + + return jobHandler.process(job.handlerInputData, function (err, result) { + if (err) { + logger.error('Error in job handler %s.', job.handlerName, { error: err }) + return onJobError(jobHandler, job, callback) + } + + return onJobSuccess(jobHandler, job, callback) + }) + }) +} + +function onJobError (jobHandler, job, callback) { + job.state = constants.JOB_STATES.ERROR + + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + return jobHandler.onError(err, job.id, callback) + }) +} + +function onJobSuccess (jobHandler, job, callback) { + job.state = constants.JOB_STATES.SUCCESS + + job.save().asCallback(function (err) { + if (err) return cannotSaveJobError(err, callback) + + return jobHandler.onSuccess(err, job.id, callback) + }) +} + +function cannotSaveJobError (err, callback) { + logger.error('Cannot save new job state.', { error: err }) + return callback(err) +} diff --git a/server/models/job.js b/server/models/job.js new file mode 100644 index 000000000..eeb50e16d --- /dev/null +++ b/server/models/job.js @@ -0,0 +1,54 @@ +'use strict' + +const values = require('lodash/values') + +const constants = require('../initializers/constants') + +// --------------------------------------------------------------------------- + +module.exports = function (sequelize, DataTypes) { + const Job = sequelize.define('Job', + { + state: { + type: DataTypes.ENUM(values(constants.JOB_STATES)), + allowNull: false + }, + handlerName: { + type: DataTypes.STRING, + allowNull: false + }, + handlerInputData: { + type: DataTypes.JSON, + allowNull: true + } + }, + { + indexes: [ + { + fields: [ 'state' ] + } + ], + classMethods: { + listWithLimit + } + } + ) + + return Job +} + +// --------------------------------------------------------------------------- + +function listWithLimit (limit, callback) { + const query = { + order: [ + [ 'id', 'ASC' ] + ], + limit: limit, + where: { + state: constants.JOB_STATES.PENDING + } + } + + return this.findAll(query).asCallback(callback) +} diff --git a/server/models/video.js b/server/models/video.js index 029cb6d7c..da4ddb420 100644 --- a/server/models/video.js +++ b/server/models/video.js @@ -7,6 +7,7 @@ const fs = require('fs') const magnetUtil = require('magnet-uri') const map = require('lodash/map') const parallel = require('async/parallel') +const series = require('async/series') const parseTorrent = require('parse-torrent') const pathUtils = require('path') const values = require('lodash/values') @@ -17,6 +18,7 @@ const friends = require('../lib/friends') const modelUtils = require('./utils') const customVideosValidators = require('../helpers/custom-validators').videos const db = require('../initializers/database') +const jobScheduler = require('../lib/jobs/job-scheduler') // --------------------------------------------------------------------------- @@ -203,6 +205,7 @@ module.exports = function (sequelize, DataTypes) { toFormatedJSON, toAddRemoteJSON, toUpdateRemoteJSON, + transcodeVideofile, removeFromBlacklist }, hooks: { @@ -234,38 +237,30 @@ function beforeCreate (video, options, next) { tasks.push( function createVideoTorrent (callback) { - const options = { - announceList: [ - [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ] - ], - urlList: [ - constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename() - ] - } - - createTorrent(videoPath, options, function (err, torrent) { - if (err) return callback(err) - - const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName()) - fs.writeFile(filePath, torrent, function (err) { - if (err) return callback(err) - - const parsedTorrent = parseTorrent(torrent) - video.set('infoHash', parsedTorrent.infoHash) - video.validate().asCallback(callback) - }) - }) + createTorrentFromVideo(video, videoPath, callback) }, function createVideoThumbnail (callback) { createThumbnail(video, videoPath, callback) }, - function createVIdeoPreview (callback) { + function createVideoPreview (callback) { createPreview(video, videoPath, callback) } ) + if (constants.CONFIG.TRANSCODING.ENABLED === true) { + tasks.push( + function createVideoTranscoderJob (callback) { + const dataInput = { + id: video.id + } + + jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback) + } + ) + } + return parallel(tasks, next) } @@ -503,6 +498,59 @@ function toUpdateRemoteJSON (callback) { return json } +function transcodeVideofile (finalCallback) { + const video = this + + const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR + const newExtname = '.mp4' + const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename()) + const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname) + + ffmpeg(videoInputPath) + .output(videoOutputPath) + .videoCodec('libx264') + .outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS) + .outputOption('-movflags faststart') + .on('error', finalCallback) + .on('end', function () { + series([ + function removeOldFile (callback) { + fs.unlink(videoInputPath, callback) + }, + + function moveNewFile (callback) { + // Important to do this before getVideoFilename() to take in account the new file extension + video.set('extname', newExtname) + + const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename()) + fs.rename(videoOutputPath, newVideoPath, callback) + }, + + function torrent (callback) { + const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename()) + createTorrentFromVideo(video, newVideoPath, callback) + }, + + function videoExtension (callback) { + video.save().asCallback(callback) + } + + ], function (err) { + if (err) { + // Autodescruction... + video.destroy().asCallback(function (err) { + if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err }) + }) + + return finalCallback(err) + } + + return finalCallback(null) + }) + }) + .run() +} + // ------------------------------ STATICS ------------------------------ function generateThumbnailFromData (video, thumbnailData, callback) { @@ -737,6 +785,30 @@ function removePreview (video, callback) { fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback) } +function createTorrentFromVideo (video, videoPath, callback) { + const options = { + announceList: [ + [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ] + ], + urlList: [ + constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename() + ] + } + + createTorrent(videoPath, options, function (err, torrent) { + if (err) return callback(err) + + const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName()) + fs.writeFile(filePath, torrent, function (err) { + if (err) return callback(err) + + const parsedTorrent = parseTorrent(torrent) + video.set('infoHash', parsedTorrent.infoHash) + video.validate().asCallback(callback) + }) + }) +} + function createPreview (video, videoPath, callback) { generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback) } -- cgit v1.2.3