diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-06-10 22:15:25 +0200 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-06-10 22:15:25 +0200 |
commit | 69818c9394366b954b6ba3bd697bd9d2b09f2a16 (patch) | |
tree | ad199a18ec3c322460d6f9523fc383ee562554e0 /server/lib/jobs | |
parent | 4d4e5cd4dca78480ec7f40e747f424cd107376a4 (diff) | |
download | PeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.tar.gz PeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.tar.zst PeerTube-69818c9394366b954b6ba3bd697bd9d2b09f2a16.zip |
Type functions
Diffstat (limited to 'server/lib/jobs')
-rw-r--r-- | server/lib/jobs/handlers/index.ts | 10 | ||||
-rw-r--r-- | server/lib/jobs/handlers/video-transcoder.ts | 7 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 24 |
3 files changed, 27 insertions, 14 deletions
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts index ae5440031..7d0263b15 100644 --- a/server/lib/jobs/handlers/index.ts +++ b/server/lib/jobs/handlers/index.ts | |||
@@ -1,6 +1,14 @@ | |||
1 | import * as videoTranscoder from './video-transcoder' | 1 | import * as videoTranscoder from './video-transcoder' |
2 | 2 | ||
3 | const jobHandlers = { | 3 | import { VideoInstance } from '../../../models' |
4 | |||
5 | export interface JobHandler<T> { | ||
6 | process (data: object, callback: (err: Error, videoInstance?: T) => void) | ||
7 | onError (err: Error, jobId: number, video: T, callback: (err: Error) => void) | ||
8 | onSuccess (data: any, jobId: number, video: T, callback: (err: Error) => void) | ||
9 | } | ||
10 | |||
11 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | ||
4 | videoTranscoder | 12 | videoTranscoder |
5 | } | 13 | } |
6 | 14 | ||
diff --git a/server/lib/jobs/handlers/video-transcoder.ts b/server/lib/jobs/handlers/video-transcoder.ts index 43599356a..efa18ef2d 100644 --- a/server/lib/jobs/handlers/video-transcoder.ts +++ b/server/lib/jobs/handlers/video-transcoder.ts | |||
@@ -1,8 +1,9 @@ | |||
1 | import { database as db } from '../../../initializers/database' | 1 | import { database as db } from '../../../initializers/database' |
2 | import { logger } from '../../../helpers' | 2 | import { logger } from '../../../helpers' |
3 | import { addVideoToFriends } from '../../../lib' | 3 | import { addVideoToFriends } from '../../../lib' |
4 | import { VideoInstance } from '../../../models' | ||
4 | 5 | ||
5 | function process (data, callback) { | 6 | function process (data: { id: string }, callback: (err: Error, videoInstance?: VideoInstance) => void) { |
6 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { | 7 | db.Video.loadAndPopulateAuthorAndPodAndTags(data.id, function (err, video) { |
7 | if (err) return callback(err) | 8 | if (err) return callback(err) |
8 | 9 | ||
@@ -12,12 +13,12 @@ function process (data, callback) { | |||
12 | }) | 13 | }) |
13 | } | 14 | } |
14 | 15 | ||
15 | function onError (err, jobId, video, callback) { | 16 | function onError (err: Error, jobId: number, video: VideoInstance, callback: () => void) { |
16 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) | 17 | logger.error('Error when transcoding video file in job %d.', jobId, { error: err }) |
17 | return callback() | 18 | return callback() |
18 | } | 19 | } |
19 | 20 | ||
20 | function onSuccess (data, jobId, video, callback) { | 21 | function onSuccess (data: any, jobId: number, video: VideoInstance, callback: (err: Error) => void) { |
21 | logger.info('Job %d is a success.', jobId) | 22 | logger.info('Job %d is a success.', jobId) |
22 | 23 | ||
23 | video.toAddRemoteJSON(function (err, remoteVideo) { | 24 | video.toAddRemoteJSON(function (err, remoteVideo) { |
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index ad5f7f6d9..2f01387e7 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,4 +1,5 @@ | |||
1 | import { forever, queue } from 'async' | 1 | import { forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | ||
2 | 3 | ||
3 | import { database as db } from '../../initializers/database' | 4 | import { database as db } from '../../initializers/database' |
4 | import { | 5 | import { |
@@ -7,7 +8,10 @@ import { | |||
7 | JOB_STATES | 8 | JOB_STATES |
8 | } from '../../initializers' | 9 | } from '../../initializers' |
9 | import { logger } from '../../helpers' | 10 | import { logger } from '../../helpers' |
10 | import { jobHandlers } from './handlers' | 11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | ||
13 | |||
14 | type JobQueueCallback = (err: Error) => void | ||
11 | 15 | ||
12 | class JobScheduler { | 16 | class JobScheduler { |
13 | 17 | ||
@@ -24,7 +28,7 @@ class JobScheduler { | |||
24 | 28 | ||
25 | logger.info('Jobs scheduler activated.') | 29 | logger.info('Jobs scheduler activated.') |
26 | 30 | ||
27 | const jobsQueue = queue(this.processJob.bind(this)) | 31 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) |
28 | 32 | ||
29 | // Finish processing jobs from a previous start | 33 | // Finish processing jobs from a previous start |
30 | const state = JOB_STATES.PROCESSING | 34 | const state = JOB_STATES.PROCESSING |
@@ -58,7 +62,7 @@ class JobScheduler { | |||
58 | }) | 62 | }) |
59 | } | 63 | } |
60 | 64 | ||
61 | createJob (transaction, handlerName: string, handlerInputData: object, callback) { | 65 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object, callback: (err: Error) => void) { |
62 | const createQuery = { | 66 | const createQuery = { |
63 | state: JOB_STATES.PENDING, | 67 | state: JOB_STATES.PENDING, |
64 | handlerName, | 68 | handlerName, |
@@ -69,7 +73,7 @@ class JobScheduler { | |||
69 | db.Job.create(createQuery, options).asCallback(callback) | 73 | db.Job.create(createQuery, options).asCallback(callback) |
70 | } | 74 | } |
71 | 75 | ||
72 | private enqueueJobs (err, jobsQueue, jobs) { | 76 | private enqueueJobs (err: Error, jobsQueue: AsyncQueue<JobInstance>, jobs: JobInstance[]) { |
73 | if (err) { | 77 | if (err) { |
74 | logger.error('Cannot list pending jobs.', { error: err }) | 78 | logger.error('Cannot list pending jobs.', { error: err }) |
75 | } else { | 79 | } else { |
@@ -79,7 +83,7 @@ class JobScheduler { | |||
79 | } | 83 | } |
80 | } | 84 | } |
81 | 85 | ||
82 | private processJob (job, callback) { | 86 | private processJob (job: JobInstance, callback: (err: Error) => void) { |
83 | const jobHandler = jobHandlers[job.handlerName] | 87 | const jobHandler = jobHandlers[job.handlerName] |
84 | 88 | ||
85 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) | 89 | logger.info('Processing job %d with handler %s.', job.id, job.handlerName) |
@@ -89,8 +93,8 @@ class JobScheduler { | |||
89 | if (err) return this.cannotSaveJobError(err, callback) | 93 | if (err) return this.cannotSaveJobError(err, callback) |
90 | 94 | ||
91 | if (jobHandler === undefined) { | 95 | if (jobHandler === undefined) { |
92 | logger.error('Unknown job handler for job %s.', jobHandler.handlerName) | 96 | logger.error('Unknown job handler for job %s.', job.handlerName) |
93 | return callback() | 97 | return callback(null) |
94 | } | 98 | } |
95 | 99 | ||
96 | return jobHandler.process(job.handlerInputData, (err, result) => { | 100 | return jobHandler.process(job.handlerInputData, (err, result) => { |
@@ -104,7 +108,7 @@ class JobScheduler { | |||
104 | }) | 108 | }) |
105 | } | 109 | } |
106 | 110 | ||
107 | private onJobError (jobHandler, job, jobResult, callback) { | 111 | private onJobError (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
108 | job.state = JOB_STATES.ERROR | 112 | job.state = JOB_STATES.ERROR |
109 | 113 | ||
110 | job.save().asCallback(err => { | 114 | job.save().asCallback(err => { |
@@ -114,7 +118,7 @@ class JobScheduler { | |||
114 | }) | 118 | }) |
115 | } | 119 | } |
116 | 120 | ||
117 | private onJobSuccess (jobHandler, job, jobResult, callback) { | 121 | private onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any, callback: (err: Error) => void) { |
118 | job.state = JOB_STATES.SUCCESS | 122 | job.state = JOB_STATES.SUCCESS |
119 | 123 | ||
120 | job.save().asCallback(err => { | 124 | job.save().asCallback(err => { |
@@ -124,7 +128,7 @@ class JobScheduler { | |||
124 | }) | 128 | }) |
125 | } | 129 | } |
126 | 130 | ||
127 | private cannotSaveJobError (err, callback) { | 131 | private cannotSaveJobError (err: Error, callback: (err: Error) => void) { |
128 | logger.error('Cannot save new job state.', { error: err }) | 132 | logger.error('Cannot save new job state.', { error: err }) |
129 | return callback(err) | 133 | return callback(err) |
130 | } | 134 | } |