diff options
Diffstat (limited to 'server/lib/jobs')
-rw-r--r-- | server/lib/jobs/handlers/index.js | 7 | ||||
-rw-r--r-- | server/lib/jobs/handlers/index.ts | 9 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-transcoder.ts (renamed from server/lib/jobs/handlers/video-transcoder.js) | 22 | ||||
-rw-r--r-- | server/lib/jobs/index.ts | 1 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.js | 129 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 137 |
6 files changed, 155 insertions, 150 deletions
diff --git a/server/lib/jobs/handlers/index.js b/server/lib/jobs/handlers/index.js deleted file mode 100644 index 59c1ccce5..000000000 --- a/server/lib/jobs/handlers/index.js +++ /dev/null | |||
@@ -1,7 +0,0 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const videoTranscoder = require('./video-transcoder') | ||
4 | |||
5 | module.exports = { | ||
6 | videoTranscoder | ||
7 | } | ||
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts new file mode 100644 index 000000000..ae5440031 --- /dev/null +++ b/server/lib/jobs/handlers/index.ts | |||
@@ -0,0 +1,9 @@ | |||
1 | import * as videoTranscoder from './video-transcoder' | ||
2 | |||
3 | const jobHandlers = { | ||
4 | videoTranscoder | ||
5 | } | ||
6 | |||
7 | export { | ||
8 | jobHandlers | ||
9 | } | ||
diff --git a/server/lib/jobs/handlers/video-transcoder.js b/server/lib/jobs/handlers/video-transcoder.ts index d2ad4f9c7..35db5fb96 100644 --- a/server/lib/jobs/handlers/video-transcoder.js +++ b/server/lib/jobs/handlers/video-transcoder.ts | |||
@@ -1,16 +1,6 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const db = require('../../../initializers/database') | 1 | const db = require('../../../initializers/database') |
4 | const logger = require('../../../helpers/logger') | 2 | import { logger } from '../../../helpers' |
5 | const friends = require('../../../lib/friends') | 3 | import { addVideoToFriends } from '../../../lib' |
6 | |||
7 | const VideoTranscoderHandler = { | ||
8 | process, | ||
9 | onError, | ||
10 | onSuccess | ||
11 | } | ||
12 | |||
13 | // --------------------------------------------------------------------------- | ||
14 | 4 | ||
15 | function process (data, callback) { | 5 | function process (data, callback) { |
16 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { | 6 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { |
@@ -34,10 +24,14 @@ function onSuccess (data, jobId, video, callback) { | |||
34 | if (err) return callback(err) | 24 | if (err) return callback(err) |
35 | 25 | ||
36 | // Now we'll add the video's meta data to our friends | 26 | // Now we'll add the video's meta data to our friends |
37 | friends.addVideoToFriends(remoteVideo, null, callback) | 27 | addVideoToFriends(remoteVideo, null, callback) |
38 | }) | 28 | }) |
39 | } | 29 | } |
40 | 30 | ||
41 | // --------------------------------------------------------------------------- | 31 | // --------------------------------------------------------------------------- |
42 | 32 | ||
43 | module.exports = VideoTranscoderHandler | 33 | export { |
34 | process, | ||
35 | onError, | ||
36 | onSuccess | ||
37 | } | ||
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts new file mode 100644 index 000000000..b18a3d845 --- /dev/null +++ b/server/lib/jobs/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './job-scheduler' | |||
diff --git a/server/lib/jobs/job-scheduler.js b/server/lib/jobs/job-scheduler.js deleted file mode 100644 index 7b239577f..000000000 --- a/server/lib/jobs/job-scheduler.js +++ /dev/null | |||
@@ -1,129 +0,0 @@ | |||
1 | 'use strict' | ||
2 | |||
3 | const forever = require('async/forever') | ||
4 | const queue = require('async/queue') | ||
5 | |||
6 | const constants = require('../../initializers/constants') | ||
7 | const db = require('../../initializers/database') | ||
8 | const logger = require('../../helpers/logger') | ||
9 | |||
10 | const jobHandlers = require('./handlers') | ||
11 | |||
12 | const jobScheduler = { | ||
13 | activate, | ||
14 | createJob | ||
15 | } | ||
16 | |||
17 | function activate () { | ||
18 | const limit = constants.JOBS_FETCH_LIMIT_PER_CYCLE | ||
19 | |||
20 | logger.info('Jobs scheduler activated.') | ||
21 | |||
22 | const jobsQueue = queue(processJob) | ||
23 | |||
24 | // Finish processing jobs from a previous start | ||
25 | const state = constants.JOB_STATES.PROCESSING | ||
26 | db.Job.listWithLimit(limit, state, function (err, jobs) { | ||
27 | enqueueJobs(err, jobsQueue, jobs) | ||
28 | |||
29 | forever( | ||
30 | function (next) { | ||
31 | if (jobsQueue.length() !== 0) { | ||
32 | // Finish processing the queue first | ||
33 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
34 | } | ||
35 | |||
36 | const state = constants.JOB_STATES.PENDING | ||
37 | db.Job.listWithLimit(limit, state, function (err, jobs) { | ||
38 | if (err) { | ||
39 | logger.error('Cannot list pending jobs.', { error: err }) | ||
40 | } else { | ||
41 | jobs.forEach(function (job) { | ||
42 | jobsQueue.push(job) | ||
43 | }) | ||
44 | } | ||
45 | |||
46 | // Optimization: we could use "drain" from queue object | ||
47 | return setTimeout(next, constants.JOBS_FETCHING_INTERVAL) | ||
48 | }) | ||
49 | } | ||
50 | ) | ||
51 | }) | ||
52 | } | ||
53 | |||
54 | // --------------------------------------------------------------------------- | ||
55 | |||
56 | module.exports = jobScheduler | ||
57 | |||
58 | // --------------------------------------------------------------------------- | ||
59 | |||
60 | function enqueueJobs (err, jobsQueue, jobs) { | ||
61 | if (err) { | ||
62 | logger.error('Cannot list pending jobs.', { error: err }) | ||
63 | } else { | ||
64 | jobs.forEach(function (job) { | ||
65 | jobsQueue.push(job) | ||
66 | }) | ||
67 | } | ||
68 | } | ||
69 | |||
70 | function createJob (transaction, handlerName, handlerInputData, callback) { | ||
71 | const createQuery = { | ||
72 | state: constants.JOB_STATES.PENDING, | ||
73 | handlerName, | ||
74 | handlerInputData | ||
75 | } | ||
76 | const options = { transaction } | ||
77 | |||
78 | db.Job.create(createQuery, options).asCallback(callback) | ||
79 | } | ||
80 | |||
81 | function processJob (job, callback) { | ||
82 | const jobHandler = jobHandlers[job.handlerName] | ||
83 | |||
84 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
85 | |||
86 | job.state = constants.JOB_STATES.PROCESSING | ||
87 | job.save().asCallback(function (err) { | ||
88 | if (err) return cannotSaveJobError(err, callback) | ||
89 | |||
90 | if (jobHandler === undefined) { | ||
91 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | ||
92 | return callback() | ||
93 | } | ||
94 | |||
95 | return jobHandler.process(job.handlerInputData, function (err, result) { | ||
96 | if (err) { | ||
97 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | ||
98 | return onJobError(jobHandler, job, result, callback) | ||
99 | } | ||
100 | |||
101 | return onJobSuccess(jobHandler, job, result, callback) | ||
102 | }) | ||
103 | }) | ||
104 | } | ||
105 | |||
106 | function onJobError (jobHandler, job, jobResult, callback) { | ||
107 | job.state = constants.JOB_STATES.ERROR | ||
108 | |||
109 | job.save().asCallback(function (err) { | ||
110 | if (err) return cannotSaveJobError(err, callback) | ||
111 | |||
112 | return jobHandler.onError(err, job.id, jobResult, callback) | ||
113 | }) | ||
114 | } | ||
115 | |||
116 | function onJobSuccess (jobHandler, job, jobResult, callback) { | ||
117 | job.state = constants.JOB_STATES.SUCCESS | ||
118 | |||
119 | job.save().asCallback(function (err) { | ||
120 | if (err) return cannotSaveJobError(err, callback) | ||
121 | |||
122 | return jobHandler.onSuccess(err, job.id, jobResult, callback) | ||
123 | }) | ||
124 | } | ||
125 | |||
126 | function cannotSaveJobError (err, callback) { | ||
127 | logger.error('Cannot save new job state.', { error: err }) | ||
128 | return callback(err) | ||
129 | } | ||
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts new file mode 100644 index 000000000..7b8c6faf9 --- /dev/null +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -0,0 +1,137 @@ | |||
1 | import { forever, queue } from 'async' | ||
2 | |||
3 | const db = require('../../initializers/database') | ||
4 | import { | ||
5 | JOBS_FETCHING_INTERVAL, | ||
6 | JOBS_FETCH_LIMIT_PER_CYCLE, | ||
7 | JOB_STATES | ||
8 | } from '../../initializers' | ||
9 | import { logger } from '../../helpers' | ||
10 | import { jobHandlers } from './handlers' | ||
11 | |||
12 | class JobScheduler { | ||
13 | |||
14 | private static instance: JobScheduler | ||
15 | |||
16 | private constructor () { } | ||
17 | |||
18 | static get Instance () { | ||
19 | return this.instance || (this.instance = new this()) | ||
20 | } | ||
21 | |||
22 | activate () { | ||
23 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | ||
24 | |||
25 | logger.info('Jobs scheduler activated.') | ||
26 | |||
27 | const jobsQueue = queue(this.processJob) | ||
28 | |||
29 | // Finish processing jobs from a previous start | ||
30 | const state = JOB_STATES.PROCESSING | ||
31 | db.Job.listWithLimit(limit, state, (err, jobs) => { | ||
32 | this.enqueueJobs(err, jobsQueue, jobs) | ||
33 | |||
34 | forever( | ||
35 | next => { | ||
36 | if (jobsQueue.length() !== 0) { | ||
37 | // Finish processing the queue first | ||
38 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
39 | } | ||
40 | |||
41 | const state = JOB_STATES.PENDING | ||
42 | db.Job.listWithLimit(limit, state, (err, jobs) => { | ||
43 | if (err) { | ||
44 | logger.error('Cannot list pending jobs.', { error: err }) | ||
45 | } else { | ||
46 | jobs.forEach(job => { | ||
47 | jobsQueue.push(job) | ||
48 | }) | ||
49 | } | ||
50 | |||
51 | // Optimization: we could use "drain" from queue object | ||
52 | return setTimeout(next, JOBS_FETCHING_INTERVAL) | ||
53 | }) | ||
54 | }, | ||
55 | |||
56 | err => { logger.error('Error in job scheduler queue.', { error: err }) } | ||
57 | ) | ||
58 | }) | ||
59 | } | ||
60 | |||
61 | createJob (transaction, handlerName, handlerInputData, callback) { | ||
62 | const createQuery = { | ||
63 | state: JOB_STATES.PENDING, | ||
64 | handlerName, | ||
65 | handlerInputData | ||
66 | } | ||
67 | const options = { transaction } | ||
68 | |||
69 | db.Job.create(createQuery, options).asCallback(callback) | ||
70 | } | ||
71 | |||
72 | private enqueueJobs (err, jobsQueue, jobs) { | ||
73 | if (err) { | ||
74 | logger.error('Cannot list pending jobs.', { error: err }) | ||
75 | } else { | ||
76 | jobs.forEach(job => { | ||
77 | jobsQueue.push(job) | ||
78 | }) | ||
79 | } | ||
80 | } | ||
81 | |||
82 | private processJob (job, callback) { | ||
83 | const jobHandler = jobHandlers[job.handlerName] | ||
84 | |||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | ||
86 | |||
87 | job.state = JOB_STATES.PROCESSING | ||
88 | job.save().asCallback(err => { | ||
89 | if (err) return this.cannotSaveJobError(err, callback) | ||
90 | |||
91 | if (jobHandler === undefined) { | ||
92 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | ||
93 | return callback() | ||
94 | } | ||
95 | |||
96 | return jobHandler.process(job.handlerInputData, (err, result) => { | ||
97 | if (err) { | ||
98 | logger.error('Error in job handler %s.', job.handlerName, { error: err }) | ||
99 | return this.onJobError(jobHandler, job, result, callback) | ||
100 | } | ||
101 | |||
102 | return this.onJobSuccess(jobHandler, job, result, callback) | ||
103 | }) | ||
104 | }) | ||
105 | } | ||
106 | |||
107 | private onJobError (jobHandler, job, jobResult, callback) { | ||
108 | job.state = JOB_STATES.ERROR | ||
109 | |||
110 | job.save().asCallback(err => { | ||
111 | if (err) return this.cannotSaveJobError(err, callback) | ||
112 | |||
113 | return jobHandler.onError(err, job.id, jobResult, callback) | ||
114 | }) | ||
115 | } | ||
116 | |||
117 | private onJobSuccess (jobHandler, job, jobResult, callback) { | ||
118 | job.state = JOB_STATES.SUCCESS | ||
119 | |||
120 | job.save().asCallback(err => { | ||
121 | if (err) return this.cannotSaveJobError(err, callback) | ||
122 | |||
123 | return jobHandler.onSuccess(err, job.id, jobResult, callback) | ||
124 | }) | ||
125 | } | ||
126 | |||
127 | private cannotSaveJobError (err, callback) { | ||
128 | logger.error('Cannot save new job state.', { error: err }) | ||
129 | return callback(err) | ||
130 | } | ||
131 | } | ||
132 | |||
133 | // --------------------------------------------------------------------------- | ||
134 | |||
135 | export { | ||
136 | JobScheduler | ||
137 | } | ||