aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-05-02 22:02:27 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-05-04 21:12:32 +0200
commit227d02feadbc9b1fc916a12528ccc0623fb3069e (patch)
tree0b6bcc8c2c081591588714bc040ac6e810e333de /server
parent15d4ee04a94230e1ea83c2959a1981105699ba22 (diff)
downloadPeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.gz
PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.zst
PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.zip
Server: add job scheduler to transcode video files
Diffstat (limited to 'server')
-rw-r--r--server/initializers/checker.js2
-rw-r--r--server/initializers/constants.js21
-rw-r--r--server/lib/jobs/handlers/index.js7
-rw-r--r--server/lib/jobs/handlers/video-transcoder.js34
-rw-r--r--server/lib/jobs/job-scheduler.js110
-rw-r--r--server/models/job.js54
-rw-r--r--server/models/video.js116
7 files changed, 321 insertions, 23 deletions
diff --git a/server/initializers/checker.js b/server/initializers/checker.js
index 461a851fe..a3727563a 100644
--- a/server/initializers/checker.js
+++ b/server/initializers/checker.js
@@ -29,7 +29,7 @@ function checkMissedConfig () {
29 'webserver.https', 'webserver.hostname', 'webserver.port', 29 'webserver.https', 'webserver.hostname', 'webserver.port',
30 'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password', 30 'database.hostname', 'database.port', 'database.suffix', 'database.username', 'database.password',
31 'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews', 31 'storage.certs', 'storage.videos', 'storage.logs', 'storage.thumbnails', 'storage.previews',
32 'admin.email', 'signup.enabled' 32 'admin.email', 'signup.enabled', 'transcoding.enabled', 'transcoding.threads'
33 ] 33 ]
34 const miss = [] 34 const miss = []
35 35
diff --git a/server/initializers/constants.js b/server/initializers/constants.js
index d6da20982..87e9c8002 100644
--- a/server/initializers/constants.js
+++ b/server/initializers/constants.js
@@ -64,6 +64,10 @@ const CONFIG = {
64 }, 64 },
65 SIGNUP: { 65 SIGNUP: {
66 ENABLED: config.get('signup.enabled') 66 ENABLED: config.get('signup.enabled')
67 },
68 TRANSCODING: {
69 ENABLED: config.get('transcoding.enabled'),
70 THREADS: config.get('transcoding.threads')
67 } 71 }
68} 72}
69CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT 73CONFIG.WEBSERVER.URL = CONFIG.WEBSERVER.SCHEME + '://' + CONFIG.WEBSERVER.HOSTNAME + ':' + CONFIG.WEBSERVER.PORT
@@ -223,6 +227,18 @@ const REMOTE_SCHEME = {
223 WS: 'wss' 227 WS: 'wss'
224} 228}
225 229
230const JOB_STATES = {
231 PENDING: 'pending',
232 PROCESSING: 'processing',
233 ERROR: 'error',
234 SUCCESS: 'success'
235}
236// How many maximum jobs we fetch from the database per cycle
237const JOBS_FETCH_LIMIT_PER_CYCLE = 10
238const JOBS_CONCURRENCY = 1
239// 1 minutes
240let JOBS_FETCHING_INTERVAL = 60000
241
226// --------------------------------------------------------------------------- 242// ---------------------------------------------------------------------------
227 243
228const PRIVATE_CERT_NAME = 'peertube.key.pem' 244const PRIVATE_CERT_NAME = 'peertube.key.pem'
@@ -264,6 +280,7 @@ if (isTestInstance() === true) {
264 CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14 280 CONSTRAINTS_FIELDS.VIDEOS.DURATION.max = 14
265 FRIEND_SCORE.BASE = 20 281 FRIEND_SCORE.BASE = 20
266 REQUESTS_INTERVAL = 10000 282 REQUESTS_INTERVAL = 10000
283 JOBS_FETCHING_INTERVAL = 10000
267 REMOTE_SCHEME.HTTP = 'http' 284 REMOTE_SCHEME.HTTP = 'http'
268 REMOTE_SCHEME.WS = 'ws' 285 REMOTE_SCHEME.WS = 'ws'
269 STATIC_MAX_AGE = 0 286 STATIC_MAX_AGE = 0
@@ -277,6 +294,10 @@ module.exports = {
277 CONFIG, 294 CONFIG,
278 CONSTRAINTS_FIELDS, 295 CONSTRAINTS_FIELDS,
279 FRIEND_SCORE, 296 FRIEND_SCORE,
297 JOBS_FETCHING_INTERVAL,
298 JOB_STATES,
299 JOBS_CONCURRENCY,
300 JOBS_FETCH_LIMIT_PER_CYCLE,
280 LAST_MIGRATION_VERSION, 301 LAST_MIGRATION_VERSION,
281 OAUTH_LIFETIME, 302 OAUTH_LIFETIME,
282 PAGINATION_COUNT_DEFAULT, 303 PAGINATION_COUNT_DEFAULT,
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js
new file mode 100644
index 000000000..59c1ccce5
--- /dev/null
+++ b/server/lib/jobs/handlers/index.js
@@ -0,0 +1,7 @@
1'use strict'
2
3const videoTranscoder = require('./video-transcoder')
4
5module.exports = {
6 videoTranscoder
7}
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js
new file mode 100644
index 000000000..8524df3aa
--- /dev/null
+++ b/server/lib/jobs/handlers/video-transcoder.js
@@ -0,0 +1,34 @@
1'use strict'
2
3const db = require('../../../initializers/database')
4const logger = require('../../../helpers/logger')
5
6const VideoTranscoderHandler = {
7 process,
8 onError,
9 onSuccess
10}
11
12// ---------------------------------------------------------------------------
13
14function process (data, callback) {
15 db.Video.load(data.id, function (err, video) {
16 if (err) return callback(err)
17
18 video.transcodeVideofile(callback)
19 })
20}
21
22function onError (err, jobId, callback) {
23 logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
24 return callback()
25}
26
27function onSuccess (data, jobId, callback) {
28 logger.info('Job %d is a success.', jobId)
29 return callback()
30}
31
32// ---------------------------------------------------------------------------
33
34module.exports = VideoTranscoderHandler
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
new file mode 100644
index 000000000..589a30630
--- /dev/null
+++ b/server/lib/jobs/job-scheduler.js
@@ -0,0 +1,110 @@
1'use strict'
2
3const forever = require('async/forever')
4const queue = require('async/queue')
5
6const constants = require('../../initializers/constants')
7const db = require('../../initializers/database')
8const logger = require('../../helpers/logger')
9
10const jobHandlers = require('./handlers')
11
12const jobScheduler = {
13 activate,
14 createJob
15}
16
17function activate () {
18 logger.info('Jobs scheduler activated.')
19
20 const jobsQueue = queue(processJob)
21
22 forever(
23 function (next) {
24 if (jobsQueue.length() !== 0) {
25 // Finish processing the queue first
26 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
27 }
28
29 db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
30 if (err) {
31 logger.error('Cannot list pending jobs.', { error: err })
32 } else {
33 jobs.forEach(function (job) {
34 jobsQueue.push(job)
35 })
36 }
37
38 // Optimization: we could use "drain" from queue object
39 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
40 })
41 }
42 )
43}
44
45// ---------------------------------------------------------------------------
46
47module.exports = jobScheduler
48
49// ---------------------------------------------------------------------------
50
51function createJob (transaction, handlerName, handlerInputData, callback) {
52 const createQuery = {
53 state: constants.JOB_STATES.PENDING,
54 handlerName,
55 handlerInputData
56 }
57 const options = { transaction }
58
59 db.Job.create(createQuery, options).asCallback(callback)
60}
61
62function processJob (job, callback) {
63 const jobHandler = jobHandlers[job.handlerName]
64
65 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
66
67 job.state = constants.JOB_STATES.PROCESSING
68 job.save().asCallback(function (err) {
69 if (err) return cannotSaveJobError(err, callback)
70
71 if (jobHandler === undefined) {
72 logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
73 return callback()
74 }
75
76 return jobHandler.process(job.handlerInputData, function (err, result) {
77 if (err) {
78 logger.error('Error in job handler %s.', job.handlerName, { error: err })
79 return onJobError(jobHandler, job, callback)
80 }
81
82 return onJobSuccess(jobHandler, job, callback)
83 })
84 })
85}
86
87function onJobError (jobHandler, job, callback) {
88 job.state = constants.JOB_STATES.ERROR
89
90 job.save().asCallback(function (err) {
91 if (err) return cannotSaveJobError(err, callback)
92
93 return jobHandler.onError(err, job.id, callback)
94 })
95}
96
97function onJobSuccess (jobHandler, job, callback) {
98 job.state = constants.JOB_STATES.SUCCESS
99
100 job.save().asCallback(function (err) {
101 if (err) return cannotSaveJobError(err, callback)
102
103 return jobHandler.onSuccess(err, job.id, callback)
104 })
105}
106
107function cannotSaveJobError (err, callback) {
108 logger.error('Cannot save new job state.', { error: err })
109 return callback(err)
110}
diff --git a/server/models/job.js b/server/models/job.js
new file mode 100644
index 000000000..eeb50e16d
--- /dev/null
+++ b/server/models/job.js
@@ -0,0 +1,54 @@
1'use strict'
2
3const values = require('lodash/values')
4
5const constants = require('../initializers/constants')
6
7// ---------------------------------------------------------------------------
8
9module.exports = function (sequelize, DataTypes) {
10 const Job = sequelize.define('Job',
11 {
12 state: {
13 type: DataTypes.ENUM(values(constants.JOB_STATES)),
14 allowNull: false
15 },
16 handlerName: {
17 type: DataTypes.STRING,
18 allowNull: false
19 },
20 handlerInputData: {
21 type: DataTypes.JSON,
22 allowNull: true
23 }
24 },
25 {
26 indexes: [
27 {
28 fields: [ 'state' ]
29 }
30 ],
31 classMethods: {
32 listWithLimit
33 }
34 }
35 )
36
37 return Job
38}
39
40// ---------------------------------------------------------------------------
41
42function listWithLimit (limit, callback) {
43 const query = {
44 order: [
45 [ 'id', 'ASC' ]
46 ],
47 limit: limit,
48 where: {
49 state: constants.JOB_STATES.PENDING
50 }
51 }
52
53 return this.findAll(query).asCallback(callback)
54}
diff --git a/server/models/video.js b/server/models/video.js
index 029cb6d7c..da4ddb420 100644
--- a/server/models/video.js
+++ b/server/models/video.js
@@ -7,6 +7,7 @@ const fs = require('fs')
7const magnetUtil = require('magnet-uri') 7const magnetUtil = require('magnet-uri')
8const map = require('lodash/map') 8const map = require('lodash/map')
9const parallel = require('async/parallel') 9const parallel = require('async/parallel')
10const series = require('async/series')
10const parseTorrent = require('parse-torrent') 11const parseTorrent = require('parse-torrent')
11const pathUtils = require('path') 12const pathUtils = require('path')
12const values = require('lodash/values') 13const values = require('lodash/values')
@@ -17,6 +18,7 @@ const friends = require('../lib/friends')
17const modelUtils = require('./utils') 18const modelUtils = require('./utils')
18const customVideosValidators = require('../helpers/custom-validators').videos 19const customVideosValidators = require('../helpers/custom-validators').videos
19const db = require('../initializers/database') 20const db = require('../initializers/database')
21const jobScheduler = require('../lib/jobs/job-scheduler')
20 22
21// --------------------------------------------------------------------------- 23// ---------------------------------------------------------------------------
22 24
@@ -203,6 +205,7 @@ module.exports = function (sequelize, DataTypes) {
203 toFormatedJSON, 205 toFormatedJSON,
204 toAddRemoteJSON, 206 toAddRemoteJSON,
205 toUpdateRemoteJSON, 207 toUpdateRemoteJSON,
208 transcodeVideofile,
206 removeFromBlacklist 209 removeFromBlacklist
207 }, 210 },
208 hooks: { 211 hooks: {
@@ -234,38 +237,30 @@ function beforeCreate (video, options, next) {
234 237
235 tasks.push( 238 tasks.push(
236 function createVideoTorrent (callback) { 239 function createVideoTorrent (callback) {
237 const options = { 240 createTorrentFromVideo(video, videoPath, callback)
238 announceList: [
239 [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
240 ],
241 urlList: [
242 constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
243 ]
244 }
245
246 createTorrent(videoPath, options, function (err, torrent) {
247 if (err) return callback(err)
248
249 const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
250 fs.writeFile(filePath, torrent, function (err) {
251 if (err) return callback(err)
252
253 const parsedTorrent = parseTorrent(torrent)
254 video.set('infoHash', parsedTorrent.infoHash)
255 video.validate().asCallback(callback)
256 })
257 })
258 }, 241 },
259 242
260 function createVideoThumbnail (callback) { 243 function createVideoThumbnail (callback) {
261 createThumbnail(video, videoPath, callback) 244 createThumbnail(video, videoPath, callback)
262 }, 245 },
263 246
264 function createVIdeoPreview (callback) { 247 function createVideoPreview (callback) {
265 createPreview(video, videoPath, callback) 248 createPreview(video, videoPath, callback)
266 } 249 }
267 ) 250 )
268 251
252 if (constants.CONFIG.TRANSCODING.ENABLED === true) {
253 tasks.push(
254 function createVideoTranscoderJob (callback) {
255 const dataInput = {
256 id: video.id
257 }
258
259 jobScheduler.createJob(options.transaction, 'videoTranscoder', dataInput, callback)
260 }
261 )
262 }
263
269 return parallel(tasks, next) 264 return parallel(tasks, next)
270 } 265 }
271 266
@@ -503,6 +498,59 @@ function toUpdateRemoteJSON (callback) {
503 return json 498 return json
504} 499}
505 500
501function transcodeVideofile (finalCallback) {
502 const video = this
503
504 const videosDirectory = constants.CONFIG.STORAGE.VIDEOS_DIR
505 const newExtname = '.mp4'
506 const videoInputPath = pathUtils.join(videosDirectory, video.getVideoFilename())
507 const videoOutputPath = pathUtils.join(videosDirectory, video.id + '-transcoded' + newExtname)
508
509 ffmpeg(videoInputPath)
510 .output(videoOutputPath)
511 .videoCodec('libx264')
512 .outputOption('-threads ' + constants.CONFIG.TRANSCODING.THREADS)
513 .outputOption('-movflags faststart')
514 .on('error', finalCallback)
515 .on('end', function () {
516 series([
517 function removeOldFile (callback) {
518 fs.unlink(videoInputPath, callback)
519 },
520
521 function moveNewFile (callback) {
522 // Important to do this before getVideoFilename() to take in account the new file extension
523 video.set('extname', newExtname)
524
525 const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
526 fs.rename(videoOutputPath, newVideoPath, callback)
527 },
528
529 function torrent (callback) {
530 const newVideoPath = pathUtils.join(videosDirectory, video.getVideoFilename())
531 createTorrentFromVideo(video, newVideoPath, callback)
532 },
533
534 function videoExtension (callback) {
535 video.save().asCallback(callback)
536 }
537
538 ], function (err) {
539 if (err) {
540 // Autodescruction...
541 video.destroy().asCallback(function (err) {
542 if (err) logger.error('Cannot destruct video after transcoding failure.', { error: err })
543 })
544
545 return finalCallback(err)
546 }
547
548 return finalCallback(null)
549 })
550 })
551 .run()
552}
553
506// ------------------------------ STATICS ------------------------------ 554// ------------------------------ STATICS ------------------------------
507 555
508function generateThumbnailFromData (video, thumbnailData, callback) { 556function generateThumbnailFromData (video, thumbnailData, callback) {
@@ -737,6 +785,30 @@ function removePreview (video, callback) {
737 fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback) 785 fs.unlink(constants.CONFIG.STORAGE.PREVIEWS_DIR + video.getPreviewName(), callback)
738} 786}
739 787
788function createTorrentFromVideo (video, videoPath, callback) {
789 const options = {
790 announceList: [
791 [ constants.CONFIG.WEBSERVER.WS + '://' + constants.CONFIG.WEBSERVER.HOSTNAME + ':' + constants.CONFIG.WEBSERVER.PORT + '/tracker/socket' ]
792 ],
793 urlList: [
794 constants.CONFIG.WEBSERVER.URL + constants.STATIC_PATHS.WEBSEED + video.getVideoFilename()
795 ]
796 }
797
798 createTorrent(videoPath, options, function (err, torrent) {
799 if (err) return callback(err)
800
801 const filePath = pathUtils.join(constants.CONFIG.STORAGE.TORRENTS_DIR, video.getTorrentName())
802 fs.writeFile(filePath, torrent, function (err) {
803 if (err) return callback(err)
804
805 const parsedTorrent = parseTorrent(torrent)
806 video.set('infoHash', parsedTorrent.infoHash)
807 video.validate().asCallback(callback)
808 })
809 })
810}
811
740function createPreview (video, videoPath, callback) { 812function createPreview (video, videoPath, callback) {
741 generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback) 813 generateImage(video, videoPath, constants.CONFIG.STORAGE.PREVIEWS_DIR, video.getPreviewName(), callback)
742} 814}