diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-11-09 17:51:58 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-11-27 19:40:51 +0100 |
commit | e4f97babf701481b55cc10fb3448feab5f97c867 (patch) | |
tree | af37402a594dc5ff09f71ecb0687e8cfe4cdb471 /server/lib/jobs | |
parent | 343ad675f2a26c15b86150a9a3552e619d5d44f4 (diff) | |
download | PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.gz PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.zst PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.zip |
Begin activitypub
Diffstat (limited to 'server/lib/jobs')
-rw-r--r-- | server/lib/jobs/handlers/index.ts | 17 | ||||
-rw-r--r-- | server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts | 25 | ||||
-rw-r--r-- | server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts | 17 | ||||
-rw-r--r-- | server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts | 25 | ||||
-rw-r--r-- | server/lib/jobs/http-request-job-scheduler/index.ts | 1 | ||||
-rw-r--r-- | server/lib/jobs/index.ts | 3 | ||||
-rw-r--r-- | server/lib/jobs/job-scheduler.ts | 35 | ||||
-rw-r--r-- | server/lib/jobs/transcoding-job-scheduler/index.ts | 1 | ||||
-rw-r--r-- | server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts | 17 | ||||
-rw-r--r-- | server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts (renamed from server/lib/jobs/handlers/video-file-optimizer.ts) | 0 | ||||
-rw-r--r-- | server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts (renamed from server/lib/jobs/handlers/video-file-transcoder.ts) | 0 |
11 files changed, 107 insertions, 34 deletions
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts deleted file mode 100644 index cef1f89a9..000000000 --- a/server/lib/jobs/handlers/index.ts +++ /dev/null | |||
@@ -1,17 +0,0 @@ | |||
1 | import * as videoFileOptimizer from './video-file-optimizer' | ||
2 | import * as videoFileTranscoder from './video-file-transcoder' | ||
3 | |||
4 | export interface JobHandler<T> { | ||
5 | process (data: object, jobId: number): T | ||
6 | onError (err: Error, jobId: number) | ||
7 | onSuccess (jobId: number, jobResult: T) | ||
8 | } | ||
9 | |||
10 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | ||
11 | videoFileOptimizer, | ||
12 | videoFileTranscoder | ||
13 | } | ||
14 | |||
15 | export { | ||
16 | jobHandlers | ||
17 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts new file mode 100644 index 000000000..6b6946d02 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts | |||
@@ -0,0 +1,25 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | |||
3 | import { database as db } from '../../../initializers/database' | ||
4 | import { logger } from '../../../helpers' | ||
5 | |||
6 | async function process (data: { videoUUID: string }, jobId: number) { | ||
7 | |||
8 | } | ||
9 | |||
10 | function onError (err: Error, jobId: number) { | ||
11 | logger.error('Error when optimized video file in job %d.', jobId, err) | ||
12 | return Promise.resolve() | ||
13 | } | ||
14 | |||
15 | async function onSuccess (jobId: number) { | ||
16 | |||
17 | } | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | export { | ||
22 | process, | ||
23 | onError, | ||
24 | onSuccess | ||
25 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts new file mode 100644 index 000000000..42cb9139c --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts | |||
@@ -0,0 +1,17 @@ | |||
1 | import { JobScheduler, JobHandler } from '../job-scheduler' | ||
2 | |||
3 | import * as httpRequestBroadcastHandler from './http-request-broadcast-handler' | ||
4 | import * as httpRequestUnicastHandler from './http-request-unicast-handler' | ||
5 | import { JobCategory } from '../../../../shared' | ||
6 | |||
7 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | ||
8 | httpRequestBroadcastHandler, | ||
9 | httpRequestUnicastHandler | ||
10 | } | ||
11 | const jobCategory: JobCategory = 'http-request' | ||
12 | |||
13 | const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
14 | |||
15 | export { | ||
16 | httpRequestJobScheduler | ||
17 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts new file mode 100644 index 000000000..6b6946d02 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts | |||
@@ -0,0 +1,25 @@ | |||
1 | import * as Bluebird from 'bluebird' | ||
2 | |||
3 | import { database as db } from '../../../initializers/database' | ||
4 | import { logger } from '../../../helpers' | ||
5 | |||
6 | async function process (data: { videoUUID: string }, jobId: number) { | ||
7 | |||
8 | } | ||
9 | |||
10 | function onError (err: Error, jobId: number) { | ||
11 | logger.error('Error when optimized video file in job %d.', jobId, err) | ||
12 | return Promise.resolve() | ||
13 | } | ||
14 | |||
15 | async function onSuccess (jobId: number) { | ||
16 | |||
17 | } | ||
18 | |||
19 | // --------------------------------------------------------------------------- | ||
20 | |||
21 | export { | ||
22 | process, | ||
23 | onError, | ||
24 | onSuccess | ||
25 | } | ||
diff --git a/server/lib/jobs/http-request-job-scheduler/index.ts b/server/lib/jobs/http-request-job-scheduler/index.ts new file mode 100644 index 000000000..4d2573296 --- /dev/null +++ b/server/lib/jobs/http-request-job-scheduler/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './http-request-job-scheduler' | |||
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts index b18a3d845..a92743707 100644 --- a/server/lib/jobs/index.ts +++ b/server/lib/jobs/index.ts | |||
@@ -1 +1,2 @@ | |||
1 | export * from './job-scheduler' | 1 | export * from './http-request-job-scheduler' |
2 | export * from './transcoding-job-scheduler' | ||
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts index 61d483268..89a4bca88 100644 --- a/server/lib/jobs/job-scheduler.ts +++ b/server/lib/jobs/job-scheduler.ts | |||
@@ -1,39 +1,41 @@ | |||
1 | import { AsyncQueue, forever, queue } from 'async' | 1 | import { AsyncQueue, forever, queue } from 'async' |
2 | import * as Sequelize from 'sequelize' | 2 | import * as Sequelize from 'sequelize' |
3 | 3 | ||
4 | import { database as db } from '../../initializers/database' | ||
5 | import { | 4 | import { |
5 | database as db, | ||
6 | JOBS_FETCHING_INTERVAL, | 6 | JOBS_FETCHING_INTERVAL, |
7 | JOBS_FETCH_LIMIT_PER_CYCLE, | 7 | JOBS_FETCH_LIMIT_PER_CYCLE, |
8 | JOB_STATES | 8 | JOB_STATES |
9 | } from '../../initializers' | 9 | } from '../../initializers' |
10 | import { logger } from '../../helpers' | 10 | import { logger } from '../../helpers' |
11 | import { JobInstance } from '../../models' | 11 | import { JobInstance } from '../../models' |
12 | import { JobHandler, jobHandlers } from './handlers' | 12 | import { JobCategory } from '../../../shared' |
13 | 13 | ||
14 | export interface JobHandler<T> { | ||
15 | process (data: object, jobId: number): T | ||
16 | onError (err: Error, jobId: number) | ||
17 | onSuccess (jobId: number, jobResult: T) | ||
18 | } | ||
14 | type JobQueueCallback = (err: Error) => void | 19 | type JobQueueCallback = (err: Error) => void |
15 | 20 | ||
16 | class JobScheduler { | 21 | class JobScheduler<T> { |
17 | |||
18 | private static instance: JobScheduler | ||
19 | 22 | ||
20 | private constructor () { } | 23 | constructor ( |
21 | 24 | private jobCategory: JobCategory, | |
22 | static get Instance () { | 25 | private jobHandlers: { [ id: string ]: JobHandler<T> } |
23 | return this.instance || (this.instance = new this()) | 26 | ) {} |
24 | } | ||
25 | 27 | ||
26 | async activate () { | 28 | async activate () { |
27 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE | 29 | const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory] |
28 | 30 | ||
29 | logger.info('Jobs scheduler activated.') | 31 | logger.info('Jobs scheduler %s activated.', this.jobCategory) |
30 | 32 | ||
31 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) | 33 | const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) |
32 | 34 | ||
33 | // Finish processing jobs from a previous start | 35 | // Finish processing jobs from a previous start |
34 | const state = JOB_STATES.PROCESSING | 36 | const state = JOB_STATES.PROCESSING |
35 | try { | 37 | try { |
36 | const jobs = await db.Job.listWithLimit(limit, state) | 38 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) |
37 | 39 | ||
38 | this.enqueueJobs(jobsQueue, jobs) | 40 | this.enqueueJobs(jobsQueue, jobs) |
39 | } catch (err) { | 41 | } catch (err) { |
@@ -49,7 +51,7 @@ class JobScheduler { | |||
49 | 51 | ||
50 | const state = JOB_STATES.PENDING | 52 | const state = JOB_STATES.PENDING |
51 | try { | 53 | try { |
52 | const jobs = await db.Job.listWithLimit(limit, state) | 54 | const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory) |
53 | 55 | ||
54 | this.enqueueJobs(jobsQueue, jobs) | 56 | this.enqueueJobs(jobsQueue, jobs) |
55 | } catch (err) { | 57 | } catch (err) { |
@@ -64,9 +66,10 @@ class JobScheduler { | |||
64 | ) | 66 | ) |
65 | } | 67 | } |
66 | 68 | ||
67 | createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { | 69 | createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { |
68 | const createQuery = { | 70 | const createQuery = { |
69 | state: JOB_STATES.PENDING, | 71 | state: JOB_STATES.PENDING, |
72 | category, | ||
70 | handlerName, | 73 | handlerName, |
71 | handlerInputData | 74 | handlerInputData |
72 | } | 75 | } |
@@ -80,7 +83,7 @@ class JobScheduler { | |||
80 | } | 83 | } |
81 | 84 | ||
82 | private async processJob (job: JobInstance, callback: (err: Error) => void) { | 85 | private async processJob (job: JobInstance, callback: (err: Error) => void) { |
83 | const jobHandler = jobHandlers[job.handlerName] | 86 | const jobHandler = this.jobHandlers[job.handlerName] |
84 | if (jobHandler === undefined) { | 87 | if (jobHandler === undefined) { |
85 | logger.error('Unknown job handler for job %s.', job.handlerName) | 88 | logger.error('Unknown job handler for job %s.', job.handlerName) |
86 | return callback(null) | 89 | return callback(null) |
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts new file mode 100644 index 000000000..73152a1be --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './transcoding-job-scheduler' | |||
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts new file mode 100644 index 000000000..d7c614fb8 --- /dev/null +++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts | |||
@@ -0,0 +1,17 @@ | |||
1 | import { JobScheduler, JobHandler } from '../job-scheduler' | ||
2 | |||
3 | import * as videoFileOptimizer from './video-file-optimizer-handler' | ||
4 | import * as videoFileTranscoder from './video-file-transcoder-handler' | ||
5 | import { JobCategory } from '../../../../shared' | ||
6 | |||
7 | const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { | ||
8 | videoFileOptimizer, | ||
9 | videoFileTranscoder | ||
10 | } | ||
11 | const jobCategory: JobCategory = 'transcoding' | ||
12 | |||
13 | const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) | ||
14 | |||
15 | export { | ||
16 | transcodingJobScheduler | ||
17 | } | ||
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts index ccded4721..ccded4721 100644 --- a/server/lib/jobs/handlers/video-file-optimizer.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts | |||
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts index 853645510..853645510 100644 --- a/server/lib/jobs/handlers/video-file-transcoder.ts +++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts | |||