aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-05-25 16:21:16 +0200
committerChocobozzz <me@florianbigard.com>2018-05-25 16:21:16 +0200
commit8fffe21a7bc96d08b229293d66ddba576e609790 (patch)
tree5ebd5f5198a59084c5338ce197d7e836b39200a4 /server/lib
parente251f170b00b2014ac4e823113c6ff40e3fb1471 (diff)
downloadPeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.tar.gz
PeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.tar.zst
PeerTube-8fffe21a7bc96d08b229293d66ddba576e609790.zip
Refractor and optimize AP collections
Only display urls in general object, and paginate video comments, shares, likes and dislikes
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/activitypub/crawl.ts40
-rw-r--r--server/lib/activitypub/video-comments.ts2
-rw-r--r--server/lib/activitypub/videos.ts27
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-fetcher.ts44
4 files changed, 54 insertions, 59 deletions
diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts
new file mode 100644
index 000000000..7305b3969
--- /dev/null
+++ b/server/lib/activitypub/crawl.ts
@@ -0,0 +1,40 @@
1import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers'
2import { doRequest } from '../../helpers/requests'
3import { logger } from '../../helpers/logger'
4
5async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) {
6 logger.info('Crawling ActivityPub data on %s.', uri)
7
8 const options = {
9 method: 'GET',
10 uri,
11 json: true,
12 activityPub: true,
13 timeout: JOB_REQUEST_TIMEOUT
14 }
15
16 const response = await doRequest(options)
17 const firstBody = response.body
18
19 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
20 let i = 0
21 let nextLink = firstBody.first
22 while (nextLink && i < limit) {
23 options.uri = nextLink
24
25 const { body } = await doRequest(options)
26 nextLink = body.next
27 i++
28
29 if (Array.isArray(body.orderedItems)) {
30 const items = body.orderedItems
31 logger.info('Processing %i ActivityPub items for %s.', items.length, nextLink)
32
33 await handler(items)
34 }
35 }
36}
37
38export {
39 crawlCollectionPage
40}
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts
index 60c9179a6..fd03710c2 100644
--- a/server/lib/activitypub/video-comments.ts
+++ b/server/lib/activitypub/video-comments.ts
@@ -37,7 +37,7 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto
37 } 37 }
38} 38}
39 39
40async function addVideoComments (instance: VideoModel, commentUrls: string[]) { 40async function addVideoComments (commentUrls: string[], instance: VideoModel) {
41 for (const commentUrl of commentUrls) { 41 for (const commentUrl of commentUrls) {
42 await addVideoComment(instance, commentUrl) 42 await addVideoComment(instance, commentUrl)
43 } 43 }
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index dbd7385a4..be6794cef 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -20,6 +20,7 @@ import { VideoFileModel } from '../../models/video/video-file'
20import { VideoShareModel } from '../../models/video/video-share' 20import { VideoShareModel } from '../../models/video/video-share'
21import { getOrCreateActorAndServerAndModel } from './actor' 21import { getOrCreateActorAndServerAndModel } from './actor'
22import { addVideoComments } from './video-comments' 22import { addVideoComments } from './video-comments'
23import { crawlCollectionPage } from './crawl'
23 24
24function fetchRemoteVideoPreview (video: VideoModel, reject: Function) { 25function fetchRemoteVideoPreview (video: VideoModel, reject: Function) {
25 const host = video.VideoChannel.Account.Actor.Server.host 26 const host = video.VideoChannel.Account.Actor.Server.host
@@ -216,25 +217,17 @@ async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentOb
216 const video = await retryTransactionWrapper(getOrCreateVideo, options) 217 const video = await retryTransactionWrapper(getOrCreateVideo, options)
217 218
218 // Process outside the transaction because we could fetch remote data 219 // Process outside the transaction because we could fetch remote data
219 if (videoObject.likes && Array.isArray(videoObject.likes.orderedItems)) { 220 logger.info('Adding likes of video %s.', video.uuid)
220 logger.info('Adding likes of video %s.', video.uuid) 221 await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like'))
221 await createRates(videoObject.likes.orderedItems, video, 'like')
222 }
223 222
224 if (videoObject.dislikes && Array.isArray(videoObject.dislikes.orderedItems)) { 223 logger.info('Adding dislikes of video %s.', video.uuid)
225 logger.info('Adding dislikes of video %s.', video.uuid) 224 await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike'))
226 await createRates(videoObject.dislikes.orderedItems, video, 'dislike')
227 }
228 225
229 if (videoObject.shares && Array.isArray(videoObject.shares.orderedItems)) { 226 logger.info('Adding shares of video %s.', video.uuid)
230 logger.info('Adding shares of video %s.', video.uuid) 227 await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video))
231 await addVideoShares(video, videoObject.shares.orderedItems)
232 }
233 228
234 if (videoObject.comments && Array.isArray(videoObject.comments.orderedItems)) { 229 logger.info('Adding comments of video %s.', video.uuid)
235 logger.info('Adding comments of video %s.', video.uuid) 230 await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video))
236 await addVideoComments(video, videoObject.comments.orderedItems)
237 }
238 231
239 return { actor, channelActor, video } 232 return { actor, channelActor, video }
240} 233}
@@ -266,7 +259,7 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR
266 return 259 return
267} 260}
268 261
269async function addVideoShares (instance: VideoModel, shareUrls: string[]) { 262async function addVideoShares (shareUrls: string[], instance: VideoModel) {
270 for (const shareUrl of shareUrls) { 263 for (const shareUrl of shareUrls) {
271 // Fetch url 264 // Fetch url
272 const { body } = await doRequest({ 265 const { body } = await doRequest({
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
index 4683beb2f..10c0e606f 100644
--- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts
@@ -1,9 +1,9 @@
1import * as kue from 'kue' 1import * as kue from 'kue'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests'
4import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../../initializers'
5import { processActivities } from '../../activitypub/process' 3import { processActivities } from '../../activitypub/process'
6import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' 4import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast'
5import { crawlCollectionPage } from '../../activitypub/crawl'
6import { Activity } from '../../../../shared/models/activitypub'
7 7
8export type ActivitypubHttpFetcherPayload = { 8export type ActivitypubHttpFetcherPayload = {
9 uris: string[] 9 uris: string[]
@@ -14,46 +14,8 @@ async function processActivityPubHttpFetcher (job: kue.Job) {
14 14
15 const payload = job.data as ActivitypubHttpBroadcastPayload 15 const payload = job.data as ActivitypubHttpBroadcastPayload
16 16
17 const options = {
18 method: 'GET',
19 uri: '',
20 json: true,
21 activityPub: true,
22 timeout: JOB_REQUEST_TIMEOUT
23 }
24
25 for (const uri of payload.uris) { 17 for (const uri of payload.uris) {
26 options.uri = uri 18 await crawlCollectionPage<Activity>(uri, (items) => processActivities(items))
27 logger.info('Fetching ActivityPub data on %s.', uri)
28
29 const response = await doRequest(options)
30 const firstBody = response.body
31
32 if (firstBody.first && Array.isArray(firstBody.first.orderedItems)) {
33 const activities = firstBody.first.orderedItems
34
35 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
36
37 await processActivities(activities)
38 }
39
40 let limit = ACTIVITY_PUB.FETCH_PAGE_LIMIT
41 let i = 0
42 let nextLink = firstBody.first.next
43 while (nextLink && i < limit) {
44 options.uri = nextLink
45
46 const { body } = await doRequest(options)
47 nextLink = body.next
48 i++
49
50 if (Array.isArray(body.orderedItems)) {
51 const activities = body.orderedItems
52 logger.info('Processing %i items ActivityPub fetcher for %s.', activities.length, options.uri)
53
54 await processActivities(activities)
55 }
56 }
57 } 19 }
58} 20}
59 21