diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-11-23 14:19:55 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-11-27 19:40:53 +0100 |
commit | 0032ebe94aa83fab761c7de3ceb6210ac4532824 (patch) | |
tree | 3ea407d7ea6de4c7f7bc66caba7e23c0cc4036e3 /server/lib/jobs | |
parent | d52eb8f656242c7e34afdb2dee681861fb9bce35 (diff) | |
download | PeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.tar.gz PeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.tar.zst PeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.zip |
Federate likes/dislikes
Diffstat (limited to 'server/lib/jobs')
3 files changed, 37 insertions, 5 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts index 111fc88a4..5b4c65b81 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts | |||
@@ -2,7 +2,7 @@ import { logger } from '../../../helpers' | |||
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | 2 | import { buildSignedActivity } from '../../../helpers/activitypub' |
3 | import { doRequest } from '../../../helpers/requests' | 3 | import { doRequest } from '../../../helpers/requests' |
4 | import { database as db } from '../../../initializers' | 4 | import { database as db } from '../../../initializers' |
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | 5 | import { ActivityPubHttpPayload, maybeRetryRequestLater } from './activitypub-http-job-scheduler' |
6 | 6 | ||
7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | 7 | async function process (payload: ActivityPubHttpPayload, jobId: number) { |
8 | logger.info('Processing ActivityPub broadcast in job %d.', jobId) | 8 | logger.info('Processing ActivityPub broadcast in job %d.', jobId) |
@@ -20,7 +20,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
20 | 20 | ||
21 | for (const uri of payload.uris) { | 21 | for (const uri of payload.uris) { |
22 | options.uri = uri | 22 | options.uri = uri |
23 | await doRequest(options) | 23 | |
24 | try { | ||
25 | await doRequest(options) | ||
26 | } catch (err) { | ||
27 | await maybeRetryRequestLater(err, payload, uri) | ||
28 | } | ||
24 | } | 29 | } |
25 | } | 30 | } |
26 | 31 | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts index aef217ce7..ccf109935 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts | |||
@@ -4,12 +4,16 @@ import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-h | |||
4 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' | 4 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' |
5 | import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' | 5 | import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' |
6 | import { JobCategory } from '../../../../shared' | 6 | import { JobCategory } from '../../../../shared' |
7 | import { ACTIVITY_PUB } from '../../../initializers/constants' | ||
8 | import { logger } from '../../../helpers/logger' | ||
7 | 9 | ||
8 | type ActivityPubHttpPayload = { | 10 | type ActivityPubHttpPayload = { |
9 | uris: string[] | 11 | uris: string[] |
10 | signatureAccountId?: number | 12 | signatureAccountId?: number |
11 | body?: any | 13 | body?: any |
14 | attemptNumber?: number | ||
12 | } | 15 | } |
16 | |||
13 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { | 17 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { |
14 | activitypubHttpBroadcastHandler, | 18 | activitypubHttpBroadcastHandler, |
15 | activitypubHttpUnicastHandler, | 19 | activitypubHttpUnicastHandler, |
@@ -19,7 +23,25 @@ const jobCategory: JobCategory = 'activitypub-http' | |||
19 | 23 | ||
20 | const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) | 24 | const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) |
21 | 25 | ||
26 | function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) { | ||
27 | logger.warn('Cannot make request to %s.', uri, err) | ||
28 | |||
29 | let attemptNumber = payload.attemptNumber || 1 | ||
30 | attemptNumber += 1 | ||
31 | |||
32 | if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) { | ||
33 | logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err) | ||
34 | |||
35 | const newPayload = Object.assign(payload, { | ||
36 | uris: [ uri ], | ||
37 | attemptNumber | ||
38 | }) | ||
39 | return activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload) | ||
40 | } | ||
41 | } | ||
42 | |||
22 | export { | 43 | export { |
23 | ActivityPubHttpPayload, | 44 | ActivityPubHttpPayload, |
24 | activitypubHttpJobScheduler | 45 | activitypubHttpJobScheduler, |
46 | maybeRetryRequestLater | ||
25 | } | 47 | } |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts index 8d3b755ad..f7f3dabbd 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts | |||
@@ -1,6 +1,6 @@ | |||
1 | import { logger } from '../../../helpers' | 1 | import { logger } from '../../../helpers' |
2 | import { doRequest } from '../../../helpers/requests' | 2 | import { doRequest } from '../../../helpers/requests' |
3 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | 3 | import { ActivityPubHttpPayload, maybeRetryRequestLater } from './activitypub-http-job-scheduler' |
4 | import { database as db } from '../../../initializers/database' | 4 | import { database as db } from '../../../initializers/database' |
5 | import { buildSignedActivity } from '../../../helpers/activitypub' | 5 | import { buildSignedActivity } from '../../../helpers/activitypub' |
6 | 6 | ||
@@ -18,7 +18,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) { | |||
18 | json: signedBody | 18 | json: signedBody |
19 | } | 19 | } |
20 | 20 | ||
21 | await doRequest(options) | 21 | try { |
22 | await doRequest(options) | ||
23 | } catch (err) { | ||
24 | await maybeRetryRequestLater(err, payload, uri) | ||
25 | throw err | ||
26 | } | ||
22 | } | 27 | } |
23 | 28 | ||
24 | function onError (err: Error, jobId: number) { | 29 | function onError (err: Error, jobId: number) { |