aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/job-scheduler.ts
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-07-05 13:26:25 +0200
committerChocobozzz <florian.bigard@gmail.com>2017-07-05 14:14:16 +0200
commit6fcd19ba737f1f5614a56c6925adb882dea43b8d (patch)
tree3365a96d82bc7f00ae504a568725c8e914150cf8 /server/lib/jobs/job-scheduler.ts
parent5fe7e898316e18369c3e1aba307b55077adc7bfb (diff)
downloadPeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.gz
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.tar.zst
PeerTube-6fcd19ba737f1f5614a56c6925adb882dea43b8d.zip
Move to promises
Closes https://github.com/Chocobozzz/PeerTube/issues/74
Diffstat (limited to 'server/lib/jobs/job-scheduler.ts')
-rw-r--r--server/lib/jobs/job-scheduler.ts117
1 files changed, 55 insertions, 62 deletions
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 2f01387e7..248dc7978 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -32,37 +32,35 @@ class JobScheduler {
32 32
33 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
35 db.Job.listWithLimit(limit, state, (err, jobs) => { 35 db.Job.listWithLimit(limit, state)
36 this.enqueueJobs(err, jobsQueue, jobs) 36 .then(jobs => {
37 37 this.enqueueJobs(jobsQueue, jobs)
38 forever( 38
39 next => { 39 forever(
40 if (jobsQueue.length() !== 0) { 40 next => {
41 // Finish processing the queue first 41 if (jobsQueue.length() !== 0) {
42 return setTimeout(next, JOBS_FETCHING_INTERVAL) 42 // Finish processing the queue first
43 } 43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44
45 const state = JOB_STATES.PENDING
46 db.Job.listWithLimit(limit, state, (err, jobs) => {
47 if (err) {
48 logger.error('Cannot list pending jobs.', { error: err })
49 } else {
50 jobs.forEach(job => {
51 jobsQueue.push(job)
52 })
53 } 44 }
54 45
55 // Optimization: we could use "drain" from queue object 46 const state = JOB_STATES.PENDING
56 return setTimeout(next, JOBS_FETCHING_INTERVAL) 47 db.Job.listWithLimit(limit, state)
57 }) 48 .then(jobs => {
58 }, 49 this.enqueueJobs(jobsQueue, jobs)
59 50
60 err => { logger.error('Error in job scheduler queue.', { error: err }) } 51 // Optimization: we could use "drain" from queue object
61 ) 52 return setTimeout(next, JOBS_FETCHING_INTERVAL)
62 }) 53 })
54 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
55 },
56
57 err => logger.error('Error in job scheduler queue.', { error: err })
58 )
59 })
60 .catch(err => logger.error('Cannot list pending jobs.', { error: err }))
63 } 61 }
64 62
65 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { 63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) {
66 const createQuery = { 64 const createQuery = {
67 state: JOB_STATES.PENDING, 65 state: JOB_STATES.PENDING,
68 handlerName, 66 handlerName,
@@ -70,67 +68,62 @@ class JobScheduler {
70 } 68 }
71 const options = { transaction } 69 const options = { transaction }
72 70
73 db.Job.create(createQuery, options).asCallback(callback) 71 return db.Job.create(createQuery, options)
74 } 72 }
75 73
76 private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { 74 private enqueueJobs (jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
77 if (err) { 75 jobs.forEach(job => jobsQueue.push(job))
78 logger.error('Cannot list pending jobs.', { error: err })
79 } else {
80 jobs.forEach(job => {
81 jobsQueue.push(job)
82 })
83 }
84 } 76 }
85 77
86 private processJob (job: JobInstance, callback: (err: Error) => void) { 78 private processJob (job: JobInstance, callback: (err: Error) => void) {
87 const jobHandler = jobHandlers[job.handlerName] 79 const jobHandler = jobHandlers[job.handlerName]
80 if (jobHandler === undefined) {
81 logger.error('Unknown job handler for job %s.', job.handlerName)
82 return callback(null)
83 }
88 84
89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
90 86
91 job.state = JOB_STATES.PROCESSING 87 job.state = JOB_STATES.PROCESSING
92 job.save().asCallback(err => { 88 return job.save()
93 if (err) return this.cannotSaveJobError(err, callback) 89 .then(() => {
94 90 return jobHandler.process(job.handlerInputData)
95 if (jobHandler === undefined) { 91 })
96 logger.error('Unknown job handler for job %s.', job.handlerName) 92 .then(
97 return callback(null) 93 result => {
98 } 94 return this.onJobSuccess(jobHandler, job, result)
95 },
99 96
100 return jobHandler.process(job.handlerInputData, (err, result) => { 97 err => {
101 if (err) {
102 logger.error('Error in job handler %s.', job.handlerName, { error: err }) 98 logger.error('Error in job handler %s.', job.handlerName, { error: err })
103 return this.onJobError(jobHandler, job, result, callback) 99 return this.onJobError(jobHandler, job, err)
104 } 100 }
105 101 )
106 return this.onJobSuccess(jobHandler, job, result, callback) 102 .then(() => callback(null))
103 .catch(err => {
104 this.cannotSaveJobError(err)
105 return callback(err)
107 }) 106 })
108 })
109 } 107 }
110 108
111 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 109 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) {
112 job.state = JOB_STATES.ERROR 110 job.state = JOB_STATES.ERROR
113 111
114 job.save().asCallback(err => { 112 return job.save()
115 if (err) return this.cannotSaveJobError(err, callback) 113 .then(() => jobHandler.onError(err, job.id))
116 114 .catch(err => this.cannotSaveJobError(err))
117 return jobHandler.onError(err, job.id, jobResult, callback)
118 })
119 } 115 }
120 116
121 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { 117 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) {
122 job.state = JOB_STATES.SUCCESS 118 job.state = JOB_STATES.SUCCESS
123 119
124 job.save().asCallback(err => { 120 return job.save()
125 if (err) return this.cannotSaveJobError(err, callback) 121 .then(() => jobHandler.onSuccess(job.id, jobResult))
126 122 .catch(err => this.cannotSaveJobError(err))
127 return jobHandler.onSuccess(err, job.id, jobResult, callback)
128 })
129 } 123 }
130 124
131 private cannotSaveJobError (err: Error, callback: (err: Error) => void) { 125 private cannotSaveJobError (err: Error) {
132 logger.error('Cannot save new job state.', { error: err }) 126 logger.error('Cannot save new job state.', { error: err })
133 return callback(err)
134 } 127 }
135} 128}
136 129