aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/handlers/index.ts8
-rw-r--r--server/lib/jobs/handlers/video-transcoder.ts22
-rw-r--r--server/lib/jobs/job-scheduler.ts117
3 files changed, 66 insertions, 81 deletions
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts
index 7d0263b15..8abddae35 100644
--- a/server/lib/jobs/handlers/index.ts
+++ b/server/lib/jobs/handlers/index.ts
@@ -1,11 +1,9 @@
1import * as videoTranscoder from './video-transcoder' 1import * as videoTranscoder from './video-transcoder'
2 2
3import { VideoInstance } from '../../../models'
4
5export interface JobHandler<T> { 3export interface JobHandler<T> {
6 process (data: object, callback: (err: Error, videoInstance?: T) => void) 4 process (data: object): T
7 onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) 5 onError (err: Error, jobId: number)
8 onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) 6 onSuccess (jobId: number, jobResult: T)
9} 7}
10 8
11const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { 9const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts
index 6f606a7d3..e829ca813 100644
--- a/server/lib/jobs/handlers/video-transcoder.ts
+++ b/server/lib/jobs/handlers/video-transcoder.ts
@@ -3,29 +3,23 @@ import { logger } from '../../../helpers'
3import { addVideoToFriends } from '../../../lib' 3import { addVideoToFriends } from '../../../lib'
4import { VideoInstance } from '../../../models' 4import { VideoInstance } from '../../../models'
5 5
6function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { 6function process (data: { id: string }) {
7 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { 7 return db.Video.loadAndPopulateAuthorAndPodAndTags(data.id).then(video => {
8 if (err) return callback(err) 8 return video.transcodeVideofile().then(() => video)
9
10 video.transcodeVideofile(function (err) {
11 return callback(err, video)
12 })
13 }) 9 })
14} 10}
15 11
16function onError (err: Error, jobId: number, video: VideoInstance, callback: (err: Error) => void) { 12function onError (err: Error, jobId: number) {
17 logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) 13 logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
18 return callback(null) 14 return Promise.resolve()
19} 15}
20 16
21function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { 17function onSuccess (jobId: number, video: VideoInstance) {
22 logger.info('Job %d is a success.', jobId) 18 logger.info('Job %d is a success.', jobId)
23 19
24 video.toAddRemoteJSON(function (err, remoteVideo) { 20 video.toAddRemoteJSON().then(remoteVideo => {
25 if (err) return callback(err)
26
27 // Now we'll add the video's meta data to our friends 21 // Now we'll add the video's meta data to our friends
28 addVideoToFriends(remoteVideo, null, callback) 22 return addVideoToFriends(remoteVideo, null)
29 }) 23 })
30} 24}
31 25
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 2f01387e7..248dc7978 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -32,37 +32,35 @@ class JobScheduler {
32 32
33 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
35 db.Job.listWithLimit(limit, state, (err, jobs) => { 35 db.Job.listWithLimit(limit, state)
36 this.enqueueJobs(err, jobsQueue, jobs) 36 .then(jobs => {
37 37 this.enqueueJobs(jobsQueue, jobs)
38 forever( 38
39 next => { 39 forever(
40 if (jobsQueue.length() !== 0) { 40 next => {
41 // Finish processing the queue first 41 if (jobsQueue.length() !== 0) {
42 return setTimeout(next, JOBS_FETCHING_INTERVAL) 42 // Finish processing the queue first
43 } 43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44
45 const state = JOB_STATES.PENDING
46 db.Job.listWithLimit(limit, state, (err, jobs) => {
47 if (err) {
48 logger.error('Cannot list pending jobs.', { error: err })
49 } else {
50 jobs.forEach(job => {
51 jobsQueue.push(job)
52 })
53 } 44 }
54 45
55 // Optimization: we could use "drain" from queue object 46 const state = JOB_STATES.PENDING
56 return setTimeout(next, JOBS_FETCHING_INTERVAL) 47 db.Job.listWithLimit(limit, state)
57 }) 48 .then(jobs => {
58 }, 49 this.enqueueJobs(jobsQueue, jobs)
59 50
60 err => { logger.error('Error in job scheduler queue.', { error: err }) } 51 // Optimization: we could use "drain" from queue object
61 ) 52 return setTimeout(next, JOBS_FETCHING_INTERVAL)
62 }) 53 })
54 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
55 },
56
57 err => logger.error('Error in job scheduler queue.', { error: err })
58 )
59 })
60 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
63 } 61 }
64 62
65 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { 63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
66 const createQuery = { 64 const createQuery = {
67 state: JOB_STATES.PENDING, 65 state: JOB_STATES.PENDING,
68 handlerName, 66 handlerName,
@@ -70,67 +68,62 @@ class JobScheduler {
70 } 68 }
71 const options = { transaction } 69 const options = { transaction }
72 70
73 db.Job.create(createQuery, options).asCallback(callback) 71 return db.Job.create(createQuery, options)
74 } 72 }
75 73
76 private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { 74 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
77 if (err) { 75 jobs.forEach(job => jobsQueue.push(job))
78 logger.error('Cannot list pending jobs.', { error: err })
79 } else {
80 jobs.forEach(job => {
81 jobsQueue.push(job)
82 })
83 }
84 } 76 }
85 77
86 private processJob (job: JobInstance, callback: (err: Error) => void) { 78 private processJob (job: JobInstance, callback: (err: Error) => void) {
87 const jobHandler = jobHandlers[job.handlerName] 79 const jobHandler = jobHandlers[job.handlerName]
80 if (jobHandler === undefined) {
81 logger.error('Unknown job handler for job %s.', job.handlerName)
82 return callback(null)
83 }
88 84
89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
90 86
91 job.state = JOB_STATES.PROCESSING 87 job.state = JOB_STATES.PROCESSING
92 job.save().asCallback(err => { 88 return job.save()
93 if (err) return this.cannotSaveJobError(err, callback) 89 .then(() => {
94 90 return jobHandler.process(job.handlerInputData)
95 if (jobHandler === undefined) { 91 })
96 logger.error('Unknown job handler for job %s.', job.handlerName) 92 .then(
97 return callback(null) 93 result => {
98 } 94 return this.onJobSuccess(jobHandler, job, result)
95 },
99 96
100 return jobHandler.process(job.handlerInputData, (err, result) => { 97 err => {
101 if (err) {
102 logger.error('Error in job handler %s.', job.handlerName, { error: err }) 98 logger.error('Error in job handler %s.', job.handlerName, { error: err })
103 return this.onJobError(jobHandler, job, result, callback) 99 return this.onJobError(jobHandler, job, err)
104 } 100 }
105 101 )
106 return this.onJobSuccess(jobHandler, job, result, callback) 102 .then(() => callback(null))
103 .catch(err => {
104 this.cannotSaveJobError(err)
105 return callback(err)
107 }) 106 })
108 })
109 } 107 }
110 108
111 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 109 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
112 job.state = JOB_STATES.ERROR 110 job.state = JOB_STATES.ERROR
113 111
114 job.save().asCallback(err => { 112 return job.save()
115 if (err) return this.cannotSaveJobError(err, callback) 113 .then(() => jobHandler.onError(err, job.id))
116 114 .catch(err => this.cannotSaveJobError(err))
117 return jobHandler.onError(err, job.id, jobResult, callback)
118 })
119 } 115 }
120 116
121 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 117 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
122 job.state = JOB_STATES.SUCCESS 118 job.state = JOB_STATES.SUCCESS
123 119
124 job.save().asCallback(err => { 120 return job.save()
125 if (err) return this.cannotSaveJobError(err, callback) 121 .then(() => jobHandler.onSuccess(job.id, jobResult))
126 122 .catch(err => this.cannotSaveJobError(err))
127 return jobHandler.onSuccess(err, job.id, jobResult, callback)
128 })
129 } 123 }
130 124
131 private cannotSaveJobError (err: Error, callback: (err: Error) => void) { 125 private cannotSaveJobError (err: Error) {
132 logger.error('Cannot save new job state.', { error: err }) 126 logger.error('Cannot save new job state.', { error: err })
133 return callback(err)
134 } 127 }
135} 128}
136 129