X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=server%2Flib%2Fjob-queue%2Fhandlers%2Factivitypub-http-fetcher.ts;h=de533de6c5ef7ce40c07161447b6416410c2e67e;hb=c8fa571f32b10c083fab07f28d2ef55895ef40af;hp=4683beb2f58f40e88425f5b06c4e30a37abc55fb;hpb=71e3dfda4e2bcc228415c0d66b09a84bb73dddd1;p=github%2FChocobozzz%2FPeerTube.git diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 4683beb2f..de533de6c 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -1,60 +1,37 @@ -import * as kue from 'kue' +import { Job } from 'bull' +import { ActivitypubHttpFetcherPayload, FetchType } from '@shared/models' import { logger } from '../../../helpers/logger' -import { doRequest } from '../../../helpers/requests' -import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { VideoModel } from '../../../models/video/video' +import { VideoCommentModel } from '../../../models/video/video-comment' +import { VideoShareModel } from '../../../models/video/video-share' +import { MVideoFullLight } from '../../../types/models' +import { crawlCollectionPage } from '../../activitypub/crawl' +import { createAccountPlaylists } from '../../activitypub/playlists' import { processActivities } from '../../activitypub/process' -import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' +import { addVideoShares } from '../../activitypub/share' +import { addVideoComments } from '../../activitypub/video-comments' -export type ActivitypubHttpFetcherPayload = { - uris: string[] -} - -async function processActivityPubHttpFetcher (job: kue.Job) { +async function processActivityPubHttpFetcher (job: Job) { logger.info('Processing ActivityPub fetcher in job %d.', job.id) - const payload = job.data as ActivitypubHttpBroadcastPayload - - const options = { - method: 'GET', - uri: '', - json: true, - activityPub: true, - timeout: JOB_REQUEST_TIMEOUT - } - - for (const uri of payload.uris) { - options.uri = uri - logger.info('Fetching ActivityPub data on %s.', uri) - - const response = await doRequest(options) - const firstBody = response.body + const payload = job.data as ActivitypubHttpFetcherPayload - if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { - const activities = firstBody.first.orderedItems + let video: MVideoFullLight + if (payload.videoId) video = await VideoModel.loadFull(payload.videoId) - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) - - await processActivities(activities) - } - - let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT - let i = 0 - let nextLink = firstBody.first.next - while (nextLink && i < limit) { - options.uri = nextLink - - const { body } = await doRequest(options) - nextLink = body.next - i++ - - if (Array.isArray(body.orderedItems)) { - const activities = body.orderedItems - logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) + const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise } = { + 'activity': items => processActivities(items, { outboxUrl: payload.uri, fromFetch: true }), + 'video-shares': items => addVideoShares(items, video), + 'video-comments': items => addVideoComments(items), + 'account-playlists': items => createAccountPlaylists(items) + } - await processActivities(activities) - } - } + const cleanerType: { [ id in FetchType ]?: (crawlStartDate: Date) => Promise } = { + 'video-shares': crawlStartDate => VideoShareModel.cleanOldSharesOf(video.id, crawlStartDate), + 'video-comments': crawlStartDate => VideoCommentModel.cleanOldCommentsOf(video.id, crawlStartDate) } + + return crawlCollectionPage(payload.uri, fetcherType[payload.type], cleanerType[payload.type]) } // ---------------------------------------------------------------------------