From f6eebcb336c067e160a62020a5140d8d992ba384 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 22 Aug 2018 11:51:39 +0200 Subject: Add ability to search a video with an URL --- server/lib/activitypub/actor.ts | 4 +- server/lib/activitypub/crawl.ts | 3 +- server/lib/activitypub/process/process-announce.ts | 4 +- server/lib/activitypub/process/process-create.ts | 9 +- server/lib/activitypub/video-comments.ts | 9 +- server/lib/activitypub/videos.ts | 182 ++++++++++++--------- .../job-queue/handlers/activitypub-http-fetcher.ts | 26 ++- 7 files changed, 141 insertions(+), 96 deletions(-) (limited to 'server/lib') diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index d84b465b2..9922229d2 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts @@ -177,7 +177,8 @@ async function addFetchOutboxJob (actor: ActorModel) { } const payload = { - uris: [ actor.outboxUrl ] + uri: actor.outboxUrl, + type: 'activity' as 'activity' } return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) @@ -248,6 +249,7 @@ function saveActorAndServerAndModelIfNotExist ( } else if (actorCreated.type === 'Group') { // Video channel actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t) actorCreated.VideoChannel.Actor = actorCreated + actorCreated.VideoChannel.Account = ownerActor.Account } return actorCreated diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts index d4fc786f7..55912341c 100644 --- a/server/lib/activitypub/crawl.ts +++ b/server/lib/activitypub/crawl.ts @@ -1,8 +1,9 @@ import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers' import { doRequest } from '../../helpers/requests' import { logger } from '../../helpers/logger' +import Bluebird = require('bluebird') -async function crawlCollectionPage (uri: string, handler: (items: T[]) => Promise) { +async function crawlCollectionPage (uri: string, handler: (items: T[]) => Promise | Bluebird) { logger.info('Crawling ActivityPub data on %s.', uri) const options = { diff --git a/server/lib/activitypub/process/process-announce.ts b/server/lib/activitypub/process/process-announce.ts index d8ca59425..b08156aa1 100644 --- a/server/lib/activitypub/process/process-announce.ts +++ b/server/lib/activitypub/process/process-announce.ts @@ -24,10 +24,8 @@ export { async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) { const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id - let video: VideoModel - const res = await getOrCreateAccountAndVideoAndChannel(objectUri) - video = res.video + const { video } = await getOrCreateAccountAndVideoAndChannel(objectUri) return sequelizeTypescript.transaction(async t => { // Add share entry diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index 791148919..9655d015f 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -23,7 +23,7 @@ async function processCreateActivity (activity: ActivityCreate) { } else if (activityType === 'Dislike') { return retryTransactionWrapper(processCreateDislike, actor, activity) } else if (activityType === 'Video') { - return processCreateVideo(actor, activity) + return processCreateVideo(activity) } else if (activityType === 'Flag') { return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) } else if (activityType === 'Note') { @@ -42,13 +42,10 @@ export { // --------------------------------------------------------------------------- -async function processCreateVideo ( - actor: ActorModel, - activity: ActivityCreate -) { +async function processCreateVideo (activity: ActivityCreate) { const videoToCreateData = activity.object as VideoTorrentObject - const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData, actor) + const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData) return video } diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts index fd03710c2..14c7fde69 100644 --- a/server/lib/activitypub/video-comments.ts +++ b/server/lib/activitypub/video-comments.ts @@ -2,12 +2,13 @@ import { VideoCommentObject } from '../../../shared/models/activitypub/objects/v import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' import { logger } from '../../helpers/logger' import { doRequest } from '../../helpers/requests' -import { ACTIVITY_PUB } from '../../initializers' +import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers' import { ActorModel } from '../../models/activitypub/actor' import { VideoModel } from '../../models/video/video' import { VideoCommentModel } from '../../models/video/video-comment' import { getOrCreateActorAndServerAndModel } from './actor' import { getOrCreateAccountAndVideoAndChannel } from './videos' +import * as Bluebird from 'bluebird' async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { let originCommentId: number = null @@ -38,9 +39,9 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto } async function addVideoComments (commentUrls: string[], instance: VideoModel) { - for (const commentUrl of commentUrls) { - await addVideoComment(instance, commentUrl) - } + return Bluebird.map(commentUrls, commentUrl => { + return addVideoComment(instance, commentUrl) + }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) } async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index d1888556c..fac1d3fc7 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -11,7 +11,7 @@ import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos import { retryTransactionWrapper } from '../../helpers/database-utils' import { logger } from '../../helpers/logger' import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests' -import { ACTIVITY_PUB, CONFIG, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' +import { ACTIVITY_PUB, CONFIG, CRAWL_REQUEST_CONCURRENCY, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' import { AccountVideoRateModel } from '../../models/account/account-video-rate' import { ActorModel } from '../../models/activitypub/actor' import { TagModel } from '../../models/video/tag' @@ -26,6 +26,8 @@ import { sendCreateVideo, sendUpdateVideo } from './send' import { shareVideoByServerAndChannel } from './index' import { isArray } from '../../helpers/custom-validators/misc' import { VideoCaptionModel } from '../../models/video/video-caption' +import { JobQueue } from '../job-queue' +import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher' async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { // If the video is not private and published, we federate it @@ -178,10 +180,10 @@ function getOrCreateVideoChannel (videoObject: VideoTorrentObject) { return getOrCreateActorAndServerAndModel(channel.id) } -async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel) { +async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) { logger.debug('Adding remote video %s.', videoObject.id) - return sequelizeTypescript.transaction(async t => { + const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => { const sequelizeOptions = { transaction: t } @@ -191,10 +193,6 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to) const video = VideoModel.build(videoData) - // Don't block on remote HTTP request (we are in a transaction!) - generateThumbnailFromUrl(video, videoObject.icon) - .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err })) - const videoCreated = await video.save(sequelizeOptions) // Process files @@ -222,68 +220,100 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: videoCreated.VideoChannel = channelActor.VideoChannel return videoCreated }) + + const p = generateThumbnailFromUrl(videoCreated, videoObject.icon) + .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err })) + + if (waitThumbnail === true) await p + + return videoCreated } -async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentObject | string, actor?: ActorModel) { +type SyncParam = { + likes: boolean, + dislikes: boolean, + shares: boolean, + comments: boolean, + thumbnail: boolean +} +async function getOrCreateAccountAndVideoAndChannel ( + videoObject: VideoTorrentObject | string, + syncParam: SyncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true } +) { const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl) - if (videoFromDatabase) { - return { - video: videoFromDatabase, - actor: videoFromDatabase.VideoChannel.Account.Actor, - channelActor: videoFromDatabase.VideoChannel.Actor - } - } + if (videoFromDatabase) return { video: videoFromDatabase } - videoObject = await fetchRemoteVideo(videoUrl) - if (!videoObject) throw new Error('Cannot fetch remote video with url: ' + videoUrl) + const fetchedVideo = await fetchRemoteVideo(videoUrl) + if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl) - if (!actor) { - const actorObj = videoObject.attributedTo.find(a => a.type === 'Person') - if (!actorObj) throw new Error('Cannot find associated actor to video ' + videoObject.url) + const channelActor = await getOrCreateVideoChannel(fetchedVideo) + const video = await retryTransactionWrapper(getOrCreateVideo, fetchedVideo, channelActor, syncParam.thumbnail) - actor = await getOrCreateActorAndServerAndModel(actorObj.id) - } + // Process outside the transaction because we could fetch remote data - const channelActor = await getOrCreateVideoChannel(videoObject) + logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid) - const video = await retryTransactionWrapper(getOrCreateVideo, videoObject, channelActor) + const jobPayloads: ActivitypubHttpFetcherPayload[] = [] - // Process outside the transaction because we could fetch remote data - logger.info('Adding likes of video %s.', video.uuid) - await crawlCollectionPage(videoObject.likes, (items) => createRates(items, video, 'like')) + if (syncParam.likes === true) { + await crawlCollectionPage(fetchedVideo.likes, items => createRates(items, video, 'like')) + .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err })) + } else { + jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' }) + } - logger.info('Adding dislikes of video %s.', video.uuid) - await crawlCollectionPage(videoObject.dislikes, (items) => createRates(items, video, 'dislike')) + if (syncParam.dislikes === true) { + await crawlCollectionPage(fetchedVideo.dislikes, items => createRates(items, video, 'dislike')) + .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err })) + } else { + jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' }) + } + + if (syncParam.shares === true) { + await crawlCollectionPage(fetchedVideo.shares, items => addVideoShares(items, video)) + .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err })) + } else { + jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) + } - logger.info('Adding shares of video %s.', video.uuid) - await crawlCollectionPage(videoObject.shares, (items) => addVideoShares(items, video)) + if (syncParam.comments === true) { + await crawlCollectionPage(fetchedVideo.comments, items => addVideoComments(items, video)) + .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err })) + } else { + jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' }) + } - logger.info('Adding comments of video %s.', video.uuid) - await crawlCollectionPage(videoObject.comments, (items) => addVideoComments(items, video)) + await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })) - return { actor, channelActor, video } + return { video } } async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) { let rateCounts = 0 - const tasks: Bluebird[] = [] - - for (const actorUrl of actorUrls) { - const actor = await getOrCreateActorAndServerAndModel(actorUrl) - const p = AccountVideoRateModel - .create({ - videoId: video.id, - accountId: actor.Account.id, - type: rate - }) - .then(() => rateCounts += 1) - - tasks.push(p) - } - await Promise.all(tasks) + await Bluebird.map(actorUrls, async actorUrl => { + try { + const actor = await getOrCreateActorAndServerAndModel(actorUrl) + const [ , created ] = await AccountVideoRateModel + .findOrCreate({ + where: { + videoId: video.id, + accountId: actor.Account.id + }, + defaults: { + videoId: video.id, + accountId: actor.Account.id, + type: rate + } + }) + + if (created) rateCounts += 1 + } catch (err) { + logger.warn('Cannot add rate %s for actor %s.', rate, actorUrl, { err }) + } + }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid) @@ -294,34 +324,35 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR } async function addVideoShares (shareUrls: string[], instance: VideoModel) { - for (const shareUrl of shareUrls) { - // Fetch url - const { body } = await doRequest({ - uri: shareUrl, - json: true, - activityPub: true - }) - if (!body || !body.actor) { - logger.warn('Cannot add remote share with url: %s, skipping...', shareUrl) - continue - } - - const actorUrl = body.actor - const actor = await getOrCreateActorAndServerAndModel(actorUrl) + await Bluebird.map(shareUrls, async shareUrl => { + try { + // Fetch url + const { body } = await doRequest({ + uri: shareUrl, + json: true, + activityPub: true + }) + if (!body || !body.actor) throw new Error('Body of body actor is invalid') - const entry = { - actorId: actor.id, - videoId: instance.id, - url: shareUrl - } + const actorUrl = body.actor + const actor = await getOrCreateActorAndServerAndModel(actorUrl) - await VideoShareModel.findOrCreate({ - where: { + const entry = { + actorId: actor.id, + videoId: instance.id, url: shareUrl - }, - defaults: entry - }) - } + } + + await VideoShareModel.findOrCreate({ + where: { + url: shareUrl + }, + defaults: entry + }) + } catch (err) { + logger.warn('Cannot add share %s.', shareUrl, { err }) + } + }, { concurrency: CRAWL_REQUEST_CONCURRENCY }) } async function fetchRemoteVideo (videoUrl: string): Promise { @@ -355,5 +386,6 @@ export { videoFileActivityUrlToDBAttributes, getOrCreateVideo, getOrCreateVideoChannel, - addVideoShares + addVideoShares, + createRates } diff --git a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts index f21da087e..72d670277 100644 --- a/server/lib/job-queue/handlers/activitypub-http-fetcher.ts +++ b/server/lib/job-queue/handlers/activitypub-http-fetcher.ts @@ -1,22 +1,36 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { processActivities } from '../../activitypub/process' -import { ActivitypubHttpBroadcastPayload } from './activitypub-http-broadcast' +import { VideoModel } from '../../../models/video/video' +import { addVideoShares, createRates } from '../../activitypub/videos' +import { addVideoComments } from '../../activitypub/video-comments' import { crawlCollectionPage } from '../../activitypub/crawl' -import { Activity } from '../../../../shared/models/activitypub' + +type FetchType = 'activity' | 'video-likes' | 'video-dislikes' | 'video-shares' | 'video-comments' export type ActivitypubHttpFetcherPayload = { - uris: string[] + uri: string + type: FetchType + videoId?: number } async function processActivityPubHttpFetcher (job: Bull.Job) { logger.info('Processing ActivityPub fetcher in job %d.', job.id) - const payload = job.data as ActivitypubHttpBroadcastPayload + const payload = job.data as ActivitypubHttpFetcherPayload + + let video: VideoModel + if (payload.videoId) video = await VideoModel.loadAndPopulateAccountAndServerAndTags(payload.videoId) - for (const uri of payload.uris) { - await crawlCollectionPage(uri, (items) => processActivities(items)) + const fetcherType: { [ id in FetchType ]: (items: any[]) => Promise } = { + 'activity': items => processActivities(items), + 'video-likes': items => createRates(items, video, 'like'), + 'video-dislikes': items => createRates(items, video, 'dislike'), + 'video-shares': items => addVideoShares(items, video), + 'video-comments': items => addVideoComments(items, video) } + + return crawlCollectionPage(payload.uri, fetcherType[payload.type]) } // --------------------------------------------------------------------------- -- cgit v1.2.3