aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-08-22 11:51:39 +0200
committerChocobozzz <me@florianbigard.com>2018-08-27 09:41:54 +0200
commitf6eebcb336c067e160a62020a5140d8d992ba384 (patch)
tree48fbf9c292243c9cc13beb3749eceaf61fe2baef /server/lib/activitypub
parent22a16e36f6526887ed8f5e5d3c9f9e5da0b4a8cd (diff)
downloadPeerTube-f6eebcb336c067e160a62020a5140d8d992ba384.tar.gz
PeerTube-f6eebcb336c067e160a62020a5140d8d992ba384.tar.zst
PeerTube-f6eebcb336c067e160a62020a5140d8d992ba384.zip
Add ability to search a video with an URL
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r--server/lib/activitypub/actor.ts4
-rw-r--r--server/lib/activitypub/crawl.ts3
-rw-r--r--server/lib/activitypub/process/process-announce.ts4
-rw-r--r--server/lib/activitypub/process/process-create.ts9
-rw-r--r--server/lib/activitypub/video-comments.ts9
-rw-r--r--server/lib/activitypub/videos.ts182
6 files changed, 121 insertions, 90 deletions
diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts
index d84b465b2..9922229d2 100644
--- a/server/lib/activitypub/actor.ts
+++ b/server/lib/activitypub/actor.ts
@@ -177,7 +177,8 @@ async function addFetchOutboxJob (actor: ActorModel) {
177 } 177 }
178 178
179 const payload = { 179 const payload = {
180 uris: [ actor.outboxUrl ] 180 uri: actor.outboxUrl,
181 type: 'activity' as 'activity'
181 } 182 }
182 183
183 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }) 184 return JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload })
@@ -248,6 +249,7 @@ function saveActorAndServerAndModelIfNotExist (
248 } else if (actorCreated.type === 'Group') { // Video channel 249 } else if (actorCreated.type === 'Group') { // Video channel
249 actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t) 250 actorCreated.VideoChannel = await saveVideoChannel(actorCreated, result, ownerActor, t)
250 actorCreated.VideoChannel.Actor = actorCreated 251 actorCreated.VideoChannel.Actor = actorCreated
252 actorCreated.VideoChannel.Account = ownerActor.Account
251 } 253 }
252 254
253 return actorCreated 255 return actorCreated
diff --git a/server/lib/activitypub/crawl.ts b/server/lib/activitypub/crawl.ts
index d4fc786f7..55912341c 100644
--- a/server/lib/activitypub/crawl.ts
+++ b/server/lib/activitypub/crawl.ts
@@ -1,8 +1,9 @@
1import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers' 1import { ACTIVITY_PUB, JOB_REQUEST_TIMEOUT } from '../../initializers'
2import { doRequest } from '../../helpers/requests' 2import { doRequest } from '../../helpers/requests'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import Bluebird = require('bluebird')
4 5
5async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any>) { 6async function crawlCollectionPage <T> (uri: string, handler: (items: T[]) => Promise<any> | Bluebird<any>) {
6 logger.info('Crawling ActivityPub data on %s.', uri) 7 logger.info('Crawling ActivityPub data on %s.', uri)
7 8
8 const options = { 9 const options = {
diff --git a/server/lib/activitypub/process/process-announce.ts b/server/lib/activitypub/process/process-announce.ts
index d8ca59425..b08156aa1 100644
--- a/server/lib/activitypub/process/process-announce.ts
+++ b/server/lib/activitypub/process/process-announce.ts
@@ -24,10 +24,8 @@ export {
24 24
25async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) { 25async function processVideoShare (actorAnnouncer: ActorModel, activity: ActivityAnnounce) {
26 const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id 26 const objectUri = typeof activity.object === 'string' ? activity.object : activity.object.id
27 let video: VideoModel
28 27
29 const res = await getOrCreateAccountAndVideoAndChannel(objectUri) 28 const { video } = await getOrCreateAccountAndVideoAndChannel(objectUri)
30 video = res.video
31 29
32 return sequelizeTypescript.transaction(async t => { 30 return sequelizeTypescript.transaction(async t => {
33 // Add share entry 31 // Add share entry
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index 791148919..9655d015f 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -23,7 +23,7 @@ async function processCreateActivity (activity: ActivityCreate) {
23 } else if (activityType === 'Dislike') { 23 } else if (activityType === 'Dislike') {
24 return retryTransactionWrapper(processCreateDislike, actor, activity) 24 return retryTransactionWrapper(processCreateDislike, actor, activity)
25 } else if (activityType === 'Video') { 25 } else if (activityType === 'Video') {
26 return processCreateVideo(actor, activity) 26 return processCreateVideo(activity)
27 } else if (activityType === 'Flag') { 27 } else if (activityType === 'Flag') {
28 return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) 28 return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject)
29 } else if (activityType === 'Note') { 29 } else if (activityType === 'Note') {
@@ -42,13 +42,10 @@ export {
42 42
43// --------------------------------------------------------------------------- 43// ---------------------------------------------------------------------------
44 44
45async function processCreateVideo ( 45async function processCreateVideo (activity: ActivityCreate) {
46 actor: ActorModel,
47 activity: ActivityCreate
48) {
49 const videoToCreateData = activity.object as VideoTorrentObject 46 const videoToCreateData = activity.object as VideoTorrentObject
50 47
51 const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData, actor) 48 const { video } = await getOrCreateAccountAndVideoAndChannel(videoToCreateData)
52 49
53 return video 50 return video
54} 51}
diff --git a/server/lib/activitypub/video-comments.ts b/server/lib/activitypub/video-comments.ts
index fd03710c2..14c7fde69 100644
--- a/server/lib/activitypub/video-comments.ts
+++ b/server/lib/activitypub/video-comments.ts
@@ -2,12 +2,13 @@ import { VideoCommentObject } from '../../../shared/models/activitypub/objects/v
2import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments' 2import { sanitizeAndCheckVideoCommentObject } from '../../helpers/custom-validators/activitypub/video-comments'
3import { logger } from '../../helpers/logger' 3import { logger } from '../../helpers/logger'
4import { doRequest } from '../../helpers/requests' 4import { doRequest } from '../../helpers/requests'
5import { ACTIVITY_PUB } from '../../initializers' 5import { ACTIVITY_PUB, CRAWL_REQUEST_CONCURRENCY } from '../../initializers'
6import { ActorModel } from '../../models/activitypub/actor' 6import { ActorModel } from '../../models/activitypub/actor'
7import { VideoModel } from '../../models/video/video' 7import { VideoModel } from '../../models/video/video'
8import { VideoCommentModel } from '../../models/video/video-comment' 8import { VideoCommentModel } from '../../models/video/video-comment'
9import { getOrCreateActorAndServerAndModel } from './actor' 9import { getOrCreateActorAndServerAndModel } from './actor'
10import { getOrCreateAccountAndVideoAndChannel } from './videos' 10import { getOrCreateAccountAndVideoAndChannel } from './videos'
11import * as Bluebird from 'bluebird'
11 12
12async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) { 13async function videoCommentActivityObjectToDBAttributes (video: VideoModel, actor: ActorModel, comment: VideoCommentObject) {
13 let originCommentId: number = null 14 let originCommentId: number = null
@@ -38,9 +39,9 @@ async function videoCommentActivityObjectToDBAttributes (video: VideoModel, acto
38} 39}
39 40
40async function addVideoComments (commentUrls: string[], instance: VideoModel) { 41async function addVideoComments (commentUrls: string[], instance: VideoModel) {
41 for (const commentUrl of commentUrls) { 42 return Bluebird.map(commentUrls, commentUrl => {
42 await addVideoComment(instance, commentUrl) 43 return addVideoComment(instance, commentUrl)
43 } 44 }, { concurrency: CRAWL_REQUEST_CONCURRENCY })
44} 45}
45 46
46async function addVideoComment (videoInstance: VideoModel, commentUrl: string) { 47async function addVideoComment (videoInstance: VideoModel, commentUrl: string) {
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index d1888556c..fac1d3fc7 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -11,7 +11,7 @@ import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos
11import { retryTransactionWrapper } from '../../helpers/database-utils' 11import { retryTransactionWrapper } from '../../helpers/database-utils'
12import { logger } from '../../helpers/logger' 12import { logger } from '../../helpers/logger'
13import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests' 13import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests'
14import { ACTIVITY_PUB, CONFIG, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' 14import { ACTIVITY_PUB, CONFIG, CRAWL_REQUEST_CONCURRENCY, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers'
15import { AccountVideoRateModel } from '../../models/account/account-video-rate' 15import { AccountVideoRateModel } from '../../models/account/account-video-rate'
16import { ActorModel } from '../../models/activitypub/actor' 16import { ActorModel } from '../../models/activitypub/actor'
17import { TagModel } from '../../models/video/tag' 17import { TagModel } from '../../models/video/tag'
@@ -26,6 +26,8 @@ import { sendCreateVideo, sendUpdateVideo } from './send'
26import { shareVideoByServerAndChannel } from './index' 26import { shareVideoByServerAndChannel } from './index'
27import { isArray } from '../../helpers/custom-validators/misc' 27import { isArray } from '../../helpers/custom-validators/misc'
28import { VideoCaptionModel } from '../../models/video/video-caption' 28import { VideoCaptionModel } from '../../models/video/video-caption'
29import { JobQueue } from '../job-queue'
30import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher'
29 31
30async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) { 32async function federateVideoIfNeeded (video: VideoModel, isNewVideo: boolean, transaction?: sequelize.Transaction) {
31 // If the video is not private and published, we federate it 33 // If the video is not private and published, we federate it
@@ -178,10 +180,10 @@ function getOrCreateVideoChannel (videoObject: VideoTorrentObject) {
178 return getOrCreateActorAndServerAndModel(channel.id) 180 return getOrCreateActorAndServerAndModel(channel.id)
179} 181}
180 182
181async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel) { 183async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor: ActorModel, waitThumbnail = false) {
182 logger.debug('Adding remote video %s.', videoObject.id) 184 logger.debug('Adding remote video %s.', videoObject.id)
183 185
184 return sequelizeTypescript.transaction(async t => { 186 const videoCreated: VideoModel = await sequelizeTypescript.transaction(async t => {
185 const sequelizeOptions = { 187 const sequelizeOptions = {
186 transaction: t 188 transaction: t
187 } 189 }
@@ -191,10 +193,6 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor:
191 const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to) 193 const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, videoObject.to)
192 const video = VideoModel.build(videoData) 194 const video = VideoModel.build(videoData)
193 195
194 // Don't block on remote HTTP request (we are in a transaction!)
195 generateThumbnailFromUrl(video, videoObject.icon)
196 .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
197
198 const videoCreated = await video.save(sequelizeOptions) 196 const videoCreated = await video.save(sequelizeOptions)
199 197
200 // Process files 198 // Process files
@@ -222,68 +220,100 @@ async function getOrCreateVideo (videoObject: VideoTorrentObject, channelActor:
222 videoCreated.VideoChannel = channelActor.VideoChannel 220 videoCreated.VideoChannel = channelActor.VideoChannel
223 return videoCreated 221 return videoCreated
224 }) 222 })
223
224 const p = generateThumbnailFromUrl(videoCreated, videoObject.icon)
225 .catch(err => logger.warn('Cannot generate thumbnail of %s.', videoObject.id, { err }))
226
227 if (waitThumbnail === true) await p
228
229 return videoCreated
225} 230}
226 231
227async function getOrCreateAccountAndVideoAndChannel (videoObject: VideoTorrentObject | string, actor?: ActorModel) { 232type SyncParam = {
233 likes: boolean,
234 dislikes: boolean,
235 shares: boolean,
236 comments: boolean,
237 thumbnail: boolean
238}
239async function getOrCreateAccountAndVideoAndChannel (
240 videoObject: VideoTorrentObject | string,
241 syncParam: SyncParam = { likes: true, dislikes: true, shares: true, comments: true, thumbnail: true }
242) {
228 const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id 243 const videoUrl = typeof videoObject === 'string' ? videoObject : videoObject.id
229 244
230 const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl) 245 const videoFromDatabase = await VideoModel.loadByUrlAndPopulateAccount(videoUrl)
231 if (videoFromDatabase) { 246 if (videoFromDatabase) return { video: videoFromDatabase }
232 return {
233 video: videoFromDatabase,
234 actor: videoFromDatabase.VideoChannel.Account.Actor,
235 channelActor: videoFromDatabase.VideoChannel.Actor
236 }
237 }
238 247
239 videoObject = await fetchRemoteVideo(videoUrl) 248 const fetchedVideo = await fetchRemoteVideo(videoUrl)
240 if (!videoObject) throw new Error('Cannot fetch remote video with url: ' + videoUrl) 249 if (!fetchedVideo) throw new Error('Cannot fetch remote video with url: ' + videoUrl)
241 250
242 if (!actor) { 251 const channelActor = await getOrCreateVideoChannel(fetchedVideo)
243 const actorObj = videoObject.attributedTo.find(a => a.type === 'Person') 252 const video = await retryTransactionWrapper(getOrCreateVideo, fetchedVideo, channelActor, syncParam.thumbnail)
244 if (!actorObj) throw new Error('Cannot find associated actor to video ' + videoObject.url)
245 253
246 actor = await getOrCreateActorAndServerAndModel(actorObj.id) 254 // Process outside the transaction because we could fetch remote data
247 }
248 255
249 const channelActor = await getOrCreateVideoChannel(videoObject) 256 logger.info('Adding likes/dislikes/shares/comments of video %s.', video.uuid)
250 257
251 const video = await retryTransactionWrapper(getOrCreateVideo, videoObject, channelActor) 258 const jobPayloads: ActivitypubHttpFetcherPayload[] = []
252 259
253 // Process outside the transaction because we could fetch remote data 260 if (syncParam.likes === true) {
254 logger.info('Adding likes of video %s.', video.uuid) 261 await crawlCollectionPage<string>(fetchedVideo.likes, items => createRates(items, video, 'like'))
255 await crawlCollectionPage<string>(videoObject.likes, (items) => createRates(items, video, 'like')) 262 .catch(err => logger.error('Cannot add likes of video %s.', video.uuid, { err }))
263 } else {
264 jobPayloads.push({ uri: fetchedVideo.likes, videoId: video.id, type: 'video-likes' as 'video-likes' })
265 }
256 266
257 logger.info('Adding dislikes of video %s.', video.uuid) 267 if (syncParam.dislikes === true) {
258 await crawlCollectionPage<string>(videoObject.dislikes, (items) => createRates(items, video, 'dislike')) 268 await crawlCollectionPage<string>(fetchedVideo.dislikes, items => createRates(items, video, 'dislike'))
269 .catch(err => logger.error('Cannot add dislikes of video %s.', video.uuid, { err }))
270 } else {
271 jobPayloads.push({ uri: fetchedVideo.dislikes, videoId: video.id, type: 'video-dislikes' as 'video-dislikes' })
272 }
273
274 if (syncParam.shares === true) {
275 await crawlCollectionPage<string>(fetchedVideo.shares, items => addVideoShares(items, video))
276 .catch(err => logger.error('Cannot add shares of video %s.', video.uuid, { err }))
277 } else {
278 jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
279 }
259 280
260 logger.info('Adding shares of video %s.', video.uuid) 281 if (syncParam.comments === true) {
261 await crawlCollectionPage<string>(videoObject.shares, (items) => addVideoShares(items, video)) 282 await crawlCollectionPage<string>(fetchedVideo.comments, items => addVideoComments(items, video))
283 .catch(err => logger.error('Cannot add comments of video %s.', video.uuid, { err }))
284 } else {
285 jobPayloads.push({ uri: fetchedVideo.shares, videoId: video.id, type: 'video-shares' as 'video-shares' })
286 }
262 287
263 logger.info('Adding comments of video %s.', video.uuid) 288 await Bluebird.map(jobPayloads, payload => JobQueue.Instance.createJob({ type: 'activitypub-http-fetcher', payload }))
264 await crawlCollectionPage<string>(videoObject.comments, (items) => addVideoComments(items, video))
265 289
266 return { actor, channelActor, video } 290 return { video }
267} 291}
268 292
269async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) { 293async function createRates (actorUrls: string[], video: VideoModel, rate: VideoRateType) {
270 let rateCounts = 0 294 let rateCounts = 0
271 const tasks: Bluebird<number>[] = []
272
273 for (const actorUrl of actorUrls) {
274 const actor = await getOrCreateActorAndServerAndModel(actorUrl)
275 const p = AccountVideoRateModel
276 .create({
277 videoId: video.id,
278 accountId: actor.Account.id,
279 type: rate
280 })
281 .then(() => rateCounts += 1)
282
283 tasks.push(p)
284 }
285 295
286 await Promise.all(tasks) 296 await Bluebird.map(actorUrls, async actorUrl => {
297 try {
298 const actor = await getOrCreateActorAndServerAndModel(actorUrl)
299 const [ , created ] = await AccountVideoRateModel
300 .findOrCreate({
301 where: {
302 videoId: video.id,
303 accountId: actor.Account.id
304 },
305 defaults: {
306 videoId: video.id,
307 accountId: actor.Account.id,
308 type: rate
309 }
310 })
311
312 if (created) rateCounts += 1
313 } catch (err) {
314 logger.warn('Cannot add rate %s for actor %s.', rate, actorUrl, { err })
315 }
316 }, { concurrency: CRAWL_REQUEST_CONCURRENCY })
287 317
288 logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid) 318 logger.info('Adding %d %s to video %s.', rateCounts, rate, video.uuid)
289 319
@@ -294,34 +324,35 @@ async function createRates (actorUrls: string[], video: VideoModel, rate: VideoR
294} 324}
295 325
296async function addVideoShares (shareUrls: string[], instance: VideoModel) { 326async function addVideoShares (shareUrls: string[], instance: VideoModel) {
297 for (const shareUrl of shareUrls) { 327 await Bluebird.map(shareUrls, async shareUrl => {
298 // Fetch url 328 try {
299 const { body } = await doRequest({ 329 // Fetch url
300 uri: shareUrl, 330 const { body } = await doRequest({
301 json: true, 331 uri: shareUrl,
302 activityPub: true 332 json: true,
303 }) 333 activityPub: true
304 if (!body || !body.actor) { 334 })
305 logger.warn('Cannot add remote share with url: %s, skipping...', shareUrl) 335 if (!body || !body.actor) throw new Error('Body of body actor is invalid')
306 continue
307 }
308
309 const actorUrl = body.actor
310 const actor = await getOrCreateActorAndServerAndModel(actorUrl)
311 336
312 const entry = { 337 const actorUrl = body.actor
313 actorId: actor.id, 338 const actor = await getOrCreateActorAndServerAndModel(actorUrl)
314 videoId: instance.id,
315 url: shareUrl
316 }
317 339
318 await VideoShareModel.findOrCreate({ 340 const entry = {
319 where: { 341 actorId: actor.id,
342 videoId: instance.id,
320 url: shareUrl 343 url: shareUrl
321 }, 344 }
322 defaults: entry 345
323 }) 346 await VideoShareModel.findOrCreate({
324 } 347 where: {
348 url: shareUrl
349 },
350 defaults: entry
351 })
352 } catch (err) {
353 logger.warn('Cannot add share %s.', shareUrl, { err })
354 }
355 }, { concurrency: CRAWL_REQUEST_CONCURRENCY })
325} 356}
326 357
327async function fetchRemoteVideo (videoUrl: string): Promise<VideoTorrentObject> { 358async function fetchRemoteVideo (videoUrl: string): Promise<VideoTorrentObject> {
@@ -355,5 +386,6 @@ export {
355 videoFileActivityUrlToDBAttributes, 386 videoFileActivityUrlToDBAttributes,
356 getOrCreateVideo, 387 getOrCreateVideo,
357 getOrCreateVideoChannel, 388 getOrCreateVideoChannel,
358 addVideoShares 389 addVideoShares,
390 createRates
359} 391}