aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/jobs/activitypub-http-job-scheduler
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-01-25 15:05:18 +0100
committerChocobozzz <me@florianbigard.com>2018-01-25 18:41:17 +0100
commit94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4 (patch)
tree32a9148e0e4567f0c4ffae0412cbed20b84e8873 /server/lib/jobs/activitypub-http-job-scheduler
parentd765fafc3faf0db9818eb1a07161df1cb1bc0efa (diff)
downloadPeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.gz
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.tar.zst
PeerTube-94a5ff8a4a75d75bb9df542a39ce8769e7a7e6a4.zip
Move job queue to redis
We'll use it as cache in the future. /!\ You'll loose your old jobs (pending jobs too) so upgrade only when you don't have pending job anymore.
Diffstat (limited to 'server/lib/jobs/activitypub-http-job-scheduler')
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts53
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts68
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts94
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts50
-rw-r--r--server/lib/jobs/activitypub-http-job-scheduler/index.ts1
5 files changed, 0 insertions, 266 deletions
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
deleted file mode 100644
index 3f780e319..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-broadcast-handler.ts
+++ /dev/null
@@ -1,53 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub broadcast in job %d.', jobId)
8
9 const body = await computeBody(payload)
10 const httpSignatureOptions = await buildSignedRequestOptions(payload)
11
12 const options = {
13 method: 'POST',
14 uri: '',
15 json: body,
16 httpSignature: httpSignatureOptions
17 }
18
19 const badUrls: string[] = []
20 const goodUrls: string[] = []
21
22 for (const uri of payload.uris) {
23 options.uri = uri
24
25 try {
26 await doRequest(options)
27 goodUrls.push(uri)
28 } catch (err) {
29 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
30 if (isRetryingLater === false) badUrls.push(uri)
31 }
32 }
33
34 return ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes(goodUrls, badUrls, undefined)
35}
36
37function onError (err: Error, jobId: number) {
38 logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err)
39 return Promise.resolve()
40}
41
42function onSuccess (jobId: number) {
43 logger.info('Job %d is a success.', jobId)
44 return Promise.resolve()
45}
46
47// ---------------------------------------------------------------------------
48
49export {
50 process,
51 onError,
52 onSuccess
53}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
deleted file mode 100644
index a7b5aabd0..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts
+++ /dev/null
@@ -1,68 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ACTIVITY_PUB } from '../../../initializers'
4import { processActivities } from '../../activitypub/process'
5import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler'
6
7async function process (payload: ActivityPubHttpPayload, jobId: number) {
8 logger.info('Processing ActivityPub fetcher in job %d.', jobId)
9
10 const options = {
11 method: 'GET',
12 uri: '',
13 json: true,
14 activityPub: true
15 }
16
17 for (const uri of payload.uris) {
18 options.uri = uri
19 logger.info('Fetching ActivityPub data on %s.', uri)
20
21 const response = await doRequest(options)
22 const firstBody = response.body
23
24 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
25 const activities = firstBody.first.orderedItems
26
27 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
28
29 await processActivities(activities)
30 }
31
32 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
33 let i = 0
34 let nextLink = firstBody.first.next
35 while (nextLink && i < limit) {
36 options.uri = nextLink
37
38 const { body } = await doRequest(options)
39 nextLink = body.next
40 i++
41
42 if (Array.isArray(body.orderedItems)) {
43 const activities = body.orderedItems
44 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
45
46 await processActivities(activities)
47 }
48 }
49 }
50}
51
52function onError (err: Error, jobId: number) {
53 logger.error('Error when fetcher ActivityPub request in job %d.', jobId, err)
54 return Promise.resolve()
55}
56
57function onSuccess (jobId: number) {
58 logger.info('Job %d is a success.', jobId)
59 return Promise.resolve()
60}
61
62// ---------------------------------------------------------------------------
63
64export {
65 process,
66 onError,
67 onSuccess
68}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
deleted file mode 100644
index 4459152db..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts
+++ /dev/null
@@ -1,94 +0,0 @@
1import { JobCategory } from '../../../../shared'
2import { buildSignedActivity } from '../../../helpers/activitypub'
3import { logger } from '../../../helpers/logger'
4import { getServerActor } from '../../../helpers/utils'
5import { ACTIVITY_PUB } from '../../../initializers'
6import { ActorModel } from '../../../models/activitypub/actor'
7import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
8import { JobHandler, JobScheduler } from '../job-scheduler'
9
10import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler'
11import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler'
12import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler'
13
14type ActivityPubHttpPayload = {
15 uris: string[]
16 signatureActorId?: number
17 body?: any
18 attemptNumber?: number
19}
20
21const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = {
22 activitypubHttpBroadcastHandler,
23 activitypubHttpUnicastHandler,
24 activitypubHttpFetcherHandler
25}
26const jobCategory: JobCategory = 'activitypub-http'
27
28const activitypubHttpJobScheduler = new JobScheduler(jobCategory, jobHandlers)
29
30async function maybeRetryRequestLater (err: Error, payload: ActivityPubHttpPayload, uri: string) {
31 logger.warn('Cannot make request to %s.', uri, err)
32
33 let attemptNumber = payload.attemptNumber || 1
34 attemptNumber += 1
35
36 if (attemptNumber < ACTIVITY_PUB.MAX_HTTP_ATTEMPT) {
37 logger.debug('Retrying request to %s (attempt %d/%d).', uri, attemptNumber, ACTIVITY_PUB.MAX_HTTP_ATTEMPT, err)
38
39 const actor = await ActorFollowModel.loadByFollowerInbox(uri, undefined)
40 if (!actor) {
41 logger.debug('Actor %s is not a follower, do not retry the request.', uri)
42 return false
43 }
44
45 const newPayload = Object.assign(payload, {
46 uris: [ uri ],
47 attemptNumber
48 })
49 await activitypubHttpJobScheduler.createJob(undefined, 'activitypubHttpUnicastHandler', newPayload)
50
51 return true
52 }
53
54 return false
55}
56
57async function computeBody (payload: ActivityPubHttpPayload) {
58 let body = payload.body
59
60 if (payload.signatureActorId) {
61 const actorSignature = await ActorModel.load(payload.signatureActorId)
62 if (!actorSignature) throw new Error('Unknown signature actor id.')
63 body = await buildSignedActivity(actorSignature, payload.body)
64 }
65
66 return body
67}
68
69async function buildSignedRequestOptions (payload: ActivityPubHttpPayload) {
70 let actor: ActorModel
71 if (payload.signatureActorId) {
72 actor = await ActorModel.load(payload.signatureActorId)
73 if (!actor) throw new Error('Unknown signature actor id.')
74 } else {
75 // We need to sign the request, so use the server
76 actor = await getServerActor()
77 }
78
79 const keyId = actor.getWebfingerUrl()
80 return {
81 algorithm: 'rsa-sha256',
82 authorizationHeaderName: 'Signature',
83 keyId,
84 key: actor.privateKey
85 }
86}
87
88export {
89 ActivityPubHttpPayload,
90 activitypubHttpJobScheduler,
91 maybeRetryRequestLater,
92 computeBody,
93 buildSignedRequestOptions
94}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
deleted file mode 100644
index 54a7504e8..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-unicast-handler.ts
+++ /dev/null
@@ -1,50 +0,0 @@
1import { logger } from '../../../helpers/logger'
2import { doRequest } from '../../../helpers/requests'
3import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
4import { ActivityPubHttpPayload, buildSignedRequestOptions, computeBody, maybeRetryRequestLater } from './activitypub-http-job-scheduler'
5
6async function process (payload: ActivityPubHttpPayload, jobId: number) {
7 logger.info('Processing ActivityPub unicast in job %d.', jobId)
8
9 const uri = payload.uris[0]
10
11 const body = await computeBody(payload)
12 const httpSignatureOptions = await buildSignedRequestOptions(payload)
13
14 const options = {
15 method: 'POST',
16 uri,
17 json: body,
18 httpSignature: httpSignatureOptions
19 }
20
21 try {
22 await doRequest(options)
23 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([ uri ], [], undefined)
24 } catch (err) {
25 const isRetryingLater = await maybeRetryRequestLater(err, payload, uri)
26 if (isRetryingLater === false) {
27 ActorFollowModel.updateActorFollowsScoreAndRemoveBadOnes([], [ uri ], undefined)
28 }
29
30 throw err
31 }
32}
33
34function onError (err: Error, jobId: number) {
35 logger.error('Error when sending ActivityPub request in job %d.', jobId, err)
36 return Promise.resolve()
37}
38
39function onSuccess (jobId: number) {
40 logger.info('Job %d is a success.', jobId)
41 return Promise.resolve()
42}
43
44// ---------------------------------------------------------------------------
45
46export {
47 process,
48 onError,
49 onSuccess
50}
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/index.ts b/server/lib/jobs/activitypub-http-job-scheduler/index.ts
deleted file mode 100644
index ad8f527b4..000000000
--- a/server/lib/jobs/activitypub-http-job-scheduler/index.ts
+++ /dev/null
@@ -1 +0,0 @@
1export * from './activitypub-http-job-scheduler'