From 8fffe21a7bc96d08b229293d66ddba576e609790 Mon Sep 17 00:00:00 2001
From: Chocobozzz
Date: Fri, 25 May 2018 16:21:16 +0200
Subject: Refactor and optimize AP collections

Only display URLs in the general object, and paginate video comments,
shares, likes and dislikes
---
 server/lib/activitypub/crawl.ts                    | 40 ++++++++++++++++++++
 server/lib/activitypub/video-comments.ts           |  2 +-
 server/lib/activitypub/videos.ts                   | 27 +++++--------
 .../job-queue/handlers/activitypub-http-fetcher.ts | 44 ++--------------------
 4 files changed, 54 insertions(+), 59 deletions(-)
 create mode 100644 server/lib/activitypub/crawl.ts

(limited to 'server/lib')

diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts
new file mode 100644
index 000000000..7305b3969
--- /dev/null
+++ b/server/lib/activitypub/crawl.ts
@@ -0,0 +1,40 @@
+import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers'
+import { doRequest } from '../../helpers/requests'
+import { logger } from '../../helpers/logger'
+
+async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) {
+  logger.info('Crawling ActivityPub data on %s.', uri)
+
+  const options = {
+    method: 'GET',
+    uri,
+    json: true,
+    activityPub: true,
+    timeout: JOB_REQUEST_TIMEOUT
+  }
+
+  const response = await doRequest(options)
+  const firstBody = response.body
+
+  let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
+  let i = 0
+  let nextLink = firstBody.first
+  while (nextLink && i < limit) {
+    options.uri = nextLink
+
+    const { body } = await doRequest(options)
+    nextLink = body.next
+    i++
+
+    if (Array.isArray(body.orderedItems)) {
+      const items = body.orderedItems
+      logger.info('Processing %i ActivityPub items for %s.', items.length, options.uri)
+
+      await handler(items)
+    }
+  }
+}
+
+export {
+  crawlCollectionPage
+}
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts
index 60c9179a6..fd03710c2 100644
--- a/server/lib/activitypub/video-comments.ts
+++ b/server/lib/activitypub/video-comments.ts
@@ -37,7 +37,7 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto
   }
 }
 
-async function addVideoComments (instance: VideoModel, commentUrls: string[]) {
+async function addVideoComments (commentUrls: string[], instance: VideoModel) {
   for (const commentUrl of commentUrls) {
     await addVideoComment(instance, commentUrl)
   }
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index dbd7385a4..be6794cef 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -20,6 +20,7 @@ import { VideoFileModel } from '../../models/video/video-file'
 import { VideoShareModel } from '../../models/video/video-share'
 import { getOrCreateActorAndServerAndModel } from './actor'
 import { addVideoComments } from './video-comments'
+import { crawlCollectionPage } from './crawl'
 
 function fetchRemoteVideoPreview (video: VideoModel, reject: Function) {
   const host = video.VideoChannel.Account.Actor.Server.host
@@ -216,25 +217,17 @@ async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentOb
   const video = await retryTransactionWrapper(getOrCreateVideo, options)
 
   // Process outside the transaction because we could fetch remote data
-  if (videoObject.likes && Array.isArray(videoObject.likes.orderedItems)) {
-    logger.info('Adding likes of video %s.', video.uuid)
-    await createRates(videoObject.likes.orderedItems, video, 'like')
-  }
+  logger.info('Adding likes of video %s.', video.uuid)
+  await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like'))
 
-  if (videoObject.dislikes && Array.isArray(videoObject.dislikes.orderedItems)) {
-    logger.info('Adding dislikes of video %s.', video.uuid)
-    await createRates(videoObject.dislikes.orderedItems, video, 'dislike')
-  }
+  logger.info('Adding dislikes of video %s.', video.uuid)
+  await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike'))
 
-  if (videoObject.shares && Array.isArray(videoObject.shares.orderedItems)) {
-    logger.info('Adding shares of video %s.', video.uuid)
-    await addVideoShares(video, videoObject.shares.orderedItems)
-  }
+  logger.info('Adding shares of video %s.', video.uuid)
+  await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video))
 
-  if (videoObject.comments && Array.isArray(videoObject.comments.orderedItems)) {
-    logger.info('Adding comments of video %s.', video.uuid)
-    await addVideoComments(video, videoObject.comments.orderedItems)
-  }
+  logger.info('Adding comments of video %s.', video.uuid)
+  await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video))
 
   return { actor, channelActor, video }
 }
@@ -266,7 +259,7 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR
   return
 }
 
-async function addVideoShares (instance: VideoModel, shareUrls: string[]) {
+async function addVideoShares (shareUrls: string[], instance: VideoModel) {
   for (const shareUrl of shareUrls) {
     // Fetch url
     const { body } = await doRequest({
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 4683beb2f..10c0e606f 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,9 +1,9 @@
 import * as kue from 'kue'
 import { logger } from '../../../helpers/logger'
-import { doRequest } from '../../../helpers/requests'
-import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
 import { processActivities } from '../../activitypub/process'
 import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
+import { crawlCollectionPage } from '../../activitypub/crawl'
+import { Activity } from '../../../../shared/models/activitypub'
 
 export type ActivitypubHttpFetcherPayload = {
   uris: string[]
@@ -14,46 +14,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) {
 
   const payload = job.data as ActivitypubHttpBroadcastPayload
 
-  const options = {
-    method: 'GET',
-    uri: '',
-    json: true,
-    activityPub: true,
-    timeout: JOB_REQUEST_TIMEOUT
-  }
-
   for (const uri of payload.uris) {
-    options.uri = uri
-    logger.info('Fetching ActivityPub data on %s.', uri)
-
-    const response = await doRequest(options)
-    const firstBody = response.body
-
-    if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
-      const activities = firstBody.first.orderedItems
-
-      logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
-
-      await processActivities(activities)
-    }
-
-    let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
-    let i = 0
-    let nextLink = firstBody.first.next
-    while (nextLink && i < limit) {
-      options.uri = nextLink
-
-      const { body } = await doRequest(options)
-      nextLink = body.next
-      i++
-
-      if (Array.isArray(body.orderedItems)) {
-        const activities = body.orderedItems
-        logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
-
-        await processActivities(activities)
-      }
-    }
+    await crawlCollectionPage<Activity>(uri, (items) => processActivities(items))
   }
 }
--
cgit v1.2.3
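
A usage sketch, not part of the commit: the new crawlCollectionPage helper takes a collection URL and a per-page handler, fetches the collection, follows its "next" links up to ACTIVITY_PUB.FETCH_PAGE_LIMIT pages, and calls the handler once per page of orderedItems. In the snippet below only crawlCollectionPage and its generic parameter come from the patch; the videoObject literal, the URL, and the handler are hypothetical.

// Illustrative sketch only -- the collection URL and the handler are made up.
import { crawlCollectionPage } from './server/lib/activitypub/crawl'

// After this patch, a remote video object carries the *URL* of its likes
// collection instead of an inlined array of orderedItems.
const videoObject = {
  likes: 'https://peertube.example/videos/watch/abcd/likes'
}

// Hypothetical per-page handler: each page of the likes collection yields
// the URLs of the actors that liked the video.
async function processLikePage (actorUrls: string[]) {
  for (const actorUrl of actorUrls) {
    console.log('Like by %s.', actorUrl)
  }
}

// The generic parameter fixes the item type handed to the handler, the same
// way the fetcher job instantiates it with crawlCollectionPage<Activity>.
crawlCollectionPage<string>(videoObject.likes, processLikePage)
  .catch(err => console.error('Crawling failed.', err))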