aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts53
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts68
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts94
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts50
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/index.ts2
-rw-r--r--server/lib/jobs/job-scheduler.ts144
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/index.ts1
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts23
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts90
-rw-r--r--server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts48
11 files changed, 0 insertions, 574 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
deleted file mode 100644
index 3f780e319..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ /dev/null
@@ -1,53 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub broadcast in job %d.', jobId)
8
9 const body = await computeBody(payload)
10 const httpSignatureOptions = await buildSignedRequestOptions(payload)
11
12 const options = {
13 method: 'POST',
14 uri: '',
15 json: body,
16 httpSignature: httpSignatureOptions
17 }
18
19 const badUrls: string[] = []
20 const goodUrls: string[] = []
21
22 for (const uri of payload.uris) {
23 options.uri = uri
24
25 try {
26 await doRequest(options)
27 goodUrls.push(uri)
28 } catch (err) {
29 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
30 if (isRetryingLater === false) badUrls.push(uri)
31 }
32 }
33
34 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
35}
36
37function onError (err: Error, jobId: number) {
38 logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
39 return Promise.resolve()
40}
41
42function onSuccess (jobId: number) {
43 logger.info('Job %d is a success.', jobId)
44 return Promise.resolve()
45}
46
47// ---------------------------------------------------------------------------
48
49export {
50 process,
51 onError,
52 onSuccess
53}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
deleted file mode 100644
index a7b5aabd0..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
+++ /dev/null
@@ -1,68 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ACTIVITY_PUB } from '../../../initializers'
4import { processActivities } from '../../activitypub/process'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
6
7async function process (payload: ActivityPubHttpPayload, jobId: number) {
8 logger.info('Processing ActivityPub fetcher in job %d.', jobId)
9
10 const options = {
11 method: 'GET',
12 uri: '',
13 json: true,
14 activityPub: true
15 }
16
17 for (const uri of payload.uris) {
18 options.uri = uri
19 logger.info('Fetching ActivityPub data on %s.', uri)
20
21 const response = await doRequest(options)
22 const firstBody = response.body
23
24 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
25 const activities = firstBody.first.orderedItems
26
27 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
28
29 await processActivities(activities)
30 }
31
32 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
33 let i = 0
34 let nextLink = firstBody.first.next
35 while (nextLink && i < limit) {
36 options.uri = nextLink
37
38 const { body } = await doRequest(options)
39 nextLink = body.next
40 i++
41
42 if (Array.isArray(body.orderedItems)) {
43 const activities = body.orderedItems
44 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
45
46 await processActivities(activities)
47 }
48 }
49 }
50}
51
52function onError (err: Error, jobId: number) {
53 logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err)
54 return Promise.resolve()
55}
56
57function onSuccess (jobId: number) {
58 logger.info('Job %d is a success.', jobId)
59 return Promise.resolve()
60}
61
62// ---------------------------------------------------------------------------
63
64export {
65 process,
66 onError,
67 onSuccess
68}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
deleted file mode 100644
index 4459152db..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ /dev/null
@@ -1,94 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { buildSignedActivity } from '../../../helpers/activitypub'
3import { logger } from '../../../helpers/logger'
4import { getServerActor } from '../../../helpers/utils'
5import { ACTIVITY_PUB } from '../../../initializers'
6import { ActorModel } from '../../../models/activitypub/actor'
7import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
8import { JobHandler, JobScheduler } from '../job-scheduler'
9
10import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
11import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
12import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
13
14type ActivityPubHttpPayload = {
15 uris: string[]
16 signatureActorId?: number
17 body?: any
18 attemptNumber?: number
19}
20
21const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
22 activitypubHttpBroadcastHandler,
23 activitypubHttpUnicastHandler,
24 activitypubHttpFetcherHandler
25}
26const jobCategory: JobCategory = 'activitypub-http'
27
28const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)
29
30async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
31 logger.warn('Cannot make request to %s.', uri, err)
32
33 let attemptNumber = payload.attemptNumber || 1
34 attemptNumber += 1
35
36 if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
37 logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)
38
39 const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined)
40 if (!actor) {
41 logger.debug('Actor %s is not a follower, do not retry the request.', uri)
42 return false
43 }
44
45 const newPayload = Object.assign(payload, {
46 uris: [ uri ],
47 attemptNumber
48 })
49 await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)
50
51 return true
52 }
53
54 return false
55}
56
57async function computeBody (payload: ActivityPubHttpPayload) {
58 let body = payload.body
59
60 if (payload.signatureActorId) {
61 const actorSignature = await ActorModel.load(payload.signatureActorId)
62 if (!actorSignature) throw new Error('Unknown signature actor id.')
63 body = await buildSignedActivity(actorSignature, payload.body)
64 }
65
66 return body
67}
68
69async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) {
70 let actor: ActorModel
71 if (payload.signatureActorId) {
72 actor = await ActorModel.load(payload.signatureActorId)
73 if (!actor) throw new Error('Unknown signature actor id.')
74 } else {
75 // We need to sign the request, so use the server
76 actor = await getServerActor()
77 }
78
79 const keyId = actor.getWebfingerUrl()
80 return {
81 algorithm: 'rsa-sha256',
82 authorizationHeaderName: 'Signature',
83 keyId,
84 key: actor.privateKey
85 }
86}
87
88export {
89 ActivityPubHttpPayload,
90 activitypubHttpJobScheduler,
91 maybeRetryRequestLater,
92 computeBody,
93 buildSignedRequestOptions
94}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
deleted file mode 100644
index 54a7504e8..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ /dev/null
@@ -1,50 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub unicast in job %d.', jobId)
8
9 const uri = payload.uris[0]
10
11 const body = await computeBody(payload)
12 const httpSignatureOptions = await buildSignedRequestOptions(payload)
13
14 const options = {
15 method: 'POST',
16 uri,
17 json: body,
18 httpSignature: httpSignatureOptions
19 }
20
21 try {
22 await doRequest(options)
23 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
24 } catch (err) {
25 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
26 if (isRetryingLater === false) {
27 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
28 }
29
30 throw err
31 }
32}
33
34function onError (err: Error, jobId: number) {
35 logger.error('Error when sending ActivityPub request in job %d.', jobId, err)
36 return Promise.resolve()
37}
38
39function onSuccess (jobId: number) {
40 logger.info('Job %d is a success.', jobId)
41 return Promise.resolve()
42}
43
44// ---------------------------------------------------------------------------
45
46export {
47 process,
48 onError,
49 onSuccess
50}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts
deleted file mode 100644
index ad8f527b4..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './activitypub-http-job-scheduler'
diff --git a/server/lib/jobs/index.ts b/server/lib/jobs/index.ts
deleted file mode 100644
index 394264ec1..000000000
--- a/server/lib/jobs/index.ts
+++ /dev/null
@@ -1,2 +0,0 @@
1export * from './activitypub-http-job-scheduler'
2export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/job-scheduler.ts b/server/lib/jobs/job-scheduler.ts
deleted file mode 100644
index 9d55880e6..000000000
--- a/server/lib/jobs/job-scheduler.ts
+++ /dev/null
@@ -1,144 +0,0 @@
1import { AsyncQueue, forever, queue } from 'async'
2import * as Sequelize from 'sequelize'
3import { JobCategory } from '../../../shared'
4import { logger } from '../../helpers/logger'
5import { JOB_STATES, JOBS_FETCH_LIMIT_PER_CYCLE, JOBS_FETCHING_INTERVAL } from '../../initializers'
6import { JobModel } from '../../models/job/job'
7
8export interface JobHandler<P, T> {
9 process (data: object, jobId: number): Promise<T>
10 onError (err: Error, jobId: number)
11 onSuccess (jobId: number, jobResult: T, jobScheduler: JobScheduler<P, T>): Promise<any>
12}
13type JobQueueCallback = (err: Error) => void
14
15class JobScheduler<P, T> {
16
17 constructor (
18 private jobCategory: JobCategory,
19 private jobHandlers: { [ id: string ]: JobHandler<P, T> }
20 ) {}
21
22 async activate () {
23 const limit = JOBS_FETCH_LIMIT_PER_CYCLE[this.jobCategory]
24
25 logger.info('Jobs scheduler %s activated.', this.jobCategory)
26
27 const jobsQueue = queue<JobModel, JobQueueCallback>(this.processJob.bind(this))
28
29 // Finish processing jobs from a previous start
30 const state = JOB_STATES.PROCESSING
31 try {
32 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
33
34 this.enqueueJobs(jobsQueue, jobs)
35 } catch (err) {
36 logger.error('Cannot list pending jobs.', err)
37 }
38
39 forever(
40 async next => {
41 if (jobsQueue.length() !== 0) {
42 // Finish processing the queue first
43 return setTimeout(next, JOBS_FETCHING_INTERVAL)
44 }
45
46 const state = JOB_STATES.PENDING
47 try {
48 const jobs = await JobModel.listWithLimitByCategory(limit, state, this.jobCategory)
49
50 this.enqueueJobs(jobsQueue, jobs)
51 } catch (err) {
52 logger.error('Cannot list pending jobs.', err)
53 }
54
55 // Optimization: we could use "drain" from queue object
56 return setTimeout(next, JOBS_FETCHING_INTERVAL)
57 },
58
59 err => logger.error('Error in job scheduler queue.', err)
60 )
61 }
62
63 createJob (transaction: Sequelize.Transaction, handlerName: string, handlerInputData: P) {
64 const createQuery = {
65 state: JOB_STATES.PENDING,
66 category: this.jobCategory,
67 handlerName,
68 handlerInputData
69 }
70
71 const options = { transaction }
72
73 return JobModel.create(createQuery, options)
74 }
75
76 private enqueueJobs (jobsQueue: AsyncQueue<JobModel>, jobs: JobModel[]) {
77 jobs.forEach(job => jobsQueue.push(job))
78 }
79
80 private async processJob (job: JobModel, callback: (err: Error) => void) {
81 const jobHandler = this.jobHandlers[job.handlerName]
82 if (jobHandler === undefined) {
83 const errorString = 'Unknown job handler ' + job.handlerName + ' for job ' + job.id
84 logger.error(errorString)
85
86 const error = new Error(errorString)
87 await this.onJobError(jobHandler, job, error)
88 return callback(error)
89 }
90
91 logger.info('Processing job %d with handler %s.', job.id, job.handlerName)
92
93 job.state = JOB_STATES.PROCESSING
94 await job.save()
95
96 try {
97 const result: T = await jobHandler.process(job.handlerInputData, job.id)
98 await this.onJobSuccess(jobHandler, job, result)
99 } catch (err) {
100 logger.error('Error in job handler %s.', job.handlerName, err)
101
102 try {
103 await this.onJobError(jobHandler, job, err)
104 } catch (innerErr) {
105 this.cannotSaveJobError(innerErr)
106 return callback(innerErr)
107 }
108 }
109
110 return callback(null)
111 }
112
113 private async onJobError (jobHandler: JobHandler<P, T>, job: JobModel, err: Error) {
114 job.state = JOB_STATES.ERROR
115
116 try {
117 await job.save()
118 if (jobHandler) await jobHandler.onError(err, job.id)
119 } catch (err) {
120 this.cannotSaveJobError(err)
121 }
122 }
123
124 private async onJobSuccess (jobHandler: JobHandler<P, T>, job: JobModel, jobResult: T) {
125 job.state = JOB_STATES.SUCCESS
126
127 try {
128 await job.save()
129 await jobHandler.onSuccess(job.id, jobResult, this)
130 } catch (err) {
131 this.cannotSaveJobError(err)
132 }
133 }
134
135 private cannotSaveJobError (err: Error) {
136 logger.error('Cannot save new job state.', err)
137 }
138}
139
140// ---------------------------------------------------------------------------
141
142export {
143 JobScheduler
144}
diff --git a/server/lib/jobs/transcoding-job-scheduler/index.ts b/server/lib/jobs/transcoding-job-scheduler/index.ts
deleted file mode 100644
index 73152a1be..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './transcoding-job-scheduler'
diff --git a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts b/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
deleted file mode 100644
index e5530a73c..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/transcoding-job-scheduler.ts
+++ /dev/null
@@ -1,23 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { VideoModel } from '../../../models/video/video'
3import { JobHandler, JobScheduler } from '../job-scheduler'
4
5import * as videoFileOptimizer from './video-file-optimizer-handler'
6import * as videoFileTranscoder from './video-file-transcoder-handler'
7
8type TranscodingJobPayload = {
9 videoUUID: string
10 resolution?: number
11}
12const jobHandlers: { [ handlerName: string ]: JobHandler<TranscodingJobPayload, VideoModel> } = {
13 videoFileOptimizer,
14 videoFileTranscoder
15}
16const jobCategory: JobCategory = 'transcoding'
17
18const transcodingJobScheduler = new JobScheduler(jobCategory, jobHandlers)
19
20export {
21 TranscodingJobPayload,
22 transcodingJobScheduler
23}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
deleted file mode 100644
index f224a31b4..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-optimizer-handler.ts
+++ /dev/null
@@ -1,90 +0,0 @@
1import * as Bluebird from 'bluebird'
2import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger'
4import { computeResolutionsToTranscode } from '../../../helpers/utils'
5import { sequelizeTypescript } from '../../../initializers'
6import { JobModel } from '../../../models/job/job'
7import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo } from '../../activitypub/send'
10import { JobScheduler } from '../job-scheduler'
11import { TranscodingJobPayload } from './transcoding-job-scheduler'
12
13async function process (data: TranscodingJobPayload, jobId: number) {
14 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
15 // No video, maybe deleted?
16 if (!video) {
17 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
18 return undefined
19 }
20
21 await video.optimizeOriginalVideofile()
22
23 return video
24}
25
26function onError (err: Error, jobId: number) {
27 logger.error('Error when optimized video file in job %d.', jobId, err)
28 return Promise.resolve()
29}
30
31async function onSuccess (jobId: number, video: VideoModel, jobScheduler: JobScheduler<TranscodingJobPayload, VideoModel>) {
32 if (video === undefined) return undefined
33
34 logger.info('Job %d is a success.', jobId)
35
36 // Maybe the video changed in database, refresh it
37 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
38 // Video does not exist anymore
39 if (!videoDatabase) return undefined
40
41 if (video.privacy !== VideoPrivacy.PRIVATE) {
42 // Now we'll add the video's meta data to our followers
43 await sendCreateVideo(video, undefined)
44 await shareVideoByServerAndChannel(video, undefined)
45 }
46
47 const originalFileHeight = await videoDatabase.getOriginalFileHeight()
48
49 // Create transcoding jobs if there are enabled resolutions
50 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
51 logger.info(
52 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight,
53 { resolutions: resolutionsEnabled }
54 )
55
56 if (resolutionsEnabled.length !== 0) {
57 try {
58 await sequelizeTypescript.transaction(async t => {
59 const tasks: Bluebird<JobModel>[] = []
60
61 for (const resolution of resolutionsEnabled) {
62 const dataInput = {
63 videoUUID: videoDatabase.uuid,
64 resolution
65 }
66
67 const p = jobScheduler.createJob(t, 'videoFileTranscoder', dataInput)
68 tasks.push(p)
69 }
70
71 await Promise.all(tasks)
72 })
73
74 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
75 } catch (err) {
76 logger.warn('Cannot transcode the video.', err)
77 }
78 } else {
79 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
80 return undefined
81 }
82}
83
84// ---------------------------------------------------------------------------
85
86export {
87 process,
88 onError,
89 onSuccess
90}
diff --git a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts b/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
deleted file mode 100644
index 883d3eba8..000000000
--- a/server/lib/jobs/transcoding-job-scheduler/video-file-transcoder-handler.ts
+++ /dev/null
@@ -1,48 +0,0 @@
1import { VideoResolution } from '../../../../shared'
2import { VideoPrivacy } from '../../../../shared/models/videos'
3import { logger } from '../../../helpers/logger'
4import { VideoModel } from '../../../models/video/video'
5import { sendUpdateVideo } from '../../activitypub/send'
6
7async function process (data: { videoUUID: string, resolution: VideoResolution }, jobId: number) {
8 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(data.videoUUID)
9 // No video, maybe deleted?
10 if (!video) {
11 logger.info('Do not process job %d, video does not exist.', jobId, { videoUUID: video.uuid })
12 return undefined
13 }
14
15 await video.transcodeOriginalVideofile(data.resolution)
16
17 return video
18}
19
20function onError (err: Error, jobId: number) {
21 logger.error('Error when transcoding video file in job %d.', jobId, err)
22 return Promise.resolve()
23}
24
25async function onSuccess (jobId: number, video: VideoModel) {
26 if (video === undefined) return undefined
27
28 logger.info('Job %d is a success.', jobId)
29
30 // Maybe the video changed in database, refresh it
31 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
32 // Video does not exist anymore
33 if (!videoDatabase) return undefined
34
35 if (video.privacy !== VideoPrivacy.PRIVATE) {
36 await sendUpdateVideo(video, undefined)
37 }
38
39 return undefined
40}
41
42// ---------------------------------------------------------------------------
43
44export {
45 process,
46 onError,
47 onSuccess
48}