diff options
Diffstat (limited to 'server/lib')
-rw-r--r-- | server/lib/activitypub/actor.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/crawl.ts | 3 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-announce.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-create.ts | 9 | ||||
-rw-r--r-- | server/lib/activitypub/video-comments.ts | 9 | ||||
-rw-r--r-- | server/lib/activitypub/videos.ts | 182 | ||||
-rw-r--r-- | server/lib/job-queue/handlers/activitypub-http-fetcher.ts | 26 |
7 files changed, 141 insertions, 96 deletions
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index d84b465b2..9922229d2 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts | |||
@@ -177,7 +177,8 @@ async function addFetchOutboxJob (actor: ActorModel) { | |||
177 | } | 177 | } |
178 | 178 | ||
179 | const payload = { | 179 | const payload = { |
180 | uris: [ actor.outboxUrl ] | 180 | uri: actor.outboxUrl, |
181 | type: 'activity' as 'activity' | ||
181 | } | 182 | } |
182 | 183 | ||
183 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) | 184 | return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) |
@@ -248,6 +249,7 @@ function saveActorAndServerAndModelIfNotExist ( | |||
248 | } else if (actorCreated.type === 'Group') { // Video channel | 249 | } else if (actorCreated.type === 'Group') { // Video channel |
249 | actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t) | 250 | actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t) |
250 | actorCreated.VideoChannel.Actor = actorCreated | 251 | actorCreated.VideoChannel.Actor = actorCreated |
252 | actorCreated.VideoChannel.Account = ownerActor.Account | ||
251 | } | 253 | } |
252 | 254 | ||
253 | return actorCreated | 255 | return actorCreated |
diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts index d4fc786f7..55912341c 100644 --- a/server/lib/activitypub/crawl.ts +++ b/server/lib/activitypub/crawl.ts | |||
@@ -1,8 +1,9 @@ | |||
1 | import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers' | 1 | import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers' |
2 | import { doRequest } from '../../helpers/requests' | 2 | import { doRequest } from '../../helpers/requests' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import Bluebird = require('bluebird') | ||
4 | 5 | ||
5 | async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) { | 6 | async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any> | Bluebird<any>) { |
6 | logger.info('Crawling ActivityPub data on %s.', uri) | 7 | logger.info('Crawling ActivityPub data on %s.', uri) |
7 | 8 | ||
8 | const options = { | 9 | const options = { |
diff --git a/server/lib/activitypub/process/process-announce.ts b/server/lib/activitypub/process/process-announce.ts index d8ca59425..b08156aa1 100644 --- a/server/lib/activitypub/process/process-announce.ts +++ b/server/lib/activitypub/process/process-announce.ts | |||
@@ -24,10 +24,8 @@ export { | |||
24 | 24 | ||
25 | async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) { | 25 | async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) { |
26 | const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id | 26 | const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id |
27 | let video: VideoModel | ||
28 | 27 | ||
29 | const res = await getOrCreateAccountAndVideoAndChannel(objectUri) | 28 | const { video } = await getOrCreateAccountAndVideoAndChannel(objectUri) |
30 | video = res.video | ||
31 | 29 | ||
32 | return sequelizeTypescript.transaction(async t => { | 30 | return sequelizeTypescript.transaction(async t => { |
33 | // Add share entry | 31 | // Add share entry |
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index 791148919..9655d015f 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts | |||
@@ -23,7 +23,7 @@ async function processCreateActivity (activity: ActivityCreate) { | |||
23 | } else if (activityType === 'Dislike') { | 23 | } else if (activityType === 'Dislike') { |
24 | return retryTransactionWrapper(processCreateDislike, actor, activity) | 24 | return retryTransactionWrapper(processCreateDislike, actor, activity) |
25 | } else if (activityType === 'Video') { | 25 | } else if (activityType === 'Video') { |
26 | return processCreateVideo(actor, activity) | 26 | return processCreateVideo(activity) |
27 | } else if (activityType === 'Flag') { | 27 | } else if (activityType === 'Flag') { |
28 | return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) | 28 | return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) |
29 | } else if (activityType === 'Note') { | 29 | } else if (activityType === 'Note') { |
@@ -42,13 +42,10 @@ export { | |||
42 | 42 | ||
43 | // --------------------------------------------------------------------------- | 43 | // --------------------------------------------------------------------------- |
44 | 44 | ||
45 | async function processCreateVideo ( | 45 | async function processCreateVideo (activity: ActivityCreate) { |
46 | actor: ActorModel, | ||
47 | activity: ActivityCreate | ||
48 | ) { | ||
49 | const videoToCreateData = activity.object as VideoTorrentObject | 46 | const videoToCreateData = activity.object as VideoTorrentObject |
50 | 47 | ||
51 | const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData, actor) | 48 | const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData) |
52 | 49 | ||
53 | return video | 50 | return video |
54 | } | 51 | } |
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts index fd03710c2..14c7fde69 100644 --- a/server/lib/activitypub/video-comments.ts +++ b/server/lib/activitypub/video-comments.ts | |||
@@ -2,12 +2,13 @@ import { VideoCommentObject } from '../../../shared/models/activitypub/objects/v | |||
2 | import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' | 2 | import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' |
3 | import { logger } from '../../helpers/logger' | 3 | import { logger } from '../../helpers/logger' |
4 | import { doRequest } from '../../helpers/requests' | 4 | import { doRequest } from '../../helpers/requests' |
5 | import { ACTIVITY_PUB } from '../../initializers' | 5 | import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers' |
6 | import { ActorModel } from '../../models/activitypub/actor' | 6 | import { ActorModel } from '../../models/activitypub/actor' |
7 | import { VideoModel } from '../../models/video/video' | 7 | import { VideoModel } from '../../models/video/video' |
8 | import { VideoCommentModel } from '../../models/video/video-comment' | 8 | import { VideoCommentModel } from '../../models/video/video-comment' |
9 | import { getOrCreateActorAndServerAndModel } from './actor' | 9 | import { getOrCreateActorAndServerAndModel } from './actor' |
10 | import { getOrCreateAccountAndVideoAndChannel } from './videos' | 10 | import { getOrCreateAccountAndVideoAndChannel } from './videos' |
11 | import * as Bluebird from 'bluebird' | ||
11 | 12 | ||
12 | async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { | 13 | async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { |
13 | let originCommentId: number = null | 14 | let originCommentId: number = null |
@@ -38,9 +39,9 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto | |||
38 | } | 39 | } |
39 | 40 | ||
40 | async function addVideoComments (commentUrls: string[], instance: VideoModel) { | 41 | async function addVideoComments (commentUrls: string[], instance: VideoModel) { |
41 | for (const commentUrl of commentUrls) { | 42 | return Bluebird.map(commentUrls, commentUrl => { |
42 | await addVideoComment(instance, commentUrl) | 43 | return addVideoComment(instance, commentUrl) |
43 | } | 44 | }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) |
44 | } | 45 | } |
45 | 46 | ||
46 | async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { | 47 | async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { |
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index d1888556c..fac1d3fc7 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts | |||
@@ -11,7 +11,7 @@ import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos | |||
11 | import { retryTransactionWrapper } from '../../helpers/database-utils' | 11 | import { retryTransactionWrapper } from '../../helpers/database-utils' |
12 | import { logger } from '../../helpers/logger' | 12 | import { logger } from '../../helpers/logger' |
13 | import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests' | 13 | import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests' |
14 | import { ACTIVITY_PUB, CONFIG, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' | 14 | import { ACTIVITY_PUB, CONFIG, CRAWL_REQUEST_CONCURRENCY, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' |
15 | import { AccountVideoRateModel } from '../../models/account/account-video-rate' | 15 | import { AccountVideoRateModel } from '../../models/account/account-video-rate' |
16 | import { ActorModel } from '../../models/activitypub/actor' | 16 | import { ActorModel } from '../../models/activitypub/actor' |
17 | import { TagModel } from '../../models/video/tag' | 17 | import { TagModel } from '../../models/video/tag' |
@@ -26,6 +26,8 @@ import { sendCreateVideo, sendUpdateVideo } from './send' | |||
26 | import { shareVideoByServerAndChannel } from './index' | 26 | import { shareVideoByServerAndChannel } from './index' |
27 | import { isArray } from '../../helpers/custom-validators/misc' | 27 | import { isArray } from '../../helpers/custom-validators/misc' |
28 | import { VideoCaptionModel } from '../../models/video/video-caption' | 28 | import { VideoCaptionModel } from '../../models/video/video-caption' |
29 | import { JobQueue } from '../job-queue' | ||
30 | import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher' | ||
29 | 31 | ||
30 | async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { | 32 | async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { |
31 | // If the video is not private and published, we federate it | 33 | // If the video is not private and published, we federate it |
@@ -178,10 +180,10 @@ function getOrCreateVideoChannel (videoObject: VideoTorrentObject) { | |||
178 | return getOrCreateActorAndServerAndModel(channel.id) | 180 | return getOrCreateActorAndServerAndModel(channel.id) |
179 | } | 181 | } |
180 | 182 | ||
181 | async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel) { | 183 | async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) { |
182 | logger.debug('Adding remote video %s.', videoObject.id) | 184 | logger.debug('Adding remote video %s.', videoObject.id) |
183 | 185 | ||
184 | return sequelizeTypescript.transaction(async t => { | 186 | const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => { |
185 | const sequelizeOptions = { | 187 | const sequelizeOptions = { |
186 | transaction: t | 188 | transaction: t |
187 | } | 189 | } |
@@ -191,10 +193,6 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: | |||
191 | const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to) | 193 | const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to) |
192 | const video = VideoModel.build(videoData) | 194 | const video = VideoModel.build(videoData) |
193 | 195 | ||
194 | // Don't block on remote HTTP request (we are in a transaction!) | ||
195 | generateThumbnailFromUrl(video, videoObject.icon) | ||
196 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err })) | ||
197 | |||
198 | const videoCreated = await video.save(sequelizeOptions) | 196 | const videoCreated = await video.save(sequelizeOptions) |
199 | 197 | ||
200 | // Process files | 198 | // Process files |
@@ -222,68 +220,100 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: | |||
222 | videoCreated.VideoChannel = channelActor.VideoChannel | 220 | videoCreated.VideoChannel = channelActor.VideoChannel |
223 | return videoCreated | 221 | return videoCreated |
224 | }) | 222 | }) |
223 | |||
224 | const p = generateThumbnailFromUrl(videoCreated, videoObject.icon) | ||
225 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err })) | ||
226 | |||
227 | if (waitThumbnail === true) await p | ||
228 | |||
229 | return videoCreated | ||
225 | } | 230 | } |
226 | 231 | ||
227 | async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentObject | string, actor?: ActorModel) { | 232 | type SyncParam = { |
233 | likes: boolean, | ||
234 | dislikes: boolean, | ||
235 | shares: boolean, | ||
236 | comments: boolean, | ||
237 | thumbnail: boolean | ||
238 | } | ||
239 | async function getOrCreateAccountAndVideoAndChannel ( | ||
240 | videoObject: VideoTorrentObject | string, | ||
241 | syncParam: SyncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } | ||
242 | ) { | ||
228 | const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id | 243 | const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id |
229 | 244 | ||
230 | const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl) | 245 | const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl) |
231 | if (videoFromDatabase) { | 246 | if (videoFromDatabase) return { video: videoFromDatabase } |
232 | return { | ||
233 | video: videoFromDatabase, | ||
234 | actor: videoFromDatabase.VideoChannel.Account.Actor, | ||
235 | channelActor: videoFromDatabase.VideoChannel.Actor | ||
236 | } | ||
237 | } | ||
238 | 247 | ||
239 | videoObject = await fetchRemoteVideo(videoUrl) | 248 | const fetchedVideo = await fetchRemoteVideo(videoUrl) |
240 | if (!videoObject) throw new Error('Cannot fetch remote video with url: ' + videoUrl) | 249 | if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl) |
241 | 250 | ||
242 | if (!actor) { | 251 | const channelActor = await getOrCreateVideoChannel(fetchedVideo) |
243 | const actorObj = videoObject.attributedTo.find(a => a.type === 'Person') | 252 | const video = await retryTransactionWrapper(getOrCreateVideo, fetchedVideo, channelActor, syncParam.thumbnail) |
244 | if (!actorObj) throw new Error('Cannot find associated actor to video ' + videoObject.url) | ||
245 | 253 | ||
246 | actor = await getOrCreateActorAndServerAndModel(actorObj.id) | 254 | // Process outside the transaction because we could fetch remote data |
247 | } | ||
248 | 255 | ||
249 | const channelActor = await getOrCreateVideoChannel(videoObject) | 256 | logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid) |
250 | 257 | ||
251 | const video = await retryTransactionWrapper(getOrCreateVideo, videoObject, channelActor) | 258 | const jobPayloads: ActivitypubHttpFetcherPayload[] = [] |
252 | 259 | ||
253 | // Process outside the transaction because we could fetch remote data | 260 | if (syncParam.likes === true) { |
254 | logger.info('Adding likes of video %s.', video.uuid) | 261 | await crawlCollectionPage<string>(fetchedVideo.likes, items => createRates(items, video, 'like')) |
255 | await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like')) | 262 | .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err })) |
263 | } else { | ||
264 | jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' }) | ||
265 | } | ||
256 | 266 | ||
257 | logger.info('Adding dislikes of video %s.', video.uuid) | 267 | if (syncParam.dislikes === true) { |
258 | await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike')) | 268 | await crawlCollectionPage<string>(fetchedVideo.dislikes, items => createRates(items, video, 'dislike')) |
269 | .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err })) | ||
270 | } else { | ||
271 | jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' }) | ||
272 | } | ||
273 | |||
274 | if (syncParam.shares === true) { | ||
275 | await crawlCollectionPage<string>(fetchedVideo.shares, items => addVideoShares(items, video)) | ||
276 | .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err })) | ||
277 | } else { | ||
278 | jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) | ||
279 | } | ||
259 | 280 | ||
260 | logger.info('Adding shares of video %s.', video.uuid) | 281 | if (syncParam.comments === true) { |
261 | await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video)) | 282 | await crawlCollectionPage<string>(fetchedVideo.comments, items => addVideoComments(items, video)) |
283 | .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err })) | ||
284 | } else { | ||
285 | jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) | ||
286 | } | ||
262 | 287 | ||
263 | logger.info('Adding comments of video %s.', video.uuid) | 288 | await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })) |
264 | await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video)) | ||
265 | 289 | ||
266 | return { actor, channelActor, video } | 290 | return { video } |
267 | } | 291 | } |
268 | 292 | ||
269 | async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) { | 293 | async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) { |
270 | let rateCounts = 0 | 294 | let rateCounts = 0 |
271 | const tasks: Bluebird<number>[] = [] | ||
272 | |||
273 | for (const actorUrl of actorUrls) { | ||
274 | const actor = await getOrCreateActorAndServerAndModel(actorUrl) | ||
275 | const p = AccountVideoRateModel | ||
276 | .create({ | ||
277 | videoId: video.id, | ||
278 | accountId: actor.Account.id, | ||
279 | type: rate | ||
280 | }) | ||
281 | .then(() => rateCounts += 1) | ||
282 | |||
283 | tasks.push(p) | ||
284 | } | ||
285 | 295 | ||
286 | await Promise.all(tasks) | 296 | await Bluebird.map(actorUrls, async actorUrl => { |
297 | try { | ||
298 | const actor = await getOrCreateActorAndServerAndModel(actorUrl) | ||
299 | const [ , created ] = await AccountVideoRateModel | ||
300 | .findOrCreate({ | ||
301 | where: { | ||
302 | videoId: video.id, | ||
303 | accountId: actor.Account.id | ||
304 | }, | ||
305 | defaults: { | ||
306 | videoId: video.id, | ||
307 | accountId: actor.Account.id, | ||
308 | type: rate | ||
309 | } | ||
310 | }) | ||
311 | |||
312 | if (created) rateCounts += 1 | ||
313 | } catch (err) { | ||
314 | logger.warn('Cannot add rate %s for actor %s.', rate, actorUrl, { err }) | ||
315 | } | ||
316 | }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) | ||
287 | 317 | ||
288 | logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid) | 318 | logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid) |
289 | 319 | ||
@@ -294,34 +324,35 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR | |||
294 | } | 324 | } |
295 | 325 | ||
296 | async function addVideoShares (shareUrls: string[], instance: VideoModel) { | 326 | async function addVideoShares (shareUrls: string[], instance: VideoModel) { |
297 | for (const shareUrl of shareUrls) { | 327 | await Bluebird.map(shareUrls, async shareUrl => { |
298 | // Fetch url | 328 | try { |
299 | const { body } = await doRequest({ | 329 | // Fetch url |
300 | uri: shareUrl, | 330 | const { body } = await doRequest({ |
301 | json: true, | 331 | uri: shareUrl, |
302 | activityPub: true | 332 | json: true, |
303 | }) | 333 | activityPub: true |
304 | if (!body || !body.actor) { | 334 | }) |
305 | logger.warn('Cannot add remote share with url: %s, skipping...', shareUrl) | 335 | if (!body || !body.actor) throw new Error('Body of body actor is invalid') |
306 | continue | ||
307 | } | ||
308 | |||
309 | const actorUrl = body.actor | ||
310 | const actor = await getOrCreateActorAndServerAndModel(actorUrl) | ||
311 | 336 | ||
312 | const entry = { | 337 | const actorUrl = body.actor |
313 | actorId: actor.id, | 338 | const actor = await getOrCreateActorAndServerAndModel(actorUrl) |
314 | videoId: instance.id, | ||
315 | url: shareUrl | ||
316 | } | ||
317 | 339 | ||
318 | await VideoShareModel.findOrCreate({ | 340 | const entry = { |
319 | where: { | 341 | actorId: actor.id, |
342 | videoId: instance.id, | ||
320 | url: shareUrl | 343 | url: shareUrl |
321 | }, | 344 | } |
322 | defaults: entry | 345 | |
323 | }) | 346 | await VideoShareModel.findOrCreate({ |
324 | } | 347 | where: { |
348 | url: shareUrl | ||
349 | }, | ||
350 | defaults: entry | ||
351 | }) | ||
352 | } catch (err) { | ||
353 | logger.warn('Cannot add share %s.', shareUrl, { err }) | ||
354 | } | ||
355 | }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) | ||
325 | } | 356 | } |
326 | 357 | ||
327 | async function fetchRemoteVideo (videoUrl: string): Promise<VideoTorrentObject> { | 358 | async function fetchRemoteVideo (videoUrl: string): Promise<VideoTorrentObject> { |
@@ -355,5 +386,6 @@ export { | |||
355 | videoFileActivityUrlToDBAttributes, | 386 | videoFileActivityUrlToDBAttributes, |
356 | getOrCreateVideo, | 387 | getOrCreateVideo, |
357 | getOrCreateVideoChannel, | 388 | getOrCreateVideoChannel, |
358 | addVideoShares | 389 | addVideoShares, |
390 | createRates | ||
359 | } | 391 | } |
diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index f21da087e..72d670277 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts | |||
@@ -1,22 +1,36 @@ | |||
1 | import * as Bull from 'bull' | 1 | import * as Bull from 'bull' |
2 | import { logger } from '../../../helpers/logger' | 2 | import { logger } from '../../../helpers/logger' |
3 | import { processActivities } from '../../activitypub/process' | 3 | import { processActivities } from '../../activitypub/process' |
4 | import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' | 4 | import { VideoModel } from '../../../models/video/video' |
5 | import { addVideoShares, createRates } from '../../activitypub/videos' | ||
6 | import { addVideoComments } from '../../activitypub/video-comments' | ||
5 | import { crawlCollectionPage } from '../../activitypub/crawl' | 7 | import { crawlCollectionPage } from '../../activitypub/crawl' |
6 | import { Activity } from '../../../../shared/models/activitypub' | 8 | |
9 | type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' | ||
7 | 10 | ||
8 | export type ActivitypubHttpFetcherPayload = { | 11 | export type ActivitypubHttpFetcherPayload = { |
9 | uris: string[] | 12 | uri: string |
13 | type: FetchType | ||
14 | videoId?: number | ||
10 | } | 15 | } |
11 | 16 | ||
12 | async function processActivityPubHttpFetcher (job: Bull.Job) { | 17 | async function processActivityPubHttpFetcher (job: Bull.Job) { |
13 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) | 18 | logger.info('Processing ActivityPub fetcher in job %d.', job.id) |
14 | 19 | ||
15 | const payload = job.data as ActivitypubHttpBroadcastPayload | 20 | const payload = job.data as ActivitypubHttpFetcherPayload |
21 | |||
22 | let video: VideoModel | ||
23 | if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) | ||
16 | 24 | ||
17 | for (const uri of payload.uris) { | 25 | const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise<any> } = { |
18 | await crawlCollectionPage<Activity>(uri, (items) => processActivities(items)) | 26 | 'activity': items => processActivities(items), |
27 | 'video-likes': items => createRates(items, video, 'like'), | ||
28 | 'video-dislikes': items => createRates(items, video, 'dislike'), | ||
29 | 'video-shares': items => addVideoShares(items, video), | ||
30 | 'video-comments': items => addVideoComments(items, video) | ||
19 | } | 31 | } |
32 | |||
33 | return crawlCollectionPage(payload.uri, fetcherType[payload.type]) | ||
20 | } | 34 | } |
21 | 35 | ||
22 | // --------------------------------------------------------------------------- | 36 | // --------------------------------------------------------------------------- |