From c986175d68a18e96fbd41537a05c7796a2c64f38 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 22 Nov 2017 10:29:55 +0100 Subject: Fetch outbox to grab old activities --- server/lib/activitypub/fetch.ts | 15 +++++ server/lib/activitypub/index.ts | 1 + server/lib/activitypub/process/index.ts | 1 + server/lib/activitypub/process/process.ts | 38 ++++++++++++ server/lib/activitypub/send/send-create.ts | 2 + .../activitypub-http-fetcher-handler.ts | 69 ++++++++++++++++++++++ .../activitypub-http-job-scheduler.ts | 8 ++- 7 files changed, 131 insertions(+), 3 deletions(-) create mode 100644 server/lib/activitypub/fetch.ts create mode 100644 server/lib/activitypub/process/process.ts create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts (limited to 'server/lib') diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts new file mode 100644 index 000000000..fd2af0761 --- /dev/null +++ b/server/lib/activitypub/fetch.ts @@ -0,0 +1,15 @@ +import { Transaction } from 'sequelize' +import { AccountInstance } from '../../models/account/account-interface' +import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler' + +async function addFetchOutboxJob (account: AccountInstance, t: Transaction) { + const jobPayload: ActivityPubHttpPayload = { + uris: [ account.outboxUrl ] + } + + return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) +} + +export { + addFetchOutboxJob +} diff --git a/server/lib/activitypub/index.ts b/server/lib/activitypub/index.ts index b1daa70bb..fcea662a6 100644 --- a/server/lib/activitypub/index.ts +++ b/server/lib/activitypub/index.ts @@ -1,6 +1,7 @@ export * from './process' export * from './send' export * from './account' +export * from './fetch' export * from './share' export * from './video-channels' export * from './videos' diff --git a/server/lib/activitypub/process/index.ts b/server/lib/activitypub/process/index.ts index e80b46b6f..c68312053 100644 --- a/server/lib/activitypub/process/index.ts +++ b/server/lib/activitypub/process/index.ts @@ -1,3 +1,4 @@ +export * from './process' export * from './process-accept' export * from './process-add' export * from './process-announce' diff --git a/server/lib/activitypub/process/process.ts b/server/lib/activitypub/process/process.ts new file mode 100644 index 000000000..613597341 --- /dev/null +++ b/server/lib/activitypub/process/process.ts @@ -0,0 +1,38 @@ +import { Activity, ActivityType } from '../../../../shared/models/activitypub/activity' +import { AccountInstance } from '../../../models/account/account-interface' +import { processAcceptActivity } from './process-accept' +import { processAddActivity } from './process-add' +import { processAnnounceActivity } from './process-announce' +import { processCreateActivity } from './process-create' +import { processDeleteActivity } from './process-delete' +import { processFollowActivity } from './process-follow' +import { processUndoActivity } from './process-undo' +import { processUpdateActivity } from './process-update' +import { logger } from '../../../helpers/logger' + +const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise } = { + Create: processCreateActivity, + Add: processAddActivity, + Update: processUpdateActivity, + Delete: processDeleteActivity, + Follow: processFollowActivity, + Accept: processAcceptActivity, + Announce: processAnnounceActivity, + Undo: processUndoActivity +} + +async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) { + for (const activity of activities) { + const activityProcessor = processActivity[activity.type] + if (activityProcessor === undefined) { + logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id }) + continue + } + + await activityProcessor(activity, inboxAccount) + } +} + +export { + processActivities +} diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 14b666fc9..df8e0a642 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -21,6 +21,8 @@ async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbus return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) } +// async function sendCreateView () + async function createActivityData (url: string, byAccount: AccountInstance, object: any) { const { to, cc } = await getAudience(byAccount) const activity: ActivityCreate = { diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts new file mode 100644 index 000000000..b8ead32a4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts @@ -0,0 +1,69 @@ +import { logger } from '../../../helpers' +import { buildSignedActivity } from '../../../helpers/activitypub' +import { doRequest } from '../../../helpers/requests' +import { database as db } from '../../../initializers' +import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' +import { processActivities } from '../../activitypub/process/process' +import { ACTIVITY_PUB } from '../../../initializers/constants' + +async function process (payload: ActivityPubHttpPayload, jobId: number) { + logger.info('Processing ActivityPub fetcher in job %d.', jobId) + + const options = { + method: 'GET', + uri: '', + json: true + } + + for (const uri of payload.uris) { + options.uri = uri + logger.info('Fetching ActivityPub data on %s.', uri) + + const response = await doRequest(options) + const firstBody = response.body + + if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { + const activities = firstBody.first.orderedItems + + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + + await processActivities(activities) + } + + let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT + let i = 0 + let nextLink = firstBody.first.next + while (nextLink && i < limit) { + options.uri = nextLink + + const { body } = await doRequest(options) + nextLink = body.nextLink + i++ + + if (Array.isArray(body.orderedItems)) { + const activities = body.orderedItems + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + + await processActivities(activities) + } + } + } +} + +function onError (err: Error, jobId: number) { + logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) + return Promise.resolve() +} + +function onSuccess (jobId: number) { + logger.info('Job %d is a success.', jobId) + return Promise.resolve() +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts index e4f6c94a5..aef217ce7 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts @@ -2,16 +2,18 @@ import { JobScheduler, JobHandler } from '../job-scheduler' import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' +import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' import { JobCategory } from '../../../../shared' type ActivityPubHttpPayload = { uris: string[] - signatureAccountId: number - body: any + signatureAccountId?: number + body?: any } const jobHandlers: { [ handlerName: string ]: JobHandler } = { activitypubHttpBroadcastHandler, - activitypubHttpUnicastHandler + activitypubHttpUnicastHandler, + activitypubHttpFetcherHandler } const jobCategory: JobCategory = 'activitypub-http' -- cgit v1.2.3