diff options
author | Chocobozzz <me@florianbigard.com> | 2018-01-25 15:05:18 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-01-25 18:41:17 +0100 |
commit | 94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 (patch) | |
tree | 32a9148e0e4567f0c4ffae0412cbed20b84e8873 /server/lib/job-queue | |
parent | d765fafc3faf0db9818eb1a07161df1cb1bc0efa (diff) | |
download | PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.gz PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.zst PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.zip |
Move job queue to redis
We'll use it as cache in the future.
/!\ You'll loose your old jobs (pending jobs too) so upgrade only when
you don't have pending job anymore.
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-broadcast.ts | 49 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 63 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-unicast.ts | 43 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | 39 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/video-file.ts | 110 | ||||
-rw-r--r-- | server/lib/job-queue/index.ts | 1 | ||||
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 124 |
7 files changed, 429 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts new file mode 100644 index 000000000..159856cda --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -0,0 +1,49 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpBroadcastPayload = { | ||
8 | uris: string[] | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpBroadcast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
17 | |||
18 | const body = await computeBody(payload) | ||
19 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
20 | |||
21 | const options = { | ||
22 | method: 'POST', | ||
23 | uri: '', | ||
24 | json: body, | ||
25 | httpSignature: httpSignatureOptions | ||
26 | } | ||
27 | |||
28 | const badUrls: string[] = [] | ||
29 | const goodUrls: string[] = [] | ||
30 | |||
31 | for (const uri of payload.uris) { | ||
32 | options.uri = uri | ||
33 | |||
34 | try { | ||
35 | await doRequest(options) | ||
36 | goodUrls.push(uri) | ||
37 | } catch (err) { | ||
38 | badUrls.push(uri) | ||
39 | } | ||
40 | } | ||
41 | |||
42 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) | ||
43 | } | ||
44 | |||
45 | // --------------------------------------------------------------------------- | ||
46 | |||
47 | export { | ||
48 | processActivityPubHttpBroadcast | ||
49 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts new file mode 100644 index 000000000..062211c85 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -0,0 +1,63 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ACTIVITY_PUB } from '../../../initializers' | ||
5 | import { processActivities } from '../../activitypub/process' | ||
6 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | ||
7 | |||
8 | export type ActivitypubHttpFetcherPayload = { | ||
9 | uris: string[] | ||
10 | } | ||
11 | |||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | ||
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | ||
14 | |||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
16 | |||
17 | const options = { | ||
18 | method: 'GET', | ||
19 | uri: '', | ||
20 | json: true, | ||
21 | activityPub: true | ||
22 | } | ||
23 | |||
24 | for (const uri of payload.uris) { | ||
25 | options.uri = uri | ||
26 | logger.info('Fetching ActivityPub data on %s.', uri) | ||
27 | |||
28 | const response = await doRequest(options) | ||
29 | const firstBody = response.body | ||
30 | |||
31 | if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { | ||
32 | const activities = firstBody.first.orderedItems | ||
33 | |||
34 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
35 | |||
36 | await processActivities(activities) | ||
37 | } | ||
38 | |||
39 | let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT | ||
40 | let i = 0 | ||
41 | let nextLink = firstBody.first.next | ||
42 | while (nextLink && i < limit) { | ||
43 | options.uri = nextLink | ||
44 | |||
45 | const { body } = await doRequest(options) | ||
46 | nextLink = body.next | ||
47 | i++ | ||
48 | |||
49 | if (Array.isArray(body.orderedItems)) { | ||
50 | const activities = body.orderedItems | ||
51 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
52 | |||
53 | await processActivities(activities) | ||
54 | } | ||
55 | } | ||
56 | } | ||
57 | } | ||
58 | |||
59 | // --------------------------------------------------------------------------- | ||
60 | |||
61 | export { | ||
62 | processActivityPubHttpFetcher | ||
63 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpUnicastPayload = { | ||
8 | uri: string | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpUnicast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpUnicastPayload | ||
17 | const uri = payload.uri | ||
18 | |||
19 | const body = await computeBody(payload) | ||
20 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
21 | |||
22 | const options = { | ||
23 | method: 'POST', | ||
24 | uri, | ||
25 | json: body, | ||
26 | httpSignature: httpSignatureOptions | ||
27 | } | ||
28 | |||
29 | try { | ||
30 | await doRequest(options) | ||
31 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
32 | } catch (err) { | ||
33 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
34 | |||
35 | throw err | ||
36 | } | ||
37 | } | ||
38 | |||
39 | // --------------------------------------------------------------------------- | ||
40 | |||
41 | export { | ||
42 | processActivityPubHttpUnicast | ||
43 | } | ||
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -0,0 +1,39 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | ||
2 | import { getServerActor } from '../../../../helpers/utils' | ||
3 | import { ActorModel } from '../../../../models/activitypub/actor' | ||
4 | |||
5 | async function computeBody (payload: { body: any, signatureActorId?: number }) { | ||
6 | let body = payload.body | ||
7 | |||
8 | if (payload.signatureActorId) { | ||
9 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
10 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
11 | body = await buildSignedActivity(actorSignature, payload.body) | ||
12 | } | ||
13 | |||
14 | return body | ||
15 | } | ||
16 | |||
17 | async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { | ||
18 | let actor: ActorModel | ||
19 | if (payload.signatureActorId) { | ||
20 | actor = await ActorModel.load(payload.signatureActorId) | ||
21 | if (!actor) throw new Error('Unknown signature actor id.') | ||
22 | } else { | ||
23 | // We need to sign the request, so use the server | ||
24 | actor = await getServerActor() | ||
25 | } | ||
26 | |||
27 | const keyId = actor.getWebfingerUrl() | ||
28 | return { | ||
29 | algorithm: 'rsa-sha256', | ||
30 | authorizationHeaderName: 'Signature', | ||
31 | keyId, | ||
32 | key: actor.privateKey | ||
33 | } | ||
34 | } | ||
35 | |||
36 | export { | ||
37 | computeBody, | ||
38 | buildSignedRequestOptions | ||
39 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts new file mode 100644 index 000000000..5294483bd --- /dev/null +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -0,0 +1,110 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { VideoResolution } from '../../../../shared' | ||
3 | import { VideoPrivacy } from '../../../../shared/models/videos' | ||
4 | import { logger } from '../../../helpers/logger' | ||
5 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | ||
6 | import { sequelizeTypescript } from '../../../initializers' | ||
7 | import { VideoModel } from '../../../models/video/video' | ||
8 | import { shareVideoByServerAndChannel } from '../../activitypub' | ||
9 | import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' | ||
10 | import { JobQueue } from '../job-queue' | ||
11 | |||
12 | export type VideoFilePayload = { | ||
13 | videoUUID: string | ||
14 | resolution?: VideoResolution | ||
15 | } | ||
16 | |||
17 | async function processVideoFile (job: kue.Job) { | ||
18 | const payload = job.data as VideoFilePayload | ||
19 | logger.info('Processing video file in job %d.', job.id) | ||
20 | |||
21 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
22 | // No video, maybe deleted? | ||
23 | if (!video) { | ||
24 | logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) | ||
25 | return undefined | ||
26 | } | ||
27 | |||
28 | // Transcoding in other resolution | ||
29 | if (payload.resolution) { | ||
30 | await video.transcodeOriginalVideofile(payload.resolution) | ||
31 | await onVideoFileTranscoderSuccess(video) | ||
32 | } else { | ||
33 | await video.optimizeOriginalVideofile() | ||
34 | await onVideoFileOptimizerSuccess(video) | ||
35 | } | ||
36 | |||
37 | return video | ||
38 | } | ||
39 | |||
40 | async function onVideoFileTranscoderSuccess (video: VideoModel) { | ||
41 | if (video === undefined) return undefined | ||
42 | |||
43 | // Maybe the video changed in database, refresh it | ||
44 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
45 | // Video does not exist anymore | ||
46 | if (!videoDatabase) return undefined | ||
47 | |||
48 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
49 | await sendUpdateVideo(video, undefined) | ||
50 | } | ||
51 | |||
52 | return undefined | ||
53 | } | ||
54 | |||
55 | async function onVideoFileOptimizerSuccess (video: VideoModel) { | ||
56 | if (video === undefined) return undefined | ||
57 | |||
58 | // Maybe the video changed in database, refresh it | ||
59 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
60 | // Video does not exist anymore | ||
61 | if (!videoDatabase) return undefined | ||
62 | |||
63 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
64 | // Now we'll add the video's meta data to our followers | ||
65 | await sendCreateVideo(video, undefined) | ||
66 | await shareVideoByServerAndChannel(video, undefined) | ||
67 | } | ||
68 | |||
69 | const originalFileHeight = await videoDatabase.getOriginalFileHeight() | ||
70 | |||
71 | // Create transcoding jobs if there are enabled resolutions | ||
72 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) | ||
73 | logger.info( | ||
74 | 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, | ||
75 | { resolutions: resolutionsEnabled } | ||
76 | ) | ||
77 | |||
78 | if (resolutionsEnabled.length !== 0) { | ||
79 | try { | ||
80 | await sequelizeTypescript.transaction(async t => { | ||
81 | const tasks: Promise<any>[] = [] | ||
82 | |||
83 | for (const resolution of resolutionsEnabled) { | ||
84 | const dataInput = { | ||
85 | videoUUID: videoDatabase.uuid, | ||
86 | resolution | ||
87 | } | ||
88 | |||
89 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) | ||
90 | tasks.push(p) | ||
91 | } | ||
92 | |||
93 | await Promise.all(tasks) | ||
94 | }) | ||
95 | |||
96 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | ||
97 | } catch (err) { | ||
98 | logger.warn('Cannot transcode the video.', err) | ||
99 | } | ||
100 | } else { | ||
101 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') | ||
102 | return undefined | ||
103 | } | ||
104 | } | ||
105 | |||
106 | // --------------------------------------------------------------------------- | ||
107 | |||
108 | export { | ||
109 | processVideoFile | ||
110 | } | ||
diff --git a/server/lib/job-queue/index.ts b/server/lib/job-queue/index.ts new file mode 100644 index 000000000..57231e649 --- /dev/null +++ b/server/lib/job-queue/index.ts | |||
@@ -0,0 +1 @@ | |||
export * from './job-queue' | |||
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts new file mode 100644 index 000000000..7a2b6c78d --- /dev/null +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -0,0 +1,124 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { JobType, JobState } from '../../../shared/models' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | import { CONFIG, JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY } from '../../initializers' | ||
5 | import { ActivitypubHttpBroadcastPayload, processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast' | ||
6 | import { ActivitypubHttpFetcherPayload, processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher' | ||
7 | import { ActivitypubHttpUnicastPayload, processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast' | ||
8 | import { processVideoFile, VideoFilePayload } from './handlers/video-file' | ||
9 | |||
10 | type CreateJobArgument = | ||
11 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | ||
12 | { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } | | ||
13 | { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } | | ||
14 | { type: 'video-file', payload: VideoFilePayload } | ||
15 | |||
16 | const handlers: { [ id in JobType ]: (job: kue.Job) => Promise<any>} = { | ||
17 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | ||
18 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | ||
19 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | ||
20 | 'video-file': processVideoFile | ||
21 | } | ||
22 | |||
23 | class JobQueue { | ||
24 | |||
25 | private static instance: JobQueue | ||
26 | |||
27 | private jobQueue: kue.Queue | ||
28 | private initialized = false | ||
29 | |||
30 | private constructor () {} | ||
31 | |||
32 | init () { | ||
33 | // Already initialized | ||
34 | if (this.initialized === true) return | ||
35 | this.initialized = true | ||
36 | |||
37 | this.jobQueue = kue.createQueue({ | ||
38 | prefix: 'q-' + CONFIG.WEBSERVER.HOST, | ||
39 | redis: { | ||
40 | host: CONFIG.REDIS.HOSTNAME, | ||
41 | port: CONFIG.REDIS.PORT, | ||
42 | auth: CONFIG.REDIS.AUTH | ||
43 | } | ||
44 | }) | ||
45 | |||
46 | this.jobQueue.on('error', err => { | ||
47 | logger.error('Error in job queue.', err) | ||
48 | process.exit(-1) | ||
49 | }) | ||
50 | this.jobQueue.watchStuckJobs(5000) | ||
51 | |||
52 | for (const handlerName of Object.keys(handlers)) { | ||
53 | this.jobQueue.process(handlerName, JOB_CONCURRENCY[handlerName], async (job, done) => { | ||
54 | try { | ||
55 | const res = await handlers[ handlerName ](job) | ||
56 | return done(null, res) | ||
57 | } catch (err) { | ||
58 | return done(err) | ||
59 | } | ||
60 | }) | ||
61 | } | ||
62 | } | ||
63 | |||
64 | createJob (obj: CreateJobArgument, priority = 'normal') { | ||
65 | return new Promise((res, rej) => { | ||
66 | this.jobQueue | ||
67 | .create(obj.type, obj.payload) | ||
68 | .priority(priority) | ||
69 | .attempts(JOB_ATTEMPTS[obj.type]) | ||
70 | .backoff({ type: 'exponential' }) | ||
71 | .save(err => { | ||
72 | if (err) return rej(err) | ||
73 | |||
74 | return res() | ||
75 | }) | ||
76 | }) | ||
77 | } | ||
78 | |||
79 | listForApi (state: JobState, start: number, count: number, sort: string) { | ||
80 | return new Promise<kue.Job[]>((res, rej) => { | ||
81 | kue.Job.rangeByState(state, start, count, sort, (err, jobs) => { | ||
82 | if (err) return rej(err) | ||
83 | |||
84 | return res(jobs) | ||
85 | }) | ||
86 | }) | ||
87 | } | ||
88 | |||
89 | count (state: JobState) { | ||
90 | return new Promise<number>((res, rej) => { | ||
91 | this.jobQueue[state + 'Count']((err, total) => { | ||
92 | if (err) return rej(err) | ||
93 | |||
94 | return res(total) | ||
95 | }) | ||
96 | }) | ||
97 | } | ||
98 | |||
99 | removeOldJobs () { | ||
100 | const now = new Date().getTime() | ||
101 | kue.Job.rangeByState('complete', 0, -1, 'asc', (err, jobs) => { | ||
102 | if (err) { | ||
103 | logger.error('Cannot get jobs when removing old jobs.', err) | ||
104 | return | ||
105 | } | ||
106 | |||
107 | for (const job of jobs) { | ||
108 | if (now - job.created_at > JOB_COMPLETED_LIFETIME) { | ||
109 | job.remove() | ||
110 | } | ||
111 | } | ||
112 | }) | ||
113 | } | ||
114 | |||
115 | static get Instance () { | ||
116 | return this.instance || (this.instance = new this()) | ||
117 | } | ||
118 | } | ||
119 | |||
120 | // --------------------------------------------------------------------------- | ||
121 | |||
122 | export { | ||
123 | JobQueue | ||
124 | } | ||