aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs
diff options
context:
space:
mode:
authorChocobozzz <florian.bigard@gmail.com>2017-11-23 14:19:55 +0100
committerChocobozzz <florian.bigard@gmail.com>2017-11-27 19:40:53 +0100
commit0032ebe94aa83fab761c7de3ceb6210ac4532824 (patch)
tree3ea407d7ea6de4c7f7bc66caba7e23c0cc4036e3 /server/lib/jobs
parentd52eb8f656242c7e34afdb2dee681861fb9bce35 (diff)
downloadPeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.tar.gz
PeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.tar.zst
PeerTube-0032ebe94aa83fab761c7de3ceb6210ac4532824.zip
Federate likes/dislikes
Diffstat (limited to 'server/lib/jobs')
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts9
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts24
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts9
3 files changed, 37 insertions, 5 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
index 111fc88a4..5b4c65b81 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
@@ -2,7 +2,7 @@ import { logger } from '../../../helpers'
2import { buildSignedActivity } from '../../../helpers/activitypub' 2import { buildSignedActivity } from '../../../helpers/activitypub'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { database as db } from '../../../initializers' 4import { database as db } from '../../../initializers'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' 5import { ActivityPubHttpPayload, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
6 6
7async function process (payload: ActivityPubHttpPayload, jobId: number) { 7async function process (payload: ActivityPubHttpPayload, jobId: number) {
8 logger.info('Processing ActivityPub broadcast in job %d.', jobId) 8 logger.info('Processing ActivityPub broadcast in job %d.', jobId)
@@ -20,7 +20,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
20 20
21 for (const uri of payload.uris) { 21 for (const uri of payload.uris) {
22 options.uri = uri 22 options.uri = uri
23 await doRequest(options) 23
24 try {
25 await doRequest(options)
26 } catch (err) {
27 await maybeRetryRequestLater(err, payload, uri)
28 }
24 } 29 }
25} 30}
26 31
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
index aef217ce7..ccf109935 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
@@ -4,12 +4,16 @@ import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-h
4import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' 4import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
5import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' 5import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
6import { JobCategory } from '../../../../shared' 6import { JobCategory } from '../../../../shared'
7import { ACTIVITY_PUB } from '../../../initializers/constants'
8import { logger } from '../../../helpers/logger'
7 9
8type ActivityPubHttpPayload = { 10type ActivityPubHttpPayload = {
9 uris: string[] 11 uris: string[]
10 signatureAccountId?: number 12 signatureAccountId?: number
11 body?: any 13 body?: any
14 attemptNumber?: number
12} 15}
16
13const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { 17const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
14 activitypubHttpBroadcastHandler, 18 activitypubHttpBroadcastHandler,
15 activitypubHttpUnicastHandler, 19 activitypubHttpUnicastHandler,
@@ -19,7 +23,25 @@ const jobCategory: JobCategory = 'activitypub-http'
19 23
20const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers) 24const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)
21 25
26function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
27 logger.warn('Cannot make request to %s.', uri, err)
28
29 let attemptNumber = payload.attemptNumber || 1
30 attemptNumber += 1
31
32 if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
33 logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)
34
35 const newPayload = Object.assign(payload, {
36 uris: [ uri ],
37 attemptNumber
38 })
39 return activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)
40 }
41}
42
22export { 43export {
23 ActivityPubHttpPayload, 44 ActivityPubHttpPayload,
24 activitypubHttpJobScheduler 45 activitypubHttpJobScheduler,
46 maybeRetryRequestLater
25} 47}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
index 8d3b755ad..f7f3dabbd 100644
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
@@ -1,6 +1,6 @@
1import { logger } from '../../../helpers' 1import { logger } from '../../../helpers'
2import { doRequest } from '../../../helpers/requests' 2import { doRequest } from '../../../helpers/requests'
3import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' 3import { ActivityPubHttpPayload, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
4import { database as db } from '../../../initializers/database' 4import { database as db } from '../../../initializers/database'
5import { buildSignedActivity } from '../../../helpers/activitypub' 5import { buildSignedActivity } from '../../../helpers/activitypub'
6 6
@@ -18,7 +18,12 @@ async function process (payload: ActivityPubHttpPayload, jobId: number) {
18 json: signedBody 18 json: signedBody
19 } 19 }
20 20
21 await doRequest(options) 21 try {
22 await doRequest(options)
23 } catch (err) {
24 await maybeRetryRequestLater(err, payload, uri)
25 throw err
26 }
22} 27}
23 28
24function onError (err: Error, jobId: number) { 29function onError (err: Error, jobId: number) {