diff options
author | Chocobozzz <florian.bigard@gmail.com> | 2017-11-22 10:29:55 +0100 |
---|---|---|
committer | Chocobozzz <florian.bigard@gmail.com> | 2017-11-27 19:40:53 +0100 |
commit | c986175d68a18e96fbd41537a05c7796a2c64f38 (patch) | |
tree | e8a8a02abb57e637451afbf3b1b0e7a0dbd4206c /server/lib | |
parent | e71bcc0f4b31ecfd84a786411febfc6d18a85258 (diff) | |
download | PeerTube-c986175d68a18e96fbd41537a05c7796a2c64f38.tar.gz PeerTube-c986175d68a18e96fbd41537a05c7796a2c64f38.tar.zst PeerTube-c986175d68a18e96fbd41537a05c7796a2c64f38.zip |
Fetch outbox to grab old activities
Diffstat (limited to 'server/lib')
7 files changed, 131 insertions, 3 deletions
diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts new file mode 100644 index 000000000..fd2af0761 --- /dev/null +++ b/server/lib/activitypub/fetch.ts | |||
@@ -0,0 +1,15 @@ | |||
1 | import { Transaction } from 'sequelize' | ||
2 | import { AccountInstance } from '../../models/account/account-interface' | ||
3 | import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler' | ||
4 | |||
5 | async function addFetchOutboxJob (account: AccountInstance, t: Transaction) { | ||
6 | const jobPayload: ActivityPubHttpPayload = { | ||
7 | uris: [ account.outboxUrl ] | ||
8 | } | ||
9 | |||
10 | return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) | ||
11 | } | ||
12 | |||
13 | export { | ||
14 | addFetchOutboxJob | ||
15 | } | ||
diff --git a/server/lib/activitypub/index.ts b/server/lib/activitypub/index.ts index b1daa70bb..fcea662a6 100644 --- a/server/lib/activitypub/index.ts +++ b/server/lib/activitypub/index.ts | |||
@@ -1,6 +1,7 @@ | |||
1 | export * from './process' | 1 | export * from './process' |
2 | export * from './send' | 2 | export * from './send' |
3 | export * from './account' | 3 | export * from './account' |
4 | export * from './fetch' | ||
4 | export * from './share' | 5 | export * from './share' |
5 | export * from './video-channels' | 6 | export * from './video-channels' |
6 | export * from './videos' | 7 | export * from './videos' |
diff --git a/server/lib/activitypub/process/index.ts b/server/lib/activitypub/process/index.ts index e80b46b6f..c68312053 100644 --- a/server/lib/activitypub/process/index.ts +++ b/server/lib/activitypub/process/index.ts | |||
@@ -1,3 +1,4 @@ | |||
1 | export * from './process' | ||
1 | export * from './process-accept' | 2 | export * from './process-accept' |
2 | export * from './process-add' | 3 | export * from './process-add' |
3 | export * from './process-announce' | 4 | export * from './process-announce' |
diff --git a/server/lib/activitypub/process/process.ts b/server/lib/activitypub/process/process.ts new file mode 100644 index 000000000..613597341 --- /dev/null +++ b/server/lib/activitypub/process/process.ts | |||
@@ -0,0 +1,38 @@ | |||
1 | import { Activity, ActivityType } from '../../../../shared/models/activitypub/activity' | ||
2 | import { AccountInstance } from '../../../models/account/account-interface' | ||
3 | import { processAcceptActivity } from './process-accept' | ||
4 | import { processAddActivity } from './process-add' | ||
5 | import { processAnnounceActivity } from './process-announce' | ||
6 | import { processCreateActivity } from './process-create' | ||
7 | import { processDeleteActivity } from './process-delete' | ||
8 | import { processFollowActivity } from './process-follow' | ||
9 | import { processUndoActivity } from './process-undo' | ||
10 | import { processUpdateActivity } from './process-update' | ||
11 | import { logger } from '../../../helpers/logger' | ||
12 | |||
13 | const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise<any> } = { | ||
14 | Create: processCreateActivity, | ||
15 | Add: processAddActivity, | ||
16 | Update: processUpdateActivity, | ||
17 | Delete: processDeleteActivity, | ||
18 | Follow: processFollowActivity, | ||
19 | Accept: processAcceptActivity, | ||
20 | Announce: processAnnounceActivity, | ||
21 | Undo: processUndoActivity | ||
22 | } | ||
23 | |||
24 | async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) { | ||
25 | for (const activity of activities) { | ||
26 | const activityProcessor = processActivity[activity.type] | ||
27 | if (activityProcessor === undefined) { | ||
28 | logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id }) | ||
29 | continue | ||
30 | } | ||
31 | |||
32 | await activityProcessor(activity, inboxAccount) | ||
33 | } | ||
34 | } | ||
35 | |||
36 | export { | ||
37 | processActivities | ||
38 | } | ||
diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 14b666fc9..df8e0a642 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts | |||
@@ -21,6 +21,8 @@ async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbus | |||
21 | return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) | 21 | return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) |
22 | } | 22 | } |
23 | 23 | ||
24 | // async function sendCreateView () | ||
25 | |||
24 | async function createActivityData (url: string, byAccount: AccountInstance, object: any) { | 26 | async function createActivityData (url: string, byAccount: AccountInstance, object: any) { |
25 | const { to, cc } = await getAudience(byAccount) | 27 | const { to, cc } = await getAudience(byAccount) |
26 | const activity: ActivityCreate = { | 28 | const activity: ActivityCreate = { |
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts new file mode 100644 index 000000000..b8ead32a4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts | |||
@@ -0,0 +1,69 @@ | |||
1 | import { logger } from '../../../helpers' | ||
2 | import { buildSignedActivity } from '../../../helpers/activitypub' | ||
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { database as db } from '../../../initializers' | ||
5 | import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' | ||
6 | import { processActivities } from '../../activitypub/process/process' | ||
7 | import { ACTIVITY_PUB } from '../../../initializers/constants' | ||
8 | |||
9 | async function process (payload: ActivityPubHttpPayload, jobId: number) { | ||
10 | logger.info('Processing ActivityPub fetcher in job %d.', jobId) | ||
11 | |||
12 | const options = { | ||
13 | method: 'GET', | ||
14 | uri: '', | ||
15 | json: true | ||
16 | } | ||
17 | |||
18 | for (const uri of payload.uris) { | ||
19 | options.uri = uri | ||
20 | logger.info('Fetching ActivityPub data on %s.', uri) | ||
21 | |||
22 | const response = await doRequest(options) | ||
23 | const firstBody = response.body | ||
24 | |||
25 | if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { | ||
26 | const activities = firstBody.first.orderedItems | ||
27 | |||
28 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) | ||
29 | |||
30 | await processActivities(activities) | ||
31 | } | ||
32 | |||
33 | let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT | ||
34 | let i = 0 | ||
35 | let nextLink = firstBody.first.next | ||
36 | while (nextLink && i < limit) { | ||
37 | options.uri = nextLink | ||
38 | |||
39 | const { body } = await doRequest(options) | ||
40 | nextLink = body.nextLink | ||
41 | i++ | ||
42 | |||
43 | if (Array.isArray(body.orderedItems)) { | ||
44 | const activities = body.orderedItems | ||
45 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) | ||
46 | |||
47 | await processActivities(activities) | ||
48 | } | ||
49 | } | ||
50 | } | ||
51 | } | ||
52 | |||
53 | function onError (err: Error, jobId: number) { | ||
54 | logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) | ||
55 | return Promise.resolve() | ||
56 | } | ||
57 | |||
58 | function onSuccess (jobId: number) { | ||
59 | logger.info('Job %d is a success.', jobId) | ||
60 | return Promise.resolve() | ||
61 | } | ||
62 | |||
63 | // --------------------------------------------------------------------------- | ||
64 | |||
65 | export { | ||
66 | process, | ||
67 | onError, | ||
68 | onSuccess | ||
69 | } | ||
diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts index e4f6c94a5..aef217ce7 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts | |||
@@ -2,16 +2,18 @@ import { JobScheduler, JobHandler } from '../job-scheduler' | |||
2 | 2 | ||
3 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' | 3 | import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' |
4 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' | 4 | import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' |
5 | import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' | ||
5 | import { JobCategory } from '../../../../shared' | 6 | import { JobCategory } from '../../../../shared' |
6 | 7 | ||
7 | type ActivityPubHttpPayload = { | 8 | type ActivityPubHttpPayload = { |
8 | uris: string[] | 9 | uris: string[] |
9 | signatureAccountId: number | 10 | signatureAccountId?: number |
10 | body: any | 11 | body?: any |
11 | } | 12 | } |
12 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { | 13 | const jobHandlers: { [ handlerName: string ]: JobHandler<ActivityPubHttpPayload, void> } = { |
13 | activitypubHttpBroadcastHandler, | 14 | activitypubHttpBroadcastHandler, |
14 | activitypubHttpUnicastHandler | 15 | activitypubHttpUnicastHandler, |
16 | activitypubHttpFetcherHandler | ||
15 | } | 17 | } |
16 | const jobCategory: JobCategory = 'activitypub-http' | 18 | const jobCategory: JobCategory = 'activitypub-http' |
17 | 19 | ||