diff options
Diffstat (limited to 'server/lib/job-queue')
4 files changed, 46 insertions, 42 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts index b58bbc983..1caca1dcc 100644 --- a/server/lib/job-queue/handlers/activitypub-cleaner.ts +++ b/server/lib/job-queue/handlers/activitypub-cleaner.ts | |||
@@ -1,10 +1,13 @@ | |||
1 | import * as Bluebird from 'bluebird' | 1 | import * as Bluebird from 'bluebird' |
2 | import * as Bull from 'bull' | 2 | import * as Bull from 'bull' |
3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' | 3 | import { checkUrlsSameHost } from '@server/helpers/activitypub' |
4 | import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate' | 4 | import { |
5 | import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share' | 5 | isAnnounceActivityValid, |
6 | isDislikeActivityValid, | ||
7 | isLikeActivityValid | ||
8 | } from '@server/helpers/custom-validators/activitypub/activity' | ||
6 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' | 9 | import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments' |
7 | import { doRequest } from '@server/helpers/requests' | 10 | import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests' |
8 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' | 11 | import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants' |
9 | import { VideoModel } from '@server/models/video/video' | 12 | import { VideoModel } from '@server/models/video/video' |
10 | import { VideoCommentModel } from '@server/models/video/video-comment' | 13 | import { VideoCommentModel } from '@server/models/video/video-comment' |
@@ -78,44 +81,44 @@ async function updateObjectIfNeeded <T> ( | |||
78 | updater: (url: string, newUrl: string) => Promise<T>, | 81 | updater: (url: string, newUrl: string) => Promise<T>, |
79 | deleter: (url: string) => Promise<T> | 82 | deleter: (url: string) => Promise<T> |
80 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { | 83 | ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> { |
81 | // Fetch url | 84 | const on404OrTombstone = async () => { |
82 | const { response, body } = await doRequest<any>({ | ||
83 | uri: url, | ||
84 | json: true, | ||
85 | activityPub: true | ||
86 | }) | ||
87 | |||
88 | // Does not exist anymore, remove entry | ||
89 | if (response.statusCode === HttpStatusCode.NOT_FOUND_404) { | ||
90 | logger.info('Removing remote AP object %s.', url) | 85 | logger.info('Removing remote AP object %s.', url) |
91 | const data = await deleter(url) | 86 | const data = await deleter(url) |
92 | 87 | ||
93 | return { status: 'deleted', data } | 88 | return { status: 'deleted' as 'deleted', data } |
94 | } | 89 | } |
95 | 90 | ||
96 | // If not same id, check same host and update | 91 | try { |
97 | if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) | 92 | const { body } = await doJSONRequest<any>(url, { activityPub: true }) |
98 | 93 | ||
99 | if (body.type === 'Tombstone') { | 94 | // If not same id, check same host and update |
100 | logger.info('Removing remote AP object %s.', url) | 95 | if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`) |
101 | const data = await deleter(url) | ||
102 | 96 | ||
103 | return { status: 'deleted', data } | 97 | if (body.type === 'Tombstone') { |
104 | } | 98 | return on404OrTombstone() |
99 | } | ||
105 | 100 | ||
106 | const newUrl = body.id | 101 | const newUrl = body.id |
107 | if (newUrl !== url) { | 102 | if (newUrl !== url) { |
108 | if (checkUrlsSameHost(newUrl, url) !== true) { | 103 | if (checkUrlsSameHost(newUrl, url) !== true) { |
109 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) | 104 | throw new Error(`New url ${newUrl} has not the same host than old url ${url}`) |
105 | } | ||
106 | |||
107 | logger.info('Updating remote AP object %s.', url) | ||
108 | const data = await updater(url, newUrl) | ||
109 | |||
110 | return { status: 'updated', data } | ||
110 | } | 111 | } |
111 | 112 | ||
112 | logger.info('Updating remote AP object %s.', url) | 113 | return null |
113 | const data = await updater(url, newUrl) | 114 | } catch (err) { |
115 | // Does not exist anymore, remove entry | ||
116 | if ((err as PeerTubeRequestError).statusCode === HttpStatusCode.NOT_FOUND_404) { | ||
117 | return on404OrTombstone() | ||
118 | } | ||
114 | 119 | ||
115 | return { status: 'updated', data } | 120 | throw err |
116 | } | 121 | } |
117 | |||
118 | return null | ||
119 | } | 122 | } |
120 | 123 | ||
121 | function rateOptionsFactory () { | 124 | function rateOptionsFactory () { |
@@ -149,7 +152,7 @@ function rateOptionsFactory () { | |||
149 | 152 | ||
150 | function shareOptionsFactory () { | 153 | function shareOptionsFactory () { |
151 | return { | 154 | return { |
152 | bodyValidator: (body: any) => isShareActivityValid(body), | 155 | bodyValidator: (body: any) => isAnnounceActivityValid(body), |
153 | 156 | ||
154 | updater: async (url: string, newUrl: string) => { | 157 | updater: async (url: string, newUrl: string) => { |
155 | const share = await VideoShareModel.loadByUrl(url, undefined) | 158 | const share = await VideoShareModel.loadByUrl(url, undefined) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 7174786d6..c69ff9e83 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -16,8 +16,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
16 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | 16 | const httpSignatureOptions = await buildSignedRequestOptions(payload) |
17 | 17 | ||
18 | const options = { | 18 | const options = { |
19 | method: 'POST', | 19 | method: 'POST' as 'POST', |
20 | uri: '', | ||
21 | json: body, | 20 | json: body, |
22 | httpSignature: httpSignatureOptions, | 21 | httpSignature: httpSignatureOptions, |
23 | timeout: REQUEST_TIMEOUT, | 22 | timeout: REQUEST_TIMEOUT, |
@@ -28,7 +27,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { | |||
28 | const goodUrls: string[] = [] | 27 | const goodUrls: string[] = [] |
29 | 28 | ||
30 | await Bluebird.map(payload.uris, uri => { | 29 | await Bluebird.map(payload.uris, uri => { |
31 | return doRequest(Object.assign({}, options, { uri })) | 30 | return doRequest(uri, options) |
32 | .then(() => goodUrls.push(uri)) | 31 | .then(() => goodUrls.push(uri)) |
33 | .catch(() => badUrls.push(uri)) | 32 | .catch(() => badUrls.push(uri)) |
34 | }, { concurrency: BROADCAST_CONCURRENCY }) | 33 | }, { concurrency: BROADCAST_CONCURRENCY }) |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 74989d62e..585dad671 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -16,8 +16,7 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { | |||
16 | const httpSignatureOptions = await buildSignedRequestOptions(payload) | 16 | const httpSignatureOptions = await buildSignedRequestOptions(payload) |
17 | 17 | ||
18 | const options = { | 18 | const options = { |
19 | method: 'POST', | 19 | method: 'POST' as 'POST', |
20 | uri, | ||
21 | json: body, | 20 | json: body, |
22 | httpSignature: httpSignatureOptions, | 21 | httpSignature: httpSignatureOptions, |
23 | timeout: REQUEST_TIMEOUT, | 22 | timeout: REQUEST_TIMEOUT, |
@@ -25,7 +24,7 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { | |||
25 | } | 24 | } |
26 | 25 | ||
27 | try { | 26 | try { |
28 | await doRequest(options) | 27 | await doRequest(uri, options) |
29 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) | 28 | ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) |
30 | } catch (err) { | 29 | } catch (err) { |
31 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) | 30 | ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) |
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index c030d31ef..e8a91450d 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -6,21 +6,24 @@ import { getServerActor } from '@server/models/application/application' | |||
6 | import { buildDigest } from '@server/helpers/peertube-crypto' | 6 | import { buildDigest } from '@server/helpers/peertube-crypto' |
7 | import { ContextType } from '@shared/models/activitypub/context' | 7 | import { ContextType } from '@shared/models/activitypub/context' |
8 | 8 | ||
9 | type Payload = { body: any, contextType?: ContextType, signatureActorId?: number } | 9 | type Payload <T> = { body: T, contextType?: ContextType, signatureActorId?: number } |
10 | 10 | ||
11 | async function computeBody (payload: Payload) { | 11 | async function computeBody <T> ( |
12 | payload: Payload<T> | ||
13 | ): Promise<T | T & { type: 'RsaSignature2017', creator: string, created: string }> { | ||
12 | let body = payload.body | 14 | let body = payload.body |
13 | 15 | ||
14 | if (payload.signatureActorId) { | 16 | if (payload.signatureActorId) { |
15 | const actorSignature = await ActorModel.load(payload.signatureActorId) | 17 | const actorSignature = await ActorModel.load(payload.signatureActorId) |
16 | if (!actorSignature) throw new Error('Unknown signature actor id.') | 18 | if (!actorSignature) throw new Error('Unknown signature actor id.') |
19 | |||
17 | body = await buildSignedActivity(actorSignature, payload.body, payload.contextType) | 20 | body = await buildSignedActivity(actorSignature, payload.body, payload.contextType) |
18 | } | 21 | } |
19 | 22 | ||
20 | return body | 23 | return body |
21 | } | 24 | } |
22 | 25 | ||
23 | async function buildSignedRequestOptions (payload: Payload) { | 26 | async function buildSignedRequestOptions (payload: Payload<any>) { |
24 | let actor: MActor | null | 27 | let actor: MActor | null |
25 | 28 | ||
26 | if (payload.signatureActorId) { | 29 | if (payload.signatureActorId) { |
@@ -43,9 +46,9 @@ async function buildSignedRequestOptions (payload: Payload) { | |||
43 | 46 | ||
44 | function buildGlobalHeaders (body: any) { | 47 | function buildGlobalHeaders (body: any) { |
45 | return { | 48 | return { |
46 | 'Digest': buildDigest(body), | 49 | 'digest': buildDigest(body), |
47 | 'Content-Type': 'application/activity+json', | 50 | 'content-type': 'application/activity+json', |
48 | 'Accept': ACTIVITY_PUB.ACCEPT_HEADER | 51 | 'accept': ACTIVITY_PUB.ACCEPT_HEADER |
49 | } | 52 | } |
50 | } | 53 | } |
51 | 54 | ||