aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-05-02 22:02:27 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-05-04 21:12:32 +0200
commit227d02feadbc9b1fc916a12528ccc0623fb3069e (patch)
tree0b6bcc8c2c081591588714bc040ac6e810e333de /server/lib/jobs
parent15d4ee04a94230e1ea83c2959a1981105699ba22 (diff)
downloadPeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.gz
PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.tar.zst
PeerTube-227d02feadbc9b1fc916a12528ccc0623fb3069e.zip
Server: add job scheduler to transcode video files
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/handlers/index.js7
-rw-r--r--server/lib/jobs/handlers/video-transcoder.js34
-rw-r--r--server/lib/jobs/job-scheduler.js110
3 files changed, 151 insertions, 0 deletions
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js
new file mode 100644
index 000000000..59c1ccce5
--- /dev/null
+++ b/server/lib/jobs/handlers/index.js
@@ -0,0 +1,7 @@
1'use strict'
2
3const videoTranscoder = require('./video-transcoder')
4
5module.exports = {
6 videoTranscoder
7}
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.js
new file mode 100644
index 000000000..8524df3aa
--- /dev/null
+++ b/server/lib/jobs/handlers/video-transcoder.js
@@ -0,0 +1,34 @@
1'use strict'
2
3const db = require('../../../initializers/database')
4const logger = require('../../../helpers/logger')
5
6const VideoTranscoderHandler = {
7 process,
8 onError,
9 onSuccess
10}
11
12// ---------------------------------------------------------------------------
13
14function process (data, callback) {
15 db.Video.load(data.id, function (err, video) {
16 if (err) return callback(err)
17
18 video.transcodeVideofile(callback)
19 })
20}
21
22function onError (err, jobId, callback) {
23 logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
24 return callback()
25}
26
27function onSuccess (data, jobId, callback) {
28 logger.info('Job %d is a success.', jobId)
29 return callback()
30}
31
32// ---------------------------------------------------------------------------
33
34module.exports = VideoTranscoderHandler
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js
new file mode 100644
index 000000000..589a30630
--- /dev/null
+++ b/server/lib/jobs/job-scheduler.js
@@ -0,0 +1,110 @@
1'use strict'
2
3const forever = require('async/forever')
4const queue = require('async/queue')
5
6const constants = require('../../initializers/constants')
7const db = require('../../initializers/database')
8const logger = require('../../helpers/logger')
9
10const jobHandlers = require('./handlers')
11
12const jobScheduler = {
13 activate,
14 createJob
15}
16
17function activate () {
18 logger.info('Jobs scheduler activated.')
19
20 const jobsQueue = queue(processJob)
21
22 forever(
23 function (next) {
24 if (jobsQueue.length() !== 0) {
25 // Finish processing the queue first
26 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
27 }
28
29 db.Job.listWithLimit(constants.JOBS_FETCH_LIMIT_PER_CYCLE, function (err, jobs) {
30 if (err) {
31 logger.error('Cannot list pending jobs.', { error: err })
32 } else {
33 jobs.forEach(function (job) {
34 jobsQueue.push(job)
35 })
36 }
37
38 // Optimization: we could use "drain" from queue object
39 return setTimeout(next, constants.JOBS_FETCHING_INTERVAL)
40 })
41 }
42 )
43}
44
45// ---------------------------------------------------------------------------
46
47module.exports = jobScheduler
48
49// ---------------------------------------------------------------------------
50
51function createJob (transaction, handlerName, handlerInputData, callback) {
52 const createQuery = {
53 state: constants.JOB_STATES.PENDING,
54 handlerName,
55 handlerInputData
56 }
57 const options = { transaction }
58
59 db.Job.create(createQuery, options).asCallback(callback)
60}
61
62function processJob (job, callback) {
63 const jobHandler = jobHandlers[job.handlerName]
64
65 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
66
67 job.state = constants.JOB_STATES.PROCESSING
68 job.save().asCallback(function (err) {
69 if (err) return cannotSaveJobError(err, callback)
70
71 if (jobHandler === undefined) {
72 logger.error('Unknown job handler for job %s.', jobHandler.handlerName)
73 return callback()
74 }
75
76 return jobHandler.process(job.handlerInputData, function (err, result) {
77 if (err) {
78 logger.error('Error in job handler %s.', job.handlerName, { error: err })
79 return onJobError(jobHandler, job, callback)
80 }
81
82 return onJobSuccess(jobHandler, job, callback)
83 })
84 })
85}
86
87function onJobError (jobHandler, job, callback) {
88 job.state = constants.JOB_STATES.ERROR
89
90 job.save().asCallback(function (err) {
91 if (err) return cannotSaveJobError(err, callback)
92
93 return jobHandler.onError(err, job.id, callback)
94 })
95}
96
97function onJobSuccess (jobHandler, job, callback) {
98 job.state = constants.JOB_STATES.SUCCESS
99
100 job.save().asCallback(function (err) {
101 if (err) return cannotSaveJobError(err, callback)
102
103 return jobHandler.onSuccess(err, job.id, callback)
104 })
105}
106
107function cannotSaveJobError (err, callback) {
108 logger.error('Cannot save new job state.', { error: err })
109 return callback(err)
110}