aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-11-09 17:51:58 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-11-27 19:40:51 +0100
commite4f97babf701481b55cc10fb3448feab5f97c867 (patch)
treeaf37402a594dc5ff09f71ecb0687e8cfe4cdb471 /server/lib/jobs
parent343ad675f2a26c15b86150a9a3552e619d5d44f4 (diff)
downloadPeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.gz
PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.tar.zst
PeerTube-e4f97babf701481b55cc10fb3448feab5f97c867.zip
Begin activitypub
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/handlers/index.ts17
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts25
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts17
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts25
-rw-r--r--server/lib/jobs/http-request-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/index.ts3
-rw-r--r--server/lib/jobs/job-scheduler.ts35
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts17
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts (renamed from server/lib/jobs/handlers/video-file-optimizer.ts)0
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts (renamed from server/lib/jobs/handlers/video-file-transcoder.ts)0
11 files changed, 107 insertions, 34 deletions
diff --git a/server/lib/jobs/handlers/index.ts b/server/lib/jobs/handlers/index.ts
deleted file mode 100644
index cef1f89a9..000000000
--- a/server/lib/jobs/handlers/index.ts
+++ /dev/null
@@ -1,17 +0,0 @@
1import * as videoFileOptimizer from './video-file-optimizer'
2import * as videoFileTranscoder from './video-file-transcoder'
3
4export interface JobHandler<T> {
5 process (data: object, jobId: number): T
6 onError (err: Error, jobId: number)
7 onSuccess (jobId: number, jobResult: T)
8}
9
10const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
11 videoFileOptimizer,
12 videoFileTranscoder
13}
14
15export {
16 jobHandlers
17}
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts
new file mode 100644
index 000000000..6b6946d02
--- /dev/null
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts
@@ -0,0 +1,25 @@
1import * as Bluebird from 'bluebird'
2
3import { database as db } from '../../../initializers/database'
4import { logger } from '../../../helpers'
5
6async function process (data: { videoUUID: string }, jobId: number) {
7
8}
9
10function onError (err: Error, jobId: number) {
11 logger.error('Error when optimized video file in job %d.', jobId, err)
12 return Promise.resolve()
13}
14
15async function onSuccess (jobId: number) {
16
17}
18
19// ---------------------------------------------------------------------------
20
21export {
22 process,
23 onError,
24 onSuccess
25}
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts
new file mode 100644
index 000000000..42cb9139c
--- /dev/null
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts
@@ -0,0 +1,17 @@
1import { JobScheduler, JobHandler } from '../job-scheduler'
2
3import * as httpRequestBroadcastHandler from './http-request-broadcast-handler'
4import * as httpRequestUnicastHandler from './http-request-unicast-handler'
5import { JobCategory } from '../../../../shared'
6
7const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
8 httpRequestBroadcastHandler,
9 httpRequestUnicastHandler
10}
11const jobCategory: JobCategory = 'http-request'
12
13const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers)
14
15export {
16 httpRequestJobScheduler
17}
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts
new file mode 100644
index 000000000..6b6946d02
--- /dev/null
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts
@@ -0,0 +1,25 @@
1import * as Bluebird from 'bluebird'
2
3import { database as db } from '../../../initializers/database'
4import { logger } from '../../../helpers'
5
6async function process (data: { videoUUID: string }, jobId: number) {
7
8}
9
10function onError (err: Error, jobId: number) {
11 logger.error('Error when optimized video file in job %d.', jobId, err)
12 return Promise.resolve()
13}
14
15async function onSuccess (jobId: number) {
16
17}
18
19// ---------------------------------------------------------------------------
20
21export {
22 process,
23 onError,
24 onSuccess
25}
diff --git a/server/lib/jobs/http-request-job-scheduler/index.ts b/server/lib/jobs/http-request-job-scheduler/index.ts
new file mode 100644
index 000000000..4d2573296
--- /dev/null
+++ b/server/lib/jobs/http-request-job-scheduler/index.ts
@@ -0,0 +1 @@
export * from './http-request-job-scheduler'
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts
index b18a3d845..a92743707 100644
--- a/server/lib/jobs/index.ts
+++ b/server/lib/jobs/index.ts
@@ -1 +1,2 @@
1export * from './job-scheduler' 1export * from './http-request-job-scheduler'
2export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 61d483268..89a4bca88 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,39 +1,41 @@
1import { AsyncQueue, forever, queue } from 'async' 1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3 3
4import { database as db } from '../../initializers/database'
5import { 4import {
5 database as db,
6 JOBS_FETCHING_INTERVAL, 6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE, 7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES 8 JOB_STATES
9} from '../../initializers' 9} from '../../initializers'
10import { logger } from '../../helpers' 10import { logger } from '../../helpers'
11import { JobInstance } from '../../models' 11import { JobInstance } from '../../models'
12import { JobHandler, jobHandlers } from './handlers' 12import { JobCategory } from '../../../shared'
13 13
14export interface JobHandler<T> {
15 process (data: object, jobId: number): T
16 onError (err: Error, jobId: number)
17 onSuccess (jobId: number, jobResult: T)
18}
14type JobQueueCallback = (err: Error) => void 19type JobQueueCallback = (err: Error) => void
15 20
16class JobScheduler { 21class JobScheduler<T> {
17
18 private static instance: JobScheduler
19 22
20 private constructor () { } 23 constructor (
21 24 private jobCategory: JobCategory,
22 static get Instance () { 25 private jobHandlers: { [ id: string ]: JobHandler<T> }
23 return this.instance || (this.instance = new this()) 26 ) {}
24 }
25 27
26 async activate () { 28 async activate () {
27 const limit = JOBS_FETCH_LIMIT_PER_CYCLE 29 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
28 30
29 logger.info('Jobs scheduler activated.') 31 logger.info('Jobs scheduler %s activated.', this.jobCategory)
30 32
31 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this)) 33 const jobsQueue = queue<JobInstance, JobQueueCallback>(this.processJob.bind(this))
32 34
33 // Finish processing jobs from a previous start 35 // Finish processing jobs from a previous start
34 const state = JOB_STATES.PROCESSING 36 const state = JOB_STATES.PROCESSING
35 try { 37 try {
36 const jobs = await db.Job.listWithLimit(limit, state) 38 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
37 39
38 this.enqueueJobs(jobsQueue, jobs) 40 this.enqueueJobs(jobsQueue, jobs)
39 } catch (err) { 41 } catch (err) {
@@ -49,7 +51,7 @@ class JobScheduler {
49 51
50 const state = JOB_STATES.PENDING 52 const state = JOB_STATES.PENDING
51 try { 53 try {
52 const jobs = await db.Job.listWithLimit(limit, state) 54 const jobs = await db.Job.listWithLimitByCategory(limit, state, this.jobCategory)
53 55
54 this.enqueueJobs(jobsQueue, jobs) 56 this.enqueueJobs(jobsQueue, jobs)
55 } catch (err) { 57 } catch (err) {
@@ -64,9 +66,10 @@ class JobScheduler {
64 ) 66 )
65 } 67 }
66 68
67 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: object) { 69 createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) {
68 const createQuery = { 70 const createQuery = {
69 state: JOB_STATES.PENDING, 71 state: JOB_STATES.PENDING,
72 category,
70 handlerName, 73 handlerName,
71 handlerInputData 74 handlerInputData
72 } 75 }
@@ -80,7 +83,7 @@ class JobScheduler {
80 } 83 }
81 84
82 private async processJob (job: JobInstance, callback: (err: Error) => void) { 85 private async processJob (job: JobInstance, callback: (err: Error) => void) {
83 const jobHandler = jobHandlers[job.handlerName] 86 const jobHandler = this.jobHandlers[job.handlerName]
84 if (jobHandler === undefined) { 87 if (jobHandler === undefined) {
85 logger.error('Unknown job handler for job %s.', job.handlerName) 88 logger.error('Unknown job handler for job %s.', job.handlerName)
86 return callback(null) 89 return callback(null)
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts
new file mode 100644
index 000000000..73152a1be
--- /dev/null
+++ b/server/lib/jobs/transcoding-job-scheduler/index.ts
@@ -0,0 +1 @@
export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
new file mode 100644
index 000000000..d7c614fb8
--- /dev/null
+++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
@@ -0,0 +1,17 @@
1import { JobScheduler, JobHandler } from '../job-scheduler'
2
3import * as videoFileOptimizer from './video-file-optimizer-handler'
4import * as videoFileTranscoder from './video-file-transcoder-handler'
5import { JobCategory } from '../../../../shared'
6
7const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = {
8 videoFileOptimizer,
9 videoFileTranscoder
10}
11const jobCategory: JobCategory = 'transcoding'
12
13const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)
14
15export {
16 transcodingJobScheduler
17}
diff --git a/server/lib/jobs/handlers/video-file-optimizer.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
index ccded4721..ccded4721 100644
--- a/server/lib/jobs/handlers/video-file-optimizer.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
diff --git a/server/lib/jobs/handlers/video-file-transcoder.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
index 853645510..853645510 100644
--- a/server/lib/jobs/handlers/video-file-transcoder.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts