diff options
author | Chocobozzz <me@florianbigard.com> | 2018-05-25 16:21:16 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-05-25 16:21:16 +0200 |
commit | 8fffe21a7bc96d08b229293d66ddba576e609790 (patch) | |
tree | 5ebd5f5198a59084c5338ce197d7e836b39200a4 /server/lib | |
parent | e251f170b00b2014ac4e823113c6ff40e3fb1471 (diff) | |
download | PeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.tar.gz PeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.tar.zst PeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.zip |
Refractor and optimize AP collections
Only display urls in general object, and paginate video comments, shares, likes and
dislikes
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/crawl.ts | 40 | ||||
-rw-r--r-- | server/lib/activitypub/video-comments.ts | 2 | ||||
-rw-r--r-- | server/lib/activitypub/videos.ts | 27 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 44 |
4 files changed, 54 insertions, 59 deletions
diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts new file mode 100644 index 000000000..7305b3969 --- /dev/null +++ b/server/lib/activitypub/crawl.ts | |||
@@ -0,0 +1,40 @@ | |||
1 | import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers' | ||
2 | import { doRequest } from '../../helpers/requests' | ||
3 | import { logger } from '../../helpers/logger' | ||
4 | |||
5 | async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) { | ||
6 | logger.info('Crawling ActivityPub data on %s.', uri) | ||
7 | |||
8 | const options = { | ||
9 | method: 'GET', | ||
10 | uri, | ||
11 | json: true, | ||
12 | activityPub: true, | ||
13 | timeout: JOB_REQUEST_TIMEOUT | ||
14 | } | ||
15 | |||
16 | const response = await doRequest(options) | ||
17 | const firstBody = response.body | ||
18 | |||
19 | let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT | ||
20 | let i = 0 | ||
21 | let nextLink = firstBody.first | ||
22 | while (nextLink && i < limit) { | ||
23 | options.uri = nextLink | ||
24 | |||
25 | const { body } = await doRequest(options) | ||
26 | nextLink = body.next | ||
27 | i++ | ||
28 | |||
29 | if (Array.isArray(body.orderedItems)) { | ||
30 | const items = body.orderedItems | ||
31 | logger.info('Processing %i ActivityPub items for %s.', items.length, nextLink) | ||
32 | |||
33 | await handler(items) | ||
34 | } | ||
35 | } | ||
36 | } | ||
37 | |||
38 | export { | ||
39 | crawlCollectionPage | ||
40 | } | ||
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts index 60c9179a6..fd03710c2 100644 --- a/server/lib/activitypub/video-comments.ts +++ b/server/lib/activitypub/video-comments.ts | |||
@@ -37,7 +37,7 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto | |||
37 | } | 37 | } |
38 | } | 38 | } |
39 | 39 | ||
40 | async function addVideoComments (instance: VideoModel, commentUrls: string[]) { | 40 | async function addVideoComments (commentUrls: string[], instance: VideoModel) { |
41 | for (const commentUrl of commentUrls) { | 41 | for (const commentUrl of commentUrls) { |
42 | await addVideoComment(instance, commentUrl) | 42 | await addVideoComment(instance, commentUrl) |
43 | } | 43 | } |
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index dbd7385a4..be6794cef 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts | |||
@@ -20,6 +20,7 @@ import { VideoFileModel } from '../../models/video/video-file' | |||
20 | import { VideoShareModel } from '../../models/video/video-share' | 20 | import { VideoShareModel } from '../../models/video/video-share' |
21 | import { getOrCreateActorAndServerAndModel } from './actor' | 21 | import { getOrCreateActorAndServerAndModel } from './actor' |
22 | import { addVideoComments } from './video-comments' | 22 | import { addVideoComments } from './video-comments' |
23 | import { crawlCollectionPage } from './crawl' | ||
23 | 24 | ||
24 | function fetchRemoteVideoPreview (video: VideoModel, reject: Function) { | 25 | function fetchRemoteVideoPreview (video: VideoModel, reject: Function) { |
25 | const host = video.VideoChannel.Account.Actor.Server.host | 26 | const host = video.VideoChannel.Account.Actor.Server.host |
@@ -216,25 +217,17 @@ async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentOb | |||
216 | const video = await retryTransactionWrapper(getOrCreateVideo, options) | 217 | const video = await retryTransactionWrapper(getOrCreateVideo, options) |
217 | 218 | ||
218 | // Process outside the transaction because we could fetch remote data | 219 | // Process outside the transaction because we could fetch remote data |
219 | if (videoObject.likes && Array.isArray(videoObject.likes.orderedItems)) { | 220 | logger.info('Adding likes of video %s.', video.uuid) |
220 | logger.info('Adding likes of video %s.', video.uuid) | 221 | await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like')) |
221 | await createRates(videoObject.likes.orderedItems, video, 'like') | ||
222 | } | ||
223 | 222 | ||
224 | if (videoObject.dislikes && Array.isArray(videoObject.dislikes.orderedItems)) { | 223 | logger.info('Adding dislikes of video %s.', video.uuid) |
225 | logger.info('Adding dislikes of video %s.', video.uuid) | 224 | await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike')) |
226 | await createRates(videoObject.dislikes.orderedItems, video, 'dislike') | ||
227 | } | ||
228 | 225 | ||
229 | if (videoObject.shares && Array.isArray(videoObject.shares.orderedItems)) { | 226 | logger.info('Adding shares of video %s.', video.uuid) |
230 | logger.info('Adding shares of video %s.', video.uuid) | 227 | await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video)) |
231 | await addVideoShares(video, videoObject.shares.orderedItems) | ||
232 | } | ||
233 | 228 | ||
234 | if (videoObject.comments && Array.isArray(videoObject.comments.orderedItems)) { | 229 | logger.info('Adding comments of video %s.', video.uuid) |
235 | logger.info('Adding comments of video %s.', video.uuid) | 230 | await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video)) |
236 | await addVideoComments(video, videoObject.comments.orderedItems) | ||
237 | } | ||
238 | 231 | ||
239 | return { actor, channelActor, video } | 232 | return { actor, channelActor, video } |
240 | } | 233 | } |
@@ -266,7 +259,7 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR | |||
266 | return | 259 | return |
267 | } | 260 | } |
268 | 261 | ||
269 | async function addVideoShares (instance: VideoModel, shareUrls: string[]) { | 262 | async function addVideoShares (shareUrls: string[], instance: VideoModel) { |
270 | for (const shareUrl of shareUrls) { | 263 | for (const shareUrl of shareUrls) { |
271 | // Fetch url | 264 | // Fetch url |
272 | const { body } = await doRequest({ | 265 | const { body } = await doRequest({ |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index 4683beb2f..10c0e606f 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,9 +1,9 @@ | |||
1 | import * as kue from 'kue' | 1 | import * as kue from 'kue' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { doRequest } from '../../../helpers/requests' | ||
4 | import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers' | ||
5 | import { processActivities } from '../../activitypub/process' | 3 | import { processActivities } from '../../activitypub/process' |
6 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | 4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' |
5 | import { crawlCollectionPage } from '../../activitypub/crawl' | ||
6 | import { Activity } from '../../../../shared/models/activitypub' | ||
7 | 7 | ||
8 | export type ActivitypubHttpFetcherPayload = { | 8 | export type ActivitypubHttpFetcherPayload = { |
9 | uris: string[] | 9 | uris: string[] |
@@ -14,46 +14,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) { | |||
14 | 14 | ||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | 15 | const payload = job.data as ActivitypubHttpBroadcastPayload |
16 | 16 | ||
17 | const options = { | ||
18 | method: 'GET', | ||
19 | uri: '', | ||
20 | json: true, | ||
21 | activityPub: true, | ||
22 | timeout: JOB_REQUEST_TIMEOUT | ||
23 | } | ||
24 | |||
25 | for (const uri of payload.uris) { | 17 | for (const uri of payload.uris) { |
26 | options.uri = uri | 18 | await crawlCollectionPage<Activity>(uri, (items) => processActivities(items)) |
27 | logger.info('Fetching ActivityPub data on %s.', uri) | ||
28 | |||
29 | const response = await doRequest(options) | ||
30 | const firstBody = response.body | ||
31 | |||
32 | if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) { | ||
33 | const activities = firstBody.first.orderedItems | ||
34 | |||
35 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
36 | |||
37 | await processActivities(activities) | ||
38 | } | ||
39 | |||
40 | let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT | ||
41 | let i = 0 | ||
42 | let nextLink = firstBody.first.next | ||
43 | while (nextLink && i < limit) { | ||
44 | options.uri = nextLink | ||
45 | |||
46 | const { body } = await doRequest(options) | ||
47 | nextLink = body.next | ||
48 | i++ | ||
49 | |||
50 | if (Array.isArray(body.orderedItems)) { | ||
51 | const activities = body.orderedItems | ||
52 | logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri) | ||
53 | |||
54 | await processActivities(activities) | ||
55 | } | ||
56 | } | ||
57 | } | 19 | } |
58 | } | 20 | } |
59 | 21 | ||