diff options
Diffstat (limited to 'server/lib/job-queue/handlers')
5 files changed, 304 insertions, 0 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts new file mode 100644 index 000000000..159856cda --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -0,0 +1,49 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpBroadcastPayload = { | ||
8 | uris: string[] | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpBroadcast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub broadcast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
17 | |||
18 | const body = await computeBody(payload) | ||
19 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
20 | |||
21 | const options = { | ||
22 | method: 'POST', | ||
23 | uri: '', | ||
24 | json: body, | ||
25 | httpSignature: httpSignatureOptions | ||
26 | } | ||
27 | |||
28 | const badUrls: string[] = [] | ||
29 | const goodUrls: string[] = [] | ||
30 | |||
31 | for (const uri of payload.uris) { | ||
32 | options.uri = uri | ||
33 | |||
34 | try { | ||
35 | await doRequest(options) | ||
36 | goodUrls.push(uri) | ||
37 | } catch (err) { | ||
38 | badUrls.push(uri) | ||
39 | } | ||
40 | } | ||
41 | |||
42 | return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined) | ||
43 | } | ||
44 | |||
45 | // --------------------------------------------------------------------------- | ||
46 | |||
47 | export { | ||
48 | processActivityPubHttpBroadcast | ||
49 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts new file mode 100644 index 000000000..062211c85 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -0,0 +1,63 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ACTIVITY_PUB } from '../../../initializers' | ||
5 | import { processActivities } from '../../activitypub/process' | ||
6 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | ||
7 | |||
8 | export type ActivitypubHttpFetcherPayload = { | ||
9 | uris: string[] | ||
10 | } | ||
11 | |||
12 | async function processActivityPubHttpFetcher (job: kue.Job) { | ||
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | ||
14 | |||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | ||
16 | |||
17 | const options = { | ||
18 | method: 'GET', | ||
19 | uri: '', | ||
20 | json: true, | ||
21 | activityPub: true | ||
22 | } | ||
23 | |||
24 | for (const uri of payload.uris) { | ||
25 | options.uri = uri | ||
26 | logger.info('Fetching ActivityPub data on %s.', uri) | ||
27 | |||
28 | const response = await doRequest(options) | ||
29 | const firstBody = response.body | ||
30 | |||
31 | if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { | ||
32 | const activities = firstBody.first.orderedItems | ||
33 | |||
34 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
35 | |||
36 | await processActivities(activities) | ||
37 | } | ||
38 | |||
39 | let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT | ||
40 | let i = 0 | ||
41 | let nextLink = firstBody.first.next | ||
42 | while (nextLink && i < limit) { | ||
43 | options.uri = nextLink | ||
44 | |||
45 | const { body } = await doRequest(options) | ||
46 | nextLink = body.next | ||
47 | i++ | ||
48 | |||
49 | if (Array.isArray(body.orderedItems)) { | ||
50 | const activities = body.orderedItems | ||
51 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
52 | |||
53 | await processActivities(activities) | ||
54 | } | ||
55 | } | ||
56 | } | ||
57 | } | ||
58 | |||
59 | // --------------------------------------------------------------------------- | ||
60 | |||
61 | export { | ||
62 | processActivityPubHttpFetcher | ||
63 | } | ||
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts new file mode 100644 index 000000000..9b4188c50 --- /dev/null +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -0,0 +1,43 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ActorFollowModel } from '../../../models/activitypub/actor-follow' | ||
5 | import { buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | ||
6 | |||
7 | export type ActivitypubHttpUnicastPayload = { | ||
8 | uri: string | ||
9 | signatureActorId?: number | ||
10 | body: any | ||
11 | } | ||
12 | |||
13 | async function processActivityPubHttpUnicast (job: kue.Job) { | ||
14 | logger.info('Processing ActivityPub unicast in job %d.', job.id) | ||
15 | |||
16 | const payload = job.data as ActivitypubHttpUnicastPayload | ||
17 | const uri = payload.uri | ||
18 | |||
19 | const body = await computeBody(payload) | ||
20 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | ||
21 | |||
22 | const options = { | ||
23 | method: 'POST', | ||
24 | uri, | ||
25 | json: body, | ||
26 | httpSignature: httpSignatureOptions | ||
27 | } | ||
28 | |||
29 | try { | ||
30 | await doRequest(options) | ||
31 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined) | ||
32 | } catch (err) { | ||
33 | ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined) | ||
34 | |||
35 | throw err | ||
36 | } | ||
37 | } | ||
38 | |||
39 | // --------------------------------------------------------------------------- | ||
40 | |||
41 | export { | ||
42 | processActivityPubHttpUnicast | ||
43 | } | ||
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts new file mode 100644 index 000000000..c087371c6 --- /dev/null +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -0,0 +1,39 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | ||
2 | import { getServerActor } from '../../../../helpers/utils' | ||
3 | import { ActorModel } from '../../../../models/activitypub/actor' | ||
4 | |||
5 | async function computeBody (payload: { body: any, signatureActorId?: number }) { | ||
6 | let body = payload.body | ||
7 | |||
8 | if (payload.signatureActorId) { | ||
9 | const actorSignature = await ActorModel.load(payload.signatureActorId) | ||
10 | if (!actorSignature) throw new Error('Unknown signature actor id.') | ||
11 | body = await buildSignedActivity(actorSignature, payload.body) | ||
12 | } | ||
13 | |||
14 | return body | ||
15 | } | ||
16 | |||
17 | async function buildSignedRequestOptions (payload: { signatureActorId?: number }) { | ||
18 | let actor: ActorModel | ||
19 | if (payload.signatureActorId) { | ||
20 | actor = await ActorModel.load(payload.signatureActorId) | ||
21 | if (!actor) throw new Error('Unknown signature actor id.') | ||
22 | } else { | ||
23 | // We need to sign the request, so use the server | ||
24 | actor = await getServerActor() | ||
25 | } | ||
26 | |||
27 | const keyId = actor.getWebfingerUrl() | ||
28 | return { | ||
29 | algorithm: 'rsa-sha256', | ||
30 | authorizationHeaderName: 'Signature', | ||
31 | keyId, | ||
32 | key: actor.privateKey | ||
33 | } | ||
34 | } | ||
35 | |||
36 | export { | ||
37 | computeBody, | ||
38 | buildSignedRequestOptions | ||
39 | } | ||
diff --git a/server/lib/job-queue/handlers/video-file.ts b/server/lib/job-queue/handlers/video-file.ts new file mode 100644 index 000000000..5294483bd --- /dev/null +++ b/server/lib/job-queue/handlers/video-file.ts | |||
@@ -0,0 +1,110 @@ | |||
1 | import * as kue from 'kue' | ||
2 | import { VideoResolution } from '../../../../shared' | ||
3 | import { VideoPrivacy } from '../../../../shared/models/videos' | ||
4 | import { logger } from '../../../helpers/logger' | ||
5 | import { computeResolutionsToTranscode } from '../../../helpers/utils' | ||
6 | import { sequelizeTypescript } from '../../../initializers' | ||
7 | import { VideoModel } from '../../../models/video/video' | ||
8 | import { shareVideoByServerAndChannel } from '../../activitypub' | ||
9 | import { sendCreateVideo, sendUpdateVideo } from '../../activitypub/send' | ||
10 | import { JobQueue } from '../job-queue' | ||
11 | |||
12 | export type VideoFilePayload = { | ||
13 | videoUUID: string | ||
14 | resolution?: VideoResolution | ||
15 | } | ||
16 | |||
17 | async function processVideoFile (job: kue.Job) { | ||
18 | const payload = job.data as VideoFilePayload | ||
19 | logger.info('Processing video file in job %d.', job.id) | ||
20 | |||
21 | const video = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(payload.videoUUID) | ||
22 | // No video, maybe deleted? | ||
23 | if (!video) { | ||
24 | logger.info('Do not process job %d, video does not exist.', job.id, { videoUUID: video.uuid }) | ||
25 | return undefined | ||
26 | } | ||
27 | |||
28 | // Transcoding in other resolution | ||
29 | if (payload.resolution) { | ||
30 | await video.transcodeOriginalVideofile(payload.resolution) | ||
31 | await onVideoFileTranscoderSuccess(video) | ||
32 | } else { | ||
33 | await video.optimizeOriginalVideofile() | ||
34 | await onVideoFileOptimizerSuccess(video) | ||
35 | } | ||
36 | |||
37 | return video | ||
38 | } | ||
39 | |||
40 | async function onVideoFileTranscoderSuccess (video: VideoModel) { | ||
41 | if (video === undefined) return undefined | ||
42 | |||
43 | // Maybe the video changed in database, refresh it | ||
44 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
45 | // Video does not exist anymore | ||
46 | if (!videoDatabase) return undefined | ||
47 | |||
48 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
49 | await sendUpdateVideo(video, undefined) | ||
50 | } | ||
51 | |||
52 | return undefined | ||
53 | } | ||
54 | |||
55 | async function onVideoFileOptimizerSuccess (video: VideoModel) { | ||
56 | if (video === undefined) return undefined | ||
57 | |||
58 | // Maybe the video changed in database, refresh it | ||
59 | const videoDatabase = await VideoModel.loadByUUIDAndPopulateAccountAndServerAndTags(video.uuid) | ||
60 | // Video does not exist anymore | ||
61 | if (!videoDatabase) return undefined | ||
62 | |||
63 | if (video.privacy !== VideoPrivacy.PRIVATE) { | ||
64 | // Now we'll add the video's meta data to our followers | ||
65 | await sendCreateVideo(video, undefined) | ||
66 | await shareVideoByServerAndChannel(video, undefined) | ||
67 | } | ||
68 | |||
69 | const originalFileHeight = await videoDatabase.getOriginalFileHeight() | ||
70 | |||
71 | // Create transcoding jobs if there are enabled resolutions | ||
72 | const resolutionsEnabled = computeResolutionsToTranscode(originalFileHeight) | ||
73 | logger.info( | ||
74 | 'Resolutions computed for video %s and origin file height of %d.', videoDatabase.uuid, originalFileHeight, | ||
75 | { resolutions: resolutionsEnabled } | ||
76 | ) | ||
77 | |||
78 | if (resolutionsEnabled.length !== 0) { | ||
79 | try { | ||
80 | await sequelizeTypescript.transaction(async t => { | ||
81 | const tasks: Promise<any>[] = [] | ||
82 | |||
83 | for (const resolution of resolutionsEnabled) { | ||
84 | const dataInput = { | ||
85 | videoUUID: videoDatabase.uuid, | ||
86 | resolution | ||
87 | } | ||
88 | |||
89 | const p = JobQueue.Instance.createJob({ type: 'video-file', payload: dataInput }) | ||
90 | tasks.push(p) | ||
91 | } | ||
92 | |||
93 | await Promise.all(tasks) | ||
94 | }) | ||
95 | |||
96 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | ||
97 | } catch (err) { | ||
98 | logger.warn('Cannot transcode the video.', err) | ||
99 | } | ||
100 | } else { | ||
101 | logger.info('No transcoding jobs created for video %s (no resolutions enabled).') | ||
102 | return undefined | ||
103 | } | ||
104 | } | ||
105 | |||
106 | // --------------------------------------------------------------------------- | ||
107 | |||
108 | export { | ||
109 | processVideoFile | ||
110 | } | ||