aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts21
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts7
-rw-r--r--server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts19
-rw-r--r--server/lib/jobs/job-scheduler.ts33
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts13
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts17
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts11
7 files changed, 69 insertions, 52 deletions
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts
index 6b6946d02..799b86e1c 100644
--- a/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-broadcast-handler.ts
@@ -1,19 +1,28 @@
1import * as Bluebird from 'bluebird'
2
3import { database as db } from '../../../initializers/database'
4import { logger } from '../../../helpers' 1import { logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests'
3import { HTTPRequestPayload } from './http-request-job-scheduler'
4
5async function process (payload: HTTPRequestPayload, jobId: number) {
6 logger.info('Processing broadcast in job %d.', jobId)
5 7
6async function process (data: { videoUUID: string }, jobId: number) { 8 const options = {
9 uri: '',
10 json: payload.body
11 }
7 12
13 for (const uri of payload.uris) {
14 options.uri = uri
15 await doRequest(options)
16 }
8} 17}
9 18
10function onError (err: Error, jobId: number) { 19function onError (err: Error, jobId: number) {
11 logger.error('Error when optimized video file in job %d.', jobId, err) 20 logger.error('Error when broadcasting request in job %d.', jobId, err)
12 return Promise.resolve() 21 return Promise.resolve()
13} 22}
14 23
15async function onSuccess (jobId: number) { 24async function onSuccess (jobId: number) {
16 25 logger.info('Job %d is a success.', jobId)
17} 26}
18 27
19// --------------------------------------------------------------------------- 28// ---------------------------------------------------------------------------
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts
index 42cb9139c..ad3349866 100644
--- a/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-job-scheduler.ts
@@ -4,7 +4,11 @@ import * as httpRequestBroadcastHandler from './http-request-broadcast-handler'
4import * as httpRequestUnicastHandler from './http-request-unicast-handler' 4import * as httpRequestUnicastHandler from './http-request-unicast-handler'
5import { JobCategory } from '../../../../shared' 5import { JobCategory } from '../../../../shared'
6 6
7const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { 7type HTTPRequestPayload = {
8 uris: string[]
9 body: any
10}
11const jobHandlers: { [ handlerName: string ]: JobHandler<HTTPRequestPayload, void> } = {
8 httpRequestBroadcastHandler, 12 httpRequestBroadcastHandler,
9 httpRequestUnicastHandler 13 httpRequestUnicastHandler
10} 14}
@@ -13,5 +17,6 @@ const jobCategory: JobCategory = 'http-request'
13const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers) 17const httpRequestJobScheduler = new JobScheduler(jobCategory, jobHandlers)
14 18
15export { 19export {
20 HTTPRequestPayload,
16 httpRequestJobScheduler 21 httpRequestJobScheduler
17} 22}
diff --git a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts
index 6b6946d02..13451f042 100644
--- a/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts
+++ b/server/lib/jobs/http-request-job-scheduler/http-request-unicast-handler.ts
@@ -1,19 +1,26 @@
1import * as Bluebird from 'bluebird'
2
3import { database as db } from '../../../initializers/database'
4import { logger } from '../../../helpers' 1import { logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests'
3import { HTTPRequestPayload } from './http-request-job-scheduler'
4
5async function process (payload: HTTPRequestPayload, jobId: number) {
6 logger.info('Processing unicast in job %d.', jobId)
5 7
6async function process (data: { videoUUID: string }, jobId: number) { 8 const uri = payload.uris[0]
9 const options = {
10 uri,
11 json: payload.body
12 }
7 13
14 await doRequest(options)
8} 15}
9 16
10function onError (err: Error, jobId: number) { 17function onError (err: Error, jobId: number) {
11 logger.error('Error when optimized video file in job %d.', jobId, err) 18 logger.error('Error when sending request in job %d.', jobId, err)
12 return Promise.resolve() 19 return Promise.resolve()
13} 20}
14 21
15async function onSuccess (jobId: number) { 22async function onSuccess (jobId: number) {
16 23 logger.info('Job %d is a success.', jobId)
17} 24}
18 25
19// --------------------------------------------------------------------------- 26// ---------------------------------------------------------------------------
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
index 89a4bca88..f10f745b3 100644
--- a/server/lib/jobs/job-scheduler.ts
+++ b/server/lib/jobs/job-scheduler.ts
@@ -1,28 +1,22 @@
1import { AsyncQueue, forever, queue } from 'async' 1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize' 2import * as Sequelize from 'sequelize'
3 3import { JobCategory } from '../../../shared'
4import {
5 database as db,
6 JOBS_FETCHING_INTERVAL,
7 JOBS_FETCH_LIMIT_PER_CYCLE,
8 JOB_STATES
9} from '../../initializers'
10import { logger } from '../../helpers' 4import { logger } from '../../helpers'
5import { database as db, JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
11import { JobInstance } from '../../models' 6import { JobInstance } from '../../models'
12import { JobCategory } from '../../../shared'
13 7
14export interface JobHandler<T> { 8export interface JobHandler<P, T> {
15 process (data: object, jobId: number): T 9 process (data: object, jobId: number): Promise<T>
16 onError (err: Error, jobId: number) 10 onError (err: Error, jobId: number)
17 onSuccess (jobId: number, jobResult: T) 11 onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>)
18} 12}
19type JobQueueCallback = (err: Error) => void 13type JobQueueCallback = (err: Error) => void
20 14
21class JobScheduler<T> { 15class JobScheduler<P, T> {
22 16
23 constructor ( 17 constructor (
24 private jobCategory: JobCategory, 18 private jobCategory: JobCategory,
25 private jobHandlers: { [ id: string ]: JobHandler<T> } 19 private jobHandlers: { [ id: string ]: JobHandler<P, T> }
26 ) {} 20 ) {}
27 21
28 async activate () { 22 async activate () {
@@ -66,13 +60,14 @@ class JobScheduler<T> {
66 ) 60 )
67 } 61 }
68 62
69 createJob (transaction: Sequelize.Transaction, category: JobCategory, handlerName: string, handlerInputData: object) { 63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
70 const createQuery = { 64 const createQuery = {
71 state: JOB_STATES.PENDING, 65 state: JOB_STATES.PENDING,
72 category, 66 category: this.jobCategory,
73 handlerName, 67 handlerName,
74 handlerInputData 68 handlerInputData
75 } 69 }
70
76 const options = { transaction } 71 const options = { transaction }
77 72
78 return db.Job.create(createQuery, options) 73 return db.Job.create(createQuery, options)
@@ -95,7 +90,7 @@ class JobScheduler<T> {
95 await job.save() 90 await job.save()
96 91
97 try { 92 try {
98 const result = await jobHandler.process(job.handlerInputData, job.id) 93 const result: T = await jobHandler.process(job.handlerInputData, job.id)
99 await this.onJobSuccess(jobHandler, job, result) 94 await this.onJobSuccess(jobHandler, job, result)
100 } catch (err) { 95 } catch (err) {
101 logger.error('Error in job handler %s.', job.handlerName, err) 96 logger.error('Error in job handler %s.', job.handlerName, err)
@@ -111,7 +106,7 @@ class JobScheduler<T> {
111 callback(null) 106 callback(null)
112 } 107 }
113 108
114 private async onJobError (jobHandler: JobHandler<any>, job: JobInstance, err: Error) { 109 private async onJobError (jobHandler: JobHandler<P, T>, job: JobInstance, err: Error) {
115 job.state = JOB_STATES.ERROR 110 job.state = JOB_STATES.ERROR
116 111
117 try { 112 try {
@@ -122,12 +117,12 @@ class JobScheduler<T> {
122 } 117 }
123 } 118 }
124 119
125 private async onJobSuccess (jobHandler: JobHandler<any>, job: JobInstance, jobResult: any) { 120 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobInstance, jobResult: T) {
126 job.state = JOB_STATES.SUCCESS 121 job.state = JOB_STATES.SUCCESS
127 122
128 try { 123 try {
129 await job.save() 124 await job.save()
130 jobHandler.onSuccess(job.id, jobResult) 125 jobHandler.onSuccess(job.id, jobResult, this)
131 } catch (err) { 126 } catch (err) {
132 this.cannotSaveJobError(err) 127 this.cannotSaveJobError(err)
133 } 128 }
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
index d7c614fb8..c5efe8eeb 100644
--- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
@@ -1,10 +1,14 @@
1import { JobScheduler, JobHandler } from '../job-scheduler' 1import { JobCategory } from '../../../../shared'
2 2import { JobHandler, JobScheduler } from '../job-scheduler'
3import * as videoFileOptimizer from './video-file-optimizer-handler' 3import * as videoFileOptimizer from './video-file-optimizer-handler'
4import * as videoFileTranscoder from './video-file-transcoder-handler' 4import * as videoFileTranscoder from './video-file-transcoder-handler'
5import { JobCategory } from '../../../../shared' 5import { VideoInstance } from '../../../models/video/video-interface'
6 6
7const jobHandlers: { [ handlerName: string ]: JobHandler<any> } = { 7type TranscodingJobPayload = {
8 videoUUID: string
9 resolution?: number
10}
11const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoInstance> } = {
8 videoFileOptimizer, 12 videoFileOptimizer,
9 videoFileTranscoder 13 videoFileTranscoder
10} 14}
@@ -13,5 +17,6 @@ const jobCategory: JobCategory = 'transcoding'
13const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers) 17const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)
14 18
15export { 19export {
20 TranscodingJobPayload,
16 transcodingJobScheduler 21 transcodingJobScheduler
17} 22}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
index f019c28bc..47603a66c 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
@@ -1,12 +1,13 @@
1import * as Bluebird from 'bluebird' 1import * as Bluebird from 'bluebird'
2import { computeResolutionsToTranscode, logger } from '../../../helpers'
2 3
3import { database as db } from '../../../initializers/database' 4import { database as db } from '../../../initializers/database'
4import { logger, computeResolutionsToTranscode } from '../../../helpers'
5import { VideoInstance } from '../../../models' 5import { VideoInstance } from '../../../models'
6import { addVideoToFriends } from '../../friends' 6import { sendAddVideo } from '../../activitypub/send-request'
7import { JobScheduler } from '../job-scheduler' 7import { JobScheduler } from '../job-scheduler'
8import { TranscodingJobPayload } from './transcoding-job-scheduler'
8 9
9async function process (data: { videoUUID: string }, jobId: number) { 10async function process (data: TranscodingJobPayload, jobId: number) {
10 const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) 11 const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID)
11 // No video, maybe deleted? 12 // No video, maybe deleted?
12 if (!video) { 13 if (!video) {
@@ -24,7 +25,7 @@ function onError (err: Error, jobId: number) {
24 return Promise.resolve() 25 return Promise.resolve()
25} 26}
26 27
27async function onSuccess (jobId: number, video: VideoInstance) { 28async function onSuccess (jobId: number, video: VideoInstance, jobScheduler: JobScheduler<TranscodingJobPayload, VideoInstance>) {
28 if (video === undefined) return undefined 29 if (video === undefined) return undefined
29 30
30 logger.info('Job %d is a success.', jobId) 31 logger.info('Job %d is a success.', jobId)
@@ -34,10 +35,8 @@ async function onSuccess (jobId: number, video: VideoInstance) {
34 // Video does not exist anymore 35 // Video does not exist anymore
35 if (!videoDatabase) return undefined 36 if (!videoDatabase) return undefined
36 37
37 const remoteVideo = await videoDatabase.toAddRemoteJSON() 38 // Now we'll add the video's meta data to our followers
38 39 await sendAddVideo(video, undefined)
39 // Now we'll add the video's meta data to our friends
40 await addVideoToFriends(remoteVideo, null)
41 40
42 const originalFileHeight = await videoDatabase.getOriginalFileHeight() 41 const originalFileHeight = await videoDatabase.getOriginalFileHeight()
43 // Create transcoding jobs if there are enabled resolutions 42 // Create transcoding jobs if there are enabled resolutions
@@ -59,7 +58,7 @@ async function onSuccess (jobId: number, video: VideoInstance) {
59 resolution 58 resolution
60 } 59 }
61 60
62 const p = JobScheduler.Instance.createJob(t, 'videoFileTranscoder', dataInput) 61 const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput)
63 tasks.push(p) 62 tasks.push(p)
64 } 63 }
65 64
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
index 397b95795..77e5d9f7f 100644
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
+++ b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
@@ -1,8 +1,8 @@
1import { database as db } from '../../../initializers/database' 1import { VideoResolution } from '../../../../shared'
2import { updateVideoToFriends } from '../../friends'
3import { logger } from '../../../helpers' 2import { logger } from '../../../helpers'
3import { database as db } from '../../../initializers/database'
4import { VideoInstance } from '../../../models' 4import { VideoInstance } from '../../../models'
5import { VideoResolution } from '../../../../shared' 5import { sendUpdateVideo } from '../../activitypub/send-request'
6 6
7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) { 7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID) 8 const video = await db.Video.loadByUUIDAndPopulateAccountAndPodAndTags(data.videoUUID)
@@ -32,10 +32,7 @@ async function onSuccess (jobId: number, video: VideoInstance) {
32 // Video does not exist anymore 32 // Video does not exist anymore
33 if (!videoDatabase) return undefined 33 if (!videoDatabase) return undefined
34 34
35 const remoteVideo = videoDatabase.toUpdateRemoteJSON() 35 await sendUpdateVideo(video, undefined)
36
37 // Now we'll add the video's meta data to our friends
38 await updateVideoToFriends(remoteVideo, null)
39 36
40 return undefined 37 return undefined
41} 38}