aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-01-25 15:05:18 +0100
committerChocobozzz <me@florianbigard.com>2018-01-25 18:41:17 +0100
commit94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 (patch)
tree32a9148e0e4567f0c4ffae0412cbed20b84e8873 /server/lib/job-queue
parentd765fafc3faf0db9818eb1a07161df1cb1bc0efa (diff)
downloadPeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.gz
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.zst
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.zip
Move job queue to redis
We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore.
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts49
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts63
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts43
-rw-r--r--server/lib/job-queue/handlers/utils/activitypub-http-utils.ts39
-rw-r--r--server/lib/job-queue/handlers/video-file.ts110
-rw-r--r--server/lib/job-queue/index.ts1
-rw-r--r--server/lib/job-queue/job-queue.ts124
7 files changed, 429 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
new file mode 100644
index 000000000..159856cda
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -0,0 +1,49 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpBroadcastPayload = {
8 uris: string[]
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpBroadcast (job: kue.Job) {
14 logger.info('Processing ActivityPub broadcast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpBroadcastPayload
17
18 const body = await computeBody(payload)
19 const httpSignatureOptions = await buildSignedRequestOptions(payload)
20
21 const options = {
22 method: 'POST',
23 uri: '',
24 json: body,
25 httpSignature: httpSignatureOptions
26 }
27
28 const badUrls: string[] = []
29 const goodUrls: string[] = []
30
31 for (const uri of payload.uris) {
32 options.uri = uri
33
34 try {
35 await doRequest(options)
36 goodUrls.push(uri)
37 } catch (err) {
38 badUrls.push(uri)
39 }
40 }
41
42 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
43}
44
45// ---------------------------------------------------------------------------
46
47export {
48 processActivityPubHttpBroadcast
49}
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
new file mode 100644
index 000000000..062211c85
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -0,0 +1,63 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ACTIVITY_PUB } from '../../../initializers'
5import { processActivities } from '../../activitypub/process'
6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
7
8export type ActivitypubHttpFetcherPayload = {
9 uris: string[]
10}
11
12async function processActivityPubHttpFetcher (job: kue.Job) {
13 logger.info('Processing ActivityPub fetcher in job %d.', job.id)
14
15 const payload = job.data as ActivitypubHttpBroadcastPayload
16
17 const options = {
18 method: 'GET',
19 uri: '',
20 json: true,
21 activityPub: true
22 }
23
24 for (const uri of payload.uris) {
25 options.uri = uri
26 logger.info('Fetching ActivityPub data on %s.', uri)
27
28 const response = await doRequest(options)
29 const firstBody = response.body
30
31 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
32 const activities = firstBody.first.orderedItems
33
34 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
35
36 await processActivities(activities)
37 }
38
39 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
40 let i = 0
41 let nextLink = firstBody.first.next
42 while (nextLink && i < limit) {
43 options.uri = nextLink
44
45 const { body } = await doRequest(options)
46 nextLink = body.next
47 i++
48
49 if (Array.isArray(body.orderedItems)) {
50 const activities = body.orderedItems
51 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
52
53 await processActivities(activities)
54 }
55 }
56 }
57}
58
59// ---------------------------------------------------------------------------
60
61export {
62 processActivityPubHttpFetcher
63}
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
new file mode 100644
index 000000000..9b4188c50
--- /dev/null
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -0,0 +1,43 @@
1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6
7export type ActivitypubHttpUnicastPayload = {
8 uri: string
9 signatureActorId?: number
10 body: any
11}
12
13async function processActivityPubHttpUnicast (job: kue.Job) {
14 logger.info('Processing ActivityPub unicast in job %d.', job.id)
15
16 const payload = job.data as ActivitypubHttpUnicastPayload
17 const uri = payload.uri
18
19 const body = await computeBody(payload)
20 const httpSignatureOptions = await buildSignedRequestOptions(payload)
21
22 const options = {
23 method: 'POST',
24 uri,
25 json: body,
26 httpSignature: httpSignatureOptions
27 }
28
29 try {
30 await doRequest(options)
31 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
32 } catch (err) {
33 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
34
35 throw err
36 }
37}
38
39// ---------------------------------------------------------------------------
40
41export {
42 processActivityPubHttpUnicast
43}
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
new file mode 100644
index 000000000..c087371c6
--- /dev/null
+++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts
@@ -0,0 +1,39 @@
1import { buildSignedActivity } from '../../../../helpers/activitypub'
2import { getServerActor } from '../../../../helpers/utils'
3import { ActorModel } from '../../../../models/activitypub/actor'
4
5async function computeBody (payload: { body: any, signatureActorId?: number }) {
6 let body = payload.body
7
8 if (payload.signatureActorId) {
9 const actorSignature = await ActorModel.load(payload.signatureActorId)
10 if (!actorSignature) throw new Error('Unknown signature actor id.')
11 body = await buildSignedActivity(actorSignature, payload.body)
12 }
13
14 return body
15}
16
17async function buildSignedRequestOptions (payload: { signatureActorId?: number }) {
18 let actor: ActorModel
19 if (payload.signatureActorId) {
20 actor = await ActorModel.load(payload.signatureActorId)
21 if (!actor) throw new Error('Unknown signature actor id.')
22 } else {
23 // We need to sign the request, so use the server
24 actor = await getServerActor()
25 }
26
27 const keyId = actor.getWebfingerUrl()
28 return {
29 algorithm: 'rsa-sha256',
30 authorizationHeaderName: 'Signature',
31 keyId,
32 key: actor.privateKey
33 }
34}
35
36export {
37 computeBody,
38 buildSignedRequestOptions
39}
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts
new file mode 100644
index 000000000..5294483bd
--- /dev/null
+++ b/server/lib/job-queue/handlers/video-file.ts
@@ -0,0 +1,110 @@
1import * as kue from 'kue'
2import { VideoResolution } from '../../../../shared'
3import { VideoPrivacy } from '../../../../shared/models/videos'
4import { logger } from '../../../helpers/logger'
5import { computeResolutionsToTranscode } from '../../../helpers/utils'
6import { sequelizeTypescript } from '../../../initializers'
7import { VideoModel } from '../../../models/video/video'
8import { shareVideoByServerAndChannel } from '../../activitypub'
9import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send'
10import { JobQueue } from '../job-queue'
11
12export type VideoFilePayload = {
13 videoUUID: string
14 resolution?: VideoResolution
15}
16
17async function processVideoFile (job: kue.Job) {
18 const payload = job.data as VideoFilePayload
19 logger.info('Processing video file in job %d.', job.id)
20
21 const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID)
22 // No video, maybe deleted?
23 if (!video) {
24 logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid })
25 return undefined
26 }
27
28 // Transcoding in other resolution
29 if (payload.resolution) {
30 await video.transcodeOriginalVideofile(payload.resolution)
31 await onVideoFileTranscoderSuccess(video)
32 } else {
33 await video.optimizeOriginalVideofile()
34 await onVideoFileOptimizerSuccess(video)
35 }
36
37 return video
38}
39
40async function onVideoFileTranscoderSuccess (video: VideoModel) {
41 if (video === undefined) return undefined
42
43 // Maybe the video changed in database, refresh it
44 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
45 // Video does not exist anymore
46 if (!videoDatabase) return undefined
47
48 if (video.privacy !== VideoPrivacy.PRIVATE) {
49 await sendUpdateVideo(video, undefined)
50 }
51
52 return undefined
53}
54
55async function onVideoFileOptimizerSuccess (video: VideoModel) {
56 if (video === undefined) return undefined
57
58 // Maybe the video changed in database, refresh it
59 const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid)
60 // Video does not exist anymore
61 if (!videoDatabase) return undefined
62
63 if (video.privacy !== VideoPrivacy.PRIVATE) {
64 // Now we'll add the video's meta data to our followers
65 await sendCreateVideo(video, undefined)
66 await shareVideoByServerAndChannel(video, undefined)
67 }
68
69 const originalFileHeight = await videoDatabase.getOriginalFileHeight()
70
71 // Create transcoding jobs if there are enabled resolutions
72 const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight)
73 logger.info(
74 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight,
75 { resolutions: resolutionsEnabled }
76 )
77
78 if (resolutionsEnabled.length !== 0) {
79 try {
80 await sequelizeTypescript.transaction(async t => {
81 const tasks: Promise<any>[] = []
82
83 for (const resolution of resolutionsEnabled) {
84 const dataInput = {
85 videoUUID: videoDatabase.uuid,
86 resolution
87 }
88
89 const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput })
90 tasks.push(p)
91 }
92
93 await Promise.all(tasks)
94 })
95
96 logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled })
97 } catch (err) {
98 logger.warn('Cannot transcode the video.', err)
99 }
100 } else {
101 logger.info('No transcoding jobs created for video %s (no resolutions enabled).')
102 return undefined
103 }
104}
105
106// ---------------------------------------------------------------------------
107
108export {
109 processVideoFile
110}
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts
new file mode 100644
index 000000000..57231e649
--- /dev/null
+++ b/server/lib/job-queue/index.ts
@@ -0,0 +1 @@
export * from './job-queue'
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
new file mode 100644
index 000000000..7a2b6c78d
--- /dev/null
+++ b/server/lib/job-queue/job-queue.ts
@@ -0,0 +1,124 @@
1import * as kue from 'kue'
2import { JobType, JobState } from '../../../shared/models'
3import { logger } from '../../helpers/logger'
4import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers'
5import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
6import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
7import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
8import { processVideoFile, VideoFilePayload } from './handlers/video-file'
9
10type CreateJobArgument =
11 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
12 { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
13 { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
14 { type: 'video-file', payload: VideoFilePayload }
15
16const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = {
17 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
18 'activitypub-http-unicast': processActivityPubHttpUnicast,
19 'activitypub-http-fetcher': processActivityPubHttpFetcher,
20 'video-file': processVideoFile
21}
22
23class JobQueue {
24
25 private static instance: JobQueue
26
27 private jobQueue: kue.Queue
28 private initialized = false
29
30 private constructor () {}
31
32 init () {
33 // Already initialized
34 if (this.initialized === true) return
35 this.initialized = true
36
37 this.jobQueue = kue.createQueue({
38 prefix: 'q-' + CONFIG.WEBSERVER.HOST,
39 redis: {
40 host: CONFIG.REDIS.HOSTNAME,
41 port: CONFIG.REDIS.PORT,
42 auth: CONFIG.REDIS.AUTH
43 }
44 })
45
46 this.jobQueue.on('error', err => {
47 logger.error('Error in job queue.', err)
48 process.exit(-1)
49 })
50 this.jobQueue.watchStuckJobs(5000)
51
52 for (const handlerName of Object.keys(handlers)) {
53 this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => {
54 try {
55 const res = await handlers[ handlerName ](job)
56 return done(null, res)
57 } catch (err) {
58 return done(err)
59 }
60 })
61 }
62 }
63
64 createJob (obj: CreateJobArgument, priority = 'normal') {
65 return new Promise((res, rej) => {
66 this.jobQueue
67 .create(obj.type, obj.payload)
68 .priority(priority)
69 .attempts(JOB_ATTEMPTS[obj.type])
70 .backoff({ type: 'exponential' })
71 .save(err => {
72 if (err) return rej(err)
73
74 return res()
75 })
76 })
77 }
78
79 listForApi (state: JobState, start: number, count: number, sort: string) {
80 return new Promise<kue.Job[]>((res, rej) => {
81 kue.Job.rangeByState(state, start, count, sort, (err, jobs) => {
82 if (err) return rej(err)
83
84 return res(jobs)
85 })
86 })
87 }
88
89 count (state: JobState) {
90 return new Promise<number>((res, rej) => {
91 this.jobQueue[state + 'Count']((err, total) => {
92 if (err) return rej(err)
93
94 return res(total)
95 })
96 })
97 }
98
99 removeOldJobs () {
100 const now = new Date().getTime()
101 kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => {
102 if (err) {
103 logger.error('Cannot get jobs when removing old jobs.', err)
104 return
105 }
106
107 for (const job of jobs) {
108 if (now - job.created_at > JOB_COMPLETED_LIFETIME) {
109 job.remove()
110 }
111 }
112 })
113 }
114
115 static get Instance () {
116 return this.instance || (this.instance = new this())
117 }
118}
119
120// ---------------------------------------------------------------------------
121
122export {
123 JobQueue
124}