diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-05-02 22:02:27 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-05-04 21:12:32 +0200 |
commit | 227d02feadbc9b1fc916a12528ccc0623fb3069e (patch) | |
tree | 0b6bcc8c2c081591588714bc040ac6e810e333de /server | |
parent | 15d4ee04a94230e1ea83c2959a1981105699ba22 (diff) | |
download | PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.gz PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.zst PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.zip |
Server: add job scheduler to transcode video files
Diffstat (limited to 'server')
-rw-r--r-- | server/initializers/checker.js | 2 | ||||
-rw-r--r-- | server/initializers/constants.js | 21 | ||||
-rw-r--r-- | server/lib/jobs/handlers/index.js | 7 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-transcoder.js | 34 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.js | 110 | ||||
-rw-r--r-- | server/models/job.js | 54 | ||||
-rw-r--r-- | server/models/video.js | 116 |
7 files changed, 321 insertions, 23 deletions
diff --git a/server/initializers/checker.js b/server/initializers/checker.js index 461a851fe..a3727563a 100644 --- a/server/initializers/checker.js +++ b/server/initializers/checker.js | |||
@@ -29,7 +29,7 @@ function checkMissedConfig () { | |||
29 | 'webserver.https', 'webserver.hostname', 'webserver.port', | 29 | 'webserver.https', 'webserver.hostname', 'webserver.port', |
30 | 'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password', | 30 | 'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password', |
31 | 'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews', | 31 | 'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews', |
32 | 'admin.email', 'signup.enabled' | 32 | 'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads' |
33 | ] | 33 | ] |
34 | const miss = [] | 34 | const miss = [] |
35 | 35 | ||
diff --git a/server/initializers/constants.js b/server/initializers/constants.js index d6da20982..87e9c8002 100644 --- a/server/initializers/constants.js +++ b/server/initializers/constants.js | |||
@@ -64,6 +64,10 @@ const CONFIG = { | |||
64 | }, | 64 | }, |
65 | SIGNUP: { | 65 | SIGNUP: { |
66 | ENABLED: config.get('signup.enabled') | 66 | ENABLED: config.get('signup.enabled') |
67 | }, | ||
68 | TRANSCODING: { | ||
69 | ENABLED: config.get('transcoding.enabled'), | ||
70 | THREADS: config.get('transcoding.threads') | ||
67 | } | 71 | } |
68 | } | 72 | } |
69 | CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT | 73 | CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT |
@@ -223,6 +227,18 @@ const REMOTE_SCHEME = { | |||
223 | WS: 'wss' | 227 | WS: 'wss' |
224 | } | 228 | } |
225 | 229 | ||
230 | const JOB_STATES = { | ||
231 | PENDING: 'pending', | ||
232 | PROCESSING: 'processing', | ||
233 | ERROR: 'error', | ||
234 | SUCCESS: 'success' | ||
235 | } | ||
236 | // How many maximum jobs we fetch from the database per cycle | ||
237 | const JOBS_FETCH_LIMIT_PER_CYCLE = 10 | ||
238 | const JOBS_CONCURRENCY = 1 | ||
239 | // 1 minutes | ||
240 | let JOBS_FETCHING_INTERVAL = 60000 | ||
241 | |||
226 | // --------------------------------------------------------------------------- | 242 | // --------------------------------------------------------------------------- |
227 | 243 | ||
228 | const PRIVATE_CERT_NAME = 'peertube.key.pem' | 244 | const PRIVATE_CERT_NAME = 'peertube.key.pem' |
@@ -264,6 +280,7 @@ if (isTestInstance() === true) { | |||
264 | CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14 | 280 | CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14 |
265 | FRIEND_SCORE.BASE = 20 | 281 | FRIEND_SCORE.BASE = 20 |
266 | REQUESTS_INTERVAL = 10000 | 282 | REQUESTS_INTERVAL = 10000 |
283 | JOBS_FETCHING_INTERVAL = 10000 | ||
267 | REMOTE_SCHEME.HTTP = 'http' | 284 | REMOTE_SCHEME.HTTP = 'http' |
268 | REMOTE_SCHEME.WS = 'ws' | 285 | REMOTE_SCHEME.WS = 'ws' |
269 | STATIC_MAX_AGE = 0 | 286 | STATIC_MAX_AGE = 0 |
@@ -277,6 +294,10 @@ module.exports = { | |||
277 | CONFIG, | 294 | CONFIG, |
278 | CONSTRAINTS_FIELDS, | 295 | CONSTRAINTS_FIELDS, |
279 | FRIEND_SCORE, | 296 | FRIEND_SCORE, |
297 | JOBS_FETCHING_INTERVAL, | ||
298 | JOB_STATES, | ||
299 | JOBS_CONCURRENCY, | ||
300 | JOBS_FETCH_LIMIT_PER_CYCLE, | ||
280 | LAST_MIGRATION_VERSION, | 301 | LAST_MIGRATION_VERSION, |
281 | OAUTH_LIFETIME, | 302 | OAUTH_LIFETIME, |
282 | PAGINATION_COUNT_DEFAULT, | 303 | PAGINATION_COUNT_DEFAULT, |
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js new file mode 100644 index 000000000..59c1ccce5 --- /dev/null +++ b/server/lib/jobs/handlers/index.js | |||
@@ -0,0 +1,7 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const videoTranscoder = require('./video-transcoder') | ||
4 | |||
5 | module.exports = { | ||
6 | videoTranscoder | ||
7 | } | ||
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js new file mode 100644 index 000000000..8524df3aa --- /dev/null +++ b/server/lib/jobs/handlers/video-transcoder.js | |||
@@ -0,0 +1,34 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const db = require('../../../initializers/database') | ||
4 | const logger = require('../../../helpers/logger') | ||
5 | |||
6 | const VideoTranscoderHandler = { | ||
7 | process, | ||
8 | onError, | ||
9 | onSuccess | ||
10 | } | ||
11 | |||
12 | // --------------------------------------------------------------------------- | ||
13 | |||
14 | function process (data, callback) { | ||
15 | db.Video.load(data.id, function (err, video) { | ||
16 | if (err) return callback(err) | ||
17 | |||
18 | video.transcodeVideofile(callback) | ||
19 | }) | ||
20 | } | ||
21 | |||
22 | function onError (err, jobId, callback) { | ||
23 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) | ||
24 | return callback() | ||
25 | } | ||
26 | |||
27 | function onSuccess (data, jobId, callback) { | ||
28 | logger.info('Job %d is a success.', jobId) | ||
29 | return callback() | ||
30 | } | ||
31 | |||
32 | // --------------------------------------------------------------------------- | ||
33 | |||
34 | module.exports = VideoTranscoderHandler | ||
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js new file mode 100644 index 000000000..589a30630 --- /dev/null +++ b/server/lib/jobs/job-scheduler.js | |||
@@ -0,0 +1,110 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const forever = require('async/forever') | ||
4 | const queue = require('async/queue') | ||
5 | |||
6 | const constants = require('../../initializers/constants') | ||
7 | const db = require('../../initializers/database') | ||
8 | const logger = require('../../helpers/logger') | ||
9 | |||
10 | const jobHandlers = require('./handlers') | ||
11 | |||
12 | const jobScheduler = { | ||
13 | activate, | ||
14 | createJob | ||
15 | } | ||
16 | |||
17 | function activate () { | ||
18 | logger.info('Jobs scheduler activated.') | ||
19 | |||
20 | const jobsQueue = queue(processJob) | ||
21 | |||
22 | forever( | ||
23 | function (next) { | ||
24 | if (jobsQueue.length() !== 0) { | ||
25 | // Finish processing the queue first | ||
26 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
27 | } | ||
28 | |||
29 | db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) { | ||
30 | if (err) { | ||
31 | logger.error('Cannot list pending jobs.', { error: err }) | ||
32 | } else { | ||
33 | jobs.forEach(function (job) { | ||
34 | jobsQueue.push(job) | ||
35 | }) | ||
36 | } | ||
37 | |||
38 | // Optimization: we could use "drain" from queue object | ||
39 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
40 | }) | ||
41 | } | ||
42 | ) | ||
43 | } | ||
44 | |||
45 | // --------------------------------------------------------------------------- | ||
46 | |||
47 | module.exports = jobScheduler | ||
48 | |||
49 | // --------------------------------------------------------------------------- | ||
50 | |||
51 | function createJob (transaction, handlerName, handlerInputData, callback) { | ||
52 | const createQuery = { | ||
53 | state: constants.JOB_STATES.PENDING, | ||
54 | handlerName, | ||
55 | handlerInputData | ||
56 | } | ||
57 | const options = { transaction } | ||
58 | |||
59 | db.Job.create(createQuery, options).asCallback(callback) | ||
60 | } | ||
61 | |||
62 | function processJob (job, callback) { | ||
63 | const jobHandler = jobHandlers[job.handlerName] | ||
64 | |||
65 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
66 | |||
67 | job.state = constants.JOB_STATES.PROCESSING | ||
68 | job.save().asCallback(function (err) { | ||
69 | if (err) return cannotSaveJobError(err, callback) | ||
70 | |||
71 | if (jobHandler === undefined) { | ||
72 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | ||
73 | return callback() | ||
74 | } | ||
75 | |||
76 | return jobHandler.process(job.handlerInputData, function (err, result) { | ||
77 | if (err) { | ||
78 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | ||
79 | return onJobError(jobHandler, job, callback) | ||
80 | } | ||
81 | |||
82 | return onJobSuccess(jobHandler, job, callback) | ||
83 | }) | ||
84 | }) | ||
85 | } | ||
86 | |||
87 | function onJobError (jobHandler, job, callback) { | ||
88 | job.state = constants.JOB_STATES.ERROR | ||
89 | |||
90 | job.save().asCallback(function (err) { | ||
91 | if (err) return cannotSaveJobError(err, callback) | ||
92 | |||
93 | return jobHandler.onError(err, job.id, callback) | ||
94 | }) | ||
95 | } | ||
96 | |||
97 | function onJobSuccess (jobHandler, job, callback) { | ||
98 | job.state = constants.JOB_STATES.SUCCESS | ||
99 | |||
100 | job.save().asCallback(function (err) { | ||
101 | if (err) return cannotSaveJobError(err, callback) | ||
102 | |||
103 | return jobHandler.onSuccess(err, job.id, callback) | ||
104 | }) | ||
105 | } | ||
106 | |||
107 | function cannotSaveJobError (err, callback) { | ||
108 | logger.error('Cannot save new job state.', { error: err }) | ||
109 | return callback(err) | ||
110 | } | ||
diff --git a/server/models/job.js b/server/models/job.js new file mode 100644 index 000000000..eeb50e16d --- /dev/null +++ b/server/models/job.js | |||
@@ -0,0 +1,54 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const values = require('lodash/values') | ||
4 | |||
5 | const constants = require('../initializers/constants') | ||
6 | |||
7 | // --------------------------------------------------------------------------- | ||
8 | |||
9 | module.exports = function (sequelize, DataTypes) { | ||
10 | const Job = sequelize.define('Job', | ||
11 | { | ||
12 | state: { | ||
13 | type: DataTypes.ENUM(values(constants.JOB_STATES)), | ||
14 | allowNull: false | ||
15 | }, | ||
16 | handlerName: { | ||
17 | type: DataTypes.STRING, | ||
18 | allowNull: false | ||
19 | }, | ||
20 | handlerInputData: { | ||
21 | type: DataTypes.JSON, | ||
22 | allowNull: true | ||
23 | } | ||
24 | }, | ||
25 | { | ||
26 | indexes: [ | ||
27 | { | ||
28 | fields: [ 'state' ] | ||
29 | } | ||
30 | ], | ||
31 | classMethods: { | ||
32 | listWithLimit | ||
33 | } | ||
34 | } | ||
35 | ) | ||
36 | |||
37 | return Job | ||
38 | } | ||
39 | |||
40 | // --------------------------------------------------------------------------- | ||
41 | |||
42 | function listWithLimit (limit, callback) { | ||
43 | const query = { | ||
44 | order: [ | ||
45 | [ 'id', 'ASC' ] | ||
46 | ], | ||
47 | limit: limit, | ||
48 | where: { | ||
49 | state: constants.JOB_STATES.PENDING | ||
50 | } | ||
51 | } | ||
52 | |||
53 | return this.findAll(query).asCallback(callback) | ||
54 | } | ||
diff --git a/server/models/video.js b/server/models/video.js index 029cb6d7c..da4ddb420 100644 --- a/server/models/video.js +++ b/server/models/video.js | |||
@@ -7,6 +7,7 @@ const fs = require('fs') | |||
7 | const magnetUtil = require('magnet-uri') | 7 | const magnetUtil = require('magnet-uri') |
8 | const map = require('lodash/map') | 8 | const map = require('lodash/map') |
9 | const parallel = require('async/parallel') | 9 | const parallel = require('async/parallel') |
10 | const series = require('async/series') | ||
10 | const parseTorrent = require('parse-torrent') | 11 | const parseTorrent = require('parse-torrent') |
11 | const pathUtils = require('path') | 12 | const pathUtils = require('path') |
12 | const values = require('lodash/values') | 13 | const values = require('lodash/values') |
@@ -17,6 +18,7 @@ const friends = require('../lib/friends') | |||
17 | const modelUtils = require('./utils') | 18 | const modelUtils = require('./utils') |
18 | const customVideosValidators = require('../helpers/custom-validators').videos | 19 | const customVideosValidators = require('../helpers/custom-validators').videos |
19 | const db = require('../initializers/database') | 20 | const db = require('../initializers/database') |
21 | const jobScheduler = require('../lib/jobs/job-scheduler') | ||
20 | 22 | ||
21 | // --------------------------------------------------------------------------- | 23 | // --------------------------------------------------------------------------- |
22 | 24 | ||
@@ -203,6 +205,7 @@ module.exports = function (sequelize, DataTypes) { | |||
203 | toFormatedJSON, | 205 | toFormatedJSON, |
204 | toAddRemoteJSON, | 206 | toAddRemoteJSON, |
205 | toUpdateRemoteJSON, | 207 | toUpdateRemoteJSON, |
208 | transcodeVideofile, | ||
206 | removeFromBlacklist | 209 | removeFromBlacklist |
207 | }, | 210 | }, |
208 | hooks: { | 211 | hooks: { |
@@ -234,38 +237,30 @@ function beforeCreate (video, options, next) { | |||
234 | 237 | ||
235 | tasks.push( | 238 | tasks.push( |
236 | function createVideoTorrent (callback) { | 239 | function createVideoTorrent (callback) { |
237 | const options = { | 240 | createTorrentFromVideo(video, videoPath, callback) |
238 | announceList: [ | ||
239 | [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ] | ||
240 | ], | ||
241 | urlList: [ | ||
242 | constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename() | ||
243 | ] | ||
244 | } | ||
245 | |||
246 | createTorrent(videoPath, options, function (err, torrent) { | ||
247 | if (err) return callback(err) | ||
248 | |||
249 | const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName()) | ||
250 | fs.writeFile(filePath, torrent, function (err) { | ||
251 | if (err) return callback(err) | ||
252 | |||
253 | const parsedTorrent = parseTorrent(torrent) | ||
254 | video.set('infoHash', parsedTorrent.infoHash) | ||
255 | video.validate().asCallback(callback) | ||
256 | }) | ||
257 | }) | ||
258 | }, | 241 | }, |
259 | 242 | ||
260 | function createVideoThumbnail (callback) { | 243 | function createVideoThumbnail (callback) { |
261 | createThumbnail(video, videoPath, callback) | 244 | createThumbnail(video, videoPath, callback) |
262 | }, | 245 | }, |
263 | 246 | ||
264 | function createVIdeoPreview (callback) { | 247 | function createVideoPreview (callback) { |
265 | createPreview(video, videoPath, callback) | 248 | createPreview(video, videoPath, callback) |
266 | } | 249 | } |
267 | ) | 250 | ) |
268 | 251 | ||
252 | if (constants.CONFIG.TRANSCODING.ENABLED === true) { | ||
253 | tasks.push( | ||
254 | function createVideoTranscoderJob (callback) { | ||
255 | const dataInput = { | ||
256 | id: video.id | ||
257 | } | ||
258 | |||
259 | jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback) | ||
260 | } | ||
261 | ) | ||
262 | } | ||
263 | |||
269 | return parallel(tasks, next) | 264 | return parallel(tasks, next) |
270 | } | 265 | } |
271 | 266 | ||
@@ -503,6 +498,59 @@ function toUpdateRemoteJSON (callback) { | |||
503 | return json | 498 | return json |
504 | } | 499 | } |
505 | 500 | ||
501 | function transcodeVideofile (finalCallback) { | ||
502 | const video = this | ||
503 | |||
504 | const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR | ||
505 | const newExtname = '.mp4' | ||
506 | const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename()) | ||
507 | const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname) | ||
508 | |||
509 | ffmpeg(videoInputPath) | ||
510 | .output(videoOutputPath) | ||
511 | .videoCodec('libx264') | ||
512 | .outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS) | ||
513 | .outputOption('-movflags faststart') | ||
514 | .on('error', finalCallback) | ||
515 | .on('end', function () { | ||
516 | series([ | ||
517 | function removeOldFile (callback) { | ||
518 | fs.unlink(videoInputPath, callback) | ||
519 | }, | ||
520 | |||
521 | function moveNewFile (callback) { | ||
522 | // Important to do this before getVideoFilename() to take in account the new file extension | ||
523 | video.set('extname', newExtname) | ||
524 | |||
525 | const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename()) | ||
526 | fs.rename(videoOutputPath, newVideoPath, callback) | ||
527 | }, | ||
528 | |||
529 | function torrent (callback) { | ||
530 | const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename()) | ||
531 | createTorrentFromVideo(video, newVideoPath, callback) | ||
532 | }, | ||
533 | |||
534 | function videoExtension (callback) { | ||
535 | video.save().asCallback(callback) | ||
536 | } | ||
537 | |||
538 | ], function (err) { | ||
539 | if (err) { | ||
540 | // Autodescruction... | ||
541 | video.destroy().asCallback(function (err) { | ||
542 | if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err }) | ||
543 | }) | ||
544 | |||
545 | return finalCallback(err) | ||
546 | } | ||
547 | |||
548 | return finalCallback(null) | ||
549 | }) | ||
550 | }) | ||
551 | .run() | ||
552 | } | ||
553 | |||
506 | // ------------------------------ STATICS ------------------------------ | 554 | // ------------------------------ STATICS ------------------------------ |
507 | 555 | ||
508 | function generateThumbnailFromData (video, thumbnailData, callback) { | 556 | function generateThumbnailFromData (video, thumbnailData, callback) { |
@@ -737,6 +785,30 @@ function removePreview (video, callback) { | |||
737 | fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback) | 785 | fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback) |
738 | } | 786 | } |
739 | 787 | ||
788 | function createTorrentFromVideo (video, videoPath, callback) { | ||
789 | const options = { | ||
790 | announceList: [ | ||
791 | [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ] | ||
792 | ], | ||
793 | urlList: [ | ||
794 | constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename() | ||
795 | ] | ||
796 | } | ||
797 | |||
798 | createTorrent(videoPath, options, function (err, torrent) { | ||
799 | if (err) return callback(err) | ||
800 | |||
801 | const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName()) | ||
802 | fs.writeFile(filePath, torrent, function (err) { | ||
803 | if (err) return callback(err) | ||
804 | |||
805 | const parsedTorrent = parseTorrent(torrent) | ||
806 | video.set('infoHash', parsedTorrent.infoHash) | ||
807 | video.validate().asCallback(callback) | ||
808 | }) | ||
809 | }) | ||
810 | } | ||
811 | |||
740 | function createPreview (video, videoPath, callback) { | 812 | function createPreview (video, videoPath, callback) { |
741 | generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback) | 813 | generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback) |
742 | } | 814 | } |