From c986175d68a18e96fbd41537a05c7796a2c64f38 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 22 Nov 2017 10:29:55 +0100 Subject: Fetch outbox to grab old activities --- server/controllers/activitypub/inbox.ts | 33 +---------- server/controllers/activitypub/outbox.ts | 2 - server/controllers/api/server/follows.ts | 3 + server/helpers/activitypub.ts | 14 +++-- server/initializers/constants.ts | 1 + server/lib/activitypub/fetch.ts | 15 +++++ server/lib/activitypub/index.ts | 1 + server/lib/activitypub/process/index.ts | 1 + server/lib/activitypub/process/process.ts | 38 ++++++++++++ server/lib/activitypub/send/send-create.ts | 2 + .../activitypub-http-fetcher-handler.ts | 69 ++++++++++++++++++++++ .../activitypub-http-job-scheduler.ts | 8 ++- 12 files changed, 145 insertions(+), 42 deletions(-) create mode 100644 server/lib/activitypub/fetch.ts create mode 100644 server/lib/activitypub/process/process.ts create mode 100644 server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts (limited to 'server') diff --git a/server/controllers/activitypub/inbox.ts b/server/controllers/activitypub/inbox.ts index 807d0bdf4..30e7f706b 100644 --- a/server/controllers/activitypub/inbox.ts +++ b/server/controllers/activitypub/inbox.ts @@ -1,27 +1,10 @@ import * as express from 'express' -import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, ActivityType, RootActivity } from '../../../shared' +import { Activity, ActivityPubCollection, ActivityPubOrderedCollection, RootActivity } from '../../../shared' import { logger } from '../../helpers' import { isActivityValid } from '../../helpers/custom-validators/activitypub/activity' -import { processCreateActivity, processUpdateActivity, processUndoActivity } from '../../lib' -import { processAcceptActivity } from '../../lib/activitypub/process/process-accept' -import { processAddActivity } from '../../lib/activitypub/process/process-add' -import { processAnnounceActivity } from '../../lib/activitypub/process/process-announce' -import { processDeleteActivity } from '../../lib/activitypub/process/process-delete' -import { processFollowActivity } from '../../lib/activitypub/process/process-follow' +import { processActivities } from '../../lib/activitypub/process/process' import { asyncMiddleware, checkSignature, localAccountValidator, signatureValidator } from '../../middlewares' import { activityPubValidator } from '../../middlewares/validators/activitypub/activity' -import { AccountInstance } from '../../models/account/account-interface' - -const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise } = { - Create: processCreateActivity, - Add: processAddActivity, - Update: processUpdateActivity, - Delete: processDeleteActivity, - Follow: processFollowActivity, - Accept: processAcceptActivity, - Announce: processAnnounceActivity, - Undo: processUndoActivity -} const inboxRouter = express.Router() @@ -69,15 +52,3 @@ async function inboxController (req: express.Request, res: express.Response, nex res.status(204).end() } - -async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) { - for (const activity of activities) { - const activityProcessor = processActivity[activity.type] - if (activityProcessor === undefined) { - logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id }) - continue - } - - await activityProcessor(activity, inboxAccount) - } -} diff --git a/server/controllers/activitypub/outbox.ts b/server/controllers/activitypub/outbox.ts index 396fa2db5..1a74bde33 100644 --- a/server/controllers/activitypub/outbox.ts +++ b/server/controllers/activitypub/outbox.ts @@ -34,8 +34,6 @@ async function outboxController (req: express.Request, res: express.Response, ne const data = await db.Video.listAllAndSharedByAccountForOutbox(account.id, start, count) const activities: Activity[] = [] - console.log(account.url) - for (const video of data.data) { const videoObject = video.toActivityPubObject() let addActivity: ActivityAdd = await addActivityData(video.url, account, video, video.VideoChannel.url, videoObject) diff --git a/server/controllers/api/server/follows.ts b/server/controllers/api/server/follows.ts index c759824e0..4b54afc8d 100644 --- a/server/controllers/api/server/follows.ts +++ b/server/controllers/api/server/follows.ts @@ -19,6 +19,7 @@ import { sendUndoFollow } from '../../../lib/activitypub/send/send-undo' import { AccountInstance } from '../../../models/account/account-interface' import { retryTransactionWrapper } from '../../../helpers/database-utils' import { saveAccountAndServerIfNotExist } from '../../../lib/activitypub/account' +import { addFetchOutboxJob } from '../../../lib/activitypub/fetch' const serverFollowsRouter = express.Router() @@ -136,6 +137,8 @@ async function follow (fromAccount: AccountInstance, targetAccount: AccountInsta if (accountFollow.state === 'pending') { await sendFollow(accountFollow, t) } + + await addFetchOutboxJob(targetAccount, t) }) } catch (err) { // Reset target account diff --git a/server/helpers/activitypub.ts b/server/helpers/activitypub.ts index 04d85b8e6..fb4a43a01 100644 --- a/server/helpers/activitypub.ts +++ b/server/helpers/activitypub.ts @@ -46,14 +46,16 @@ function activityPubCollectionPagination (url: string, page: number, result: Res orderedItems: result.data } - const obj = { - id: url, - type: 'OrderedCollection', - totalItems: result.total, - orderedItems: orderedCollectionPagination + if (page === 1) { + return activityPubContextify({ + id: url, + type: 'OrderedCollection', + totalItems: result.total, + first: orderedCollectionPagination + }) } - return activityPubContextify(obj) + return orderedCollectionPagination } function buildSignedActivity (byAccount: AccountInstance, data: Object) { diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index 7c0640cc0..398691eba 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -228,6 +228,7 @@ const ACTIVITY_PUB = { ACCEPT_HEADER: 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"', PUBLIC: 'https://www.w3.org/ns/activitystreams#Public', COLLECTION_ITEMS_PER_PAGE: 10, + FETCH_PAGE_LIMIT: 100, URL_MIME_TYPES: { VIDEO: [ 'video/mp4', 'video/webm', 'video/ogg' ], // TODO: Merge with VIDEO_MIMETYPE_EXT TORRENT: [ 'application/x-bittorrent' ], diff --git a/server/lib/activitypub/fetch.ts b/server/lib/activitypub/fetch.ts new file mode 100644 index 000000000..fd2af0761 --- /dev/null +++ b/server/lib/activitypub/fetch.ts @@ -0,0 +1,15 @@ +import { Transaction } from 'sequelize' +import { AccountInstance } from '../../models/account/account-interface' +import { activitypubHttpJobScheduler, ActivityPubHttpPayload } from '../jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler' + +async function addFetchOutboxJob (account: AccountInstance, t: Transaction) { + const jobPayload: ActivityPubHttpPayload = { + uris: [ account.outboxUrl ] + } + + return activitypubHttpJobScheduler.createJob(t, 'activitypubHttpFetcherHandler', jobPayload) +} + +export { + addFetchOutboxJob +} diff --git a/server/lib/activitypub/index.ts b/server/lib/activitypub/index.ts index b1daa70bb..fcea662a6 100644 --- a/server/lib/activitypub/index.ts +++ b/server/lib/activitypub/index.ts @@ -1,6 +1,7 @@ export * from './process' export * from './send' export * from './account' +export * from './fetch' export * from './share' export * from './video-channels' export * from './videos' diff --git a/server/lib/activitypub/process/index.ts b/server/lib/activitypub/process/index.ts index e80b46b6f..c68312053 100644 --- a/server/lib/activitypub/process/index.ts +++ b/server/lib/activitypub/process/index.ts @@ -1,3 +1,4 @@ +export * from './process' export * from './process-accept' export * from './process-add' export * from './process-announce' diff --git a/server/lib/activitypub/process/process.ts b/server/lib/activitypub/process/process.ts new file mode 100644 index 000000000..613597341 --- /dev/null +++ b/server/lib/activitypub/process/process.ts @@ -0,0 +1,38 @@ +import { Activity, ActivityType } from '../../../../shared/models/activitypub/activity' +import { AccountInstance } from '../../../models/account/account-interface' +import { processAcceptActivity } from './process-accept' +import { processAddActivity } from './process-add' +import { processAnnounceActivity } from './process-announce' +import { processCreateActivity } from './process-create' +import { processDeleteActivity } from './process-delete' +import { processFollowActivity } from './process-follow' +import { processUndoActivity } from './process-undo' +import { processUpdateActivity } from './process-update' +import { logger } from '../../../helpers/logger' + +const processActivity: { [ P in ActivityType ]: (activity: Activity, inboxAccount?: AccountInstance) => Promise } = { + Create: processCreateActivity, + Add: processAddActivity, + Update: processUpdateActivity, + Delete: processDeleteActivity, + Follow: processFollowActivity, + Accept: processAcceptActivity, + Announce: processAnnounceActivity, + Undo: processUndoActivity +} + +async function processActivities (activities: Activity[], inboxAccount?: AccountInstance) { + for (const activity of activities) { + const activityProcessor = processActivity[activity.type] + if (activityProcessor === undefined) { + logger.warn('Unknown activity type %s.', activity.type, { activityId: activity.id }) + continue + } + + await activityProcessor(activity, inboxAccount) + } +} + +export { + processActivities +} diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index 14b666fc9..df8e0a642 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -21,6 +21,8 @@ async function sendVideoAbuse (byAccount: AccountInstance, videoAbuse: VideoAbus return unicastTo(data, byAccount, video.VideoChannel.Account.sharedInboxUrl, t) } +// async function sendCreateView () + async function createActivityData (url: string, byAccount: AccountInstance, object: any) { const { to, cc } = await getAudience(byAccount) const activity: ActivityCreate = { diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts new file mode 100644 index 000000000..b8ead32a4 --- /dev/null +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-fetcher-handler.ts @@ -0,0 +1,69 @@ +import { logger } from '../../../helpers' +import { buildSignedActivity } from '../../../helpers/activitypub' +import { doRequest } from '../../../helpers/requests' +import { database as db } from '../../../initializers' +import { ActivityPubHttpPayload } from './activitypub-http-job-scheduler' +import { processActivities } from '../../activitypub/process/process' +import { ACTIVITY_PUB } from '../../../initializers/constants' + +async function process (payload: ActivityPubHttpPayload, jobId: number) { + logger.info('Processing ActivityPub fetcher in job %d.', jobId) + + const options = { + method: 'GET', + uri: '', + json: true + } + + for (const uri of payload.uris) { + options.uri = uri + logger.info('Fetching ActivityPub data on %s.', uri) + + const response = await doRequest(options) + const firstBody = response.body + + if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { + const activities = firstBody.first.orderedItems + + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + + await processActivities(activities) + } + + let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT + let i = 0 + let nextLink = firstBody.first.next + while (nextLink && i < limit) { + options.uri = nextLink + + const { body } = await doRequest(options) + nextLink = body.nextLink + i++ + + if (Array.isArray(body.orderedItems)) { + const activities = body.orderedItems + logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, uri) + + await processActivities(activities) + } + } + } +} + +function onError (err: Error, jobId: number) { + logger.error('Error when broadcasting ActivityPub request in job %d.', jobId, err) + return Promise.resolve() +} + +function onSuccess (jobId: number) { + logger.info('Job %d is a success.', jobId) + return Promise.resolve() +} + +// --------------------------------------------------------------------------- + +export { + process, + onError, + onSuccess +} diff --git a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts index e4f6c94a5..aef217ce7 100644 --- a/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts +++ b/server/lib/jobs/activitypub-http-job-scheduler/activitypub-http-job-scheduler.ts @@ -2,16 +2,18 @@ import { JobScheduler, JobHandler } from '../job-scheduler' import * as activitypubHttpBroadcastHandler from './activitypub-http-broadcast-handler' import * as activitypubHttpUnicastHandler from './activitypub-http-unicast-handler' +import * as activitypubHttpFetcherHandler from './activitypub-http-fetcher-handler' import { JobCategory } from '../../../../shared' type ActivityPubHttpPayload = { uris: string[] - signatureAccountId: number - body: any + signatureAccountId?: number + body?: any } const jobHandlers: { [ handlerName: string ]: JobHandler } = { activitypubHttpBroadcastHandler, - activitypubHttpUnicastHandler + activitypubHttpUnicastHandler, + activitypubHttpFetcherHandler } const jobCategory: JobCategory = 'activitypub-http' -- cgit v1.2.3