aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/handlers/index.ts10
-rw-r--r--server/lib/jobs/handlers/video-transcoder.ts7
-rw-r--r--server/lib/jobs/job-scheduler.ts24
3 files changed, 27 insertions, 14 deletions
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts
index ae5440031..7d0263b15 100644
--- a/server/lib/jobs/handlers/index.ts
+++ b/server/lib/jobs/handlers/index.ts
@@ -1,6 +1,14 @@
1import * as videoTranscoder from './video-transcoder' 1import * as videoTranscoder from './video-transcoder'
2 2
3const jobHandlers = { 3import { VideoInstance } from '../../../models'
4
5export interface JobHandler<T> {
6 process (data: object, callback: (err: Error, videoInstance?: T) => void)
7 onError (err: Error, jobId: number, video: T, callback: (err: Error) => void)
8 onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void)
9}
10
11const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
4 videoTranscoder 12 videoTranscoder
5} 13}
6 14
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts
index 43599356a..efa18ef2d 100644
--- a/server/lib/jobs/handlers/video-transcoder.ts
+++ b/server/lib/jobs/handlers/video-transcoder.ts
@@ -1,8 +1,9 @@
1import { database as db } from '../../../initializers/database' 1import { database as db } from '../../../initializers/database'
2import { logger } from '../../../helpers' 2import { logger } from '../../../helpers'
3import { addVideoToFriends } from '../../../lib' 3import { addVideoToFriends } from '../../../lib'
4import { VideoInstance } from '../../../models'
4 5
5function process (data, callback) { 6function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) {
6 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { 7 db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) {
7 if (err) return callback(err) 8 if (err) return callback(err)
8 9
@@ -12,12 +13,12 @@ function process (data, callback) {
12 }) 13 })
13} 14}
14 15
15function onError (err, jobId, video, callback) { 16function onError (err: Error, jobId: number, video: VideoInstance, callback: () => void) {
16 logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) 17 logger.error('Error when transcoding video file in job %d.', jobId, { error: err })
17 return callback() 18 return callback()
18} 19}
19 20
20function onSuccess (data, jobId, video, callback) { 21function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) {
21 logger.info('Job %d is a success.', jobId) 22 logger.info('Job %d is a success.', jobId)
22 23
23 video.toAddRemoteJSON(function (err, remoteVideo) { 24 video.toAddRemoteJSON(function (err, remoteVideo) {
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index ad5f7f6d9..2f01387e7 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,4 +1,5 @@
1import { forever, queue } from 'async' 1import { forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
2 3
3import { database as db } from '../../initializers/database' 4import { database as db } from '../../initializers/database'
4import { 5import {
@@ -7,7 +8,10 @@ import {
7 JOB_STATES 8 JOB_STATES
8} from '../../initializers' 9} from '../../initializers'
9import { logger } from '../../helpers' 10import { logger } from '../../helpers'
10import { jobHandlers } from './handlers' 11import { JobInstance } from '../../models'
12import { JobHandler, jobHandlers } from './handlers'
13
14type JobQueueCallback = (err: Error) => void
11 15
12class JobScheduler { 16class JobScheduler {
13 17
@@ -24,7 +28,7 @@ class JobScheduler {
24 28
25 logger.info('Jobs scheduler activated.') 29 logger.info('Jobs scheduler activated.')
26 30
27 const jobsQueue = queue(this.processJob.bind(this)) 31 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))
28 32
29 // Finish processing jobs from a previous start 33 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING 34 const state = JOB_STATES.PROCESSING
@@ -58,7 +62,7 @@ class JobScheduler {
58 }) 62 })
59 } 63 }
60 64
61 createJob (transaction, handlerName: string, handlerInputData: object, callback) { 65 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) {
62 const createQuery = { 66 const createQuery = {
63 state: JOB_STATES.PENDING, 67 state: JOB_STATES.PENDING,
64 handlerName, 68 handlerName,
@@ -69,7 +73,7 @@ class JobScheduler {
69 db.Job.create(createQuery, options).asCallback(callback) 73 db.Job.create(createQuery, options).asCallback(callback)
70 } 74 }
71 75
72 private enqueueJobs (err, jobsQueue, jobs) { 76 private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) {
73 if (err) { 77 if (err) {
74 logger.error('Cannot list pending jobs.', { error: err }) 78 logger.error('Cannot list pending jobs.', { error: err })
75 } else { 79 } else {
@@ -79,7 +83,7 @@ class JobScheduler {
79 } 83 }
80 } 84 }
81 85
82 private processJob (job, callback) { 86 private processJob (job: JobInstance, callback: (err: Error) => void) {
83 const jobHandler = jobHandlers[job.handlerName] 87 const jobHandler = jobHandlers[job.handlerName]
84 88
85 logger.info('Processing job %d with handler %s.', job.id, job.handlerName) 89 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
@@ -89,8 +93,8 @@ class JobScheduler {
89 if (err) return this.cannotSaveJobError(err, callback) 93 if (err) return this.cannotSaveJobError(err, callback)
90 94
91 if (jobHandler === undefined) { 95 if (jobHandler === undefined) {
92 logger.error('Unknown job handler for job %s.', jobHandler.handlerName) 96 logger.error('Unknown job handler for job %s.', job.handlerName)
93 return callback() 97 return callback(null)
94 } 98 }
95 99
96 return jobHandler.process(job.handlerInputData, (err, result) => { 100 return jobHandler.process(job.handlerInputData, (err, result) => {
@@ -104,7 +108,7 @@ class JobScheduler {
104 }) 108 })
105 } 109 }
106 110
107 private onJobError (jobHandler, job, jobResult, callback) { 111 private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) {
108 job.state = JOB_STATES.ERROR 112 job.state = JOB_STATES.ERROR
109 113
110 job.save().asCallback(err => { 114 job.save().asCallback(err => {
@@ -114,7 +118,7 @@ class JobScheduler {
114 }) 118 })
115 } 119 }
116 120
117 private onJobSuccess (jobHandler, job, jobResult, callback) { 121 private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) {
118 job.state = JOB_STATES.SUCCESS 122 job.state = JOB_STATES.SUCCESS
119 123
120 job.save().asCallback(err => { 124 job.save().asCallback(err => {
@@ -124,7 +128,7 @@ class JobScheduler {
124 }) 128 })
125 } 129 }
126 130
127 private cannotSaveJobError (err, callback) { 131 private cannotSaveJobError (err: Error, callback: (err: Error) => void) {
128 logger.error('Cannot save new job state.', { error: err }) 132 logger.error('Cannot save new job state.', { error: err })
129 return callback(err) 133 return callback(err)
130 } 134 }