aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-09-24 13:07:33 +0200
committerChocobozzz <me@florianbigard.com>2018-09-24 13:38:39 +0200
commite5565833f62b97f62ea75eba5b479963ae78b873 (patch)
tree835793ce464f9666b0ceae79f3d278cc4e007b32 /server/lib/activitypub
parentd1a63fc7ac58a1db00d8ca4f43aadba02eb9b084 (diff)
downloadPeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.gz
PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.zst
PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.zip
Improve redundancy: add 'min_lifetime' configuration
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r--server/lib/activitypub/audience.ts7
-rw-r--r--server/lib/activitypub/cache-file.ts22
-rw-r--r--server/lib/activitypub/process/process-create.ts8
-rw-r--r--server/lib/activitypub/process/process-undo.ts2
-rw-r--r--server/lib/activitypub/process/process-update.ts28
-rw-r--r--server/lib/activitypub/send/send-update.ts4
-rw-r--r--server/lib/activitypub/videos.ts50
7 files changed, 76 insertions, 45 deletions
diff --git a/server/lib/activitypub/audience.ts b/server/lib/activitypub/audience.ts
index a86428461..10277eca7 100644
--- a/server/lib/activitypub/audience.ts
+++ b/server/lib/activitypub/audience.ts
@@ -50,7 +50,12 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: ActorModel[]): Acti
50 50
51async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { 51async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) {
52 const actors = await VideoShareModel.loadActorsByShare(video.id, t) 52 const actors = await VideoShareModel.loadActorsByShare(video.id, t)
53 actors.push(video.VideoChannel.Account.Actor) 53
54 const videoActor = video.VideoChannel && video.VideoChannel.Account
55 ? video.VideoChannel.Account.Actor
56 : await ActorModel.loadAccountActorByVideoId(video.id, t)
57
58 actors.push(videoActor)
54 59
55 return actors 60 return actors
56} 61}
diff --git a/server/lib/activitypub/cache-file.ts b/server/lib/activitypub/cache-file.ts
index 87f8a4162..5286d8e6d 100644
--- a/server/lib/activitypub/cache-file.ts
+++ b/server/lib/activitypub/cache-file.ts
@@ -1,7 +1,7 @@
1import { CacheFileObject } from '../../../shared/index' 1import { CacheFileObject } from '../../../shared/index'
2import { VideoModel } from '../../models/video/video' 2import { VideoModel } from '../../models/video/video'
3import { sequelizeTypescript } from '../../initializers'
4import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' 3import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
4import { Transaction } from 'sequelize'
5 5
6function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { 6function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) {
7 const url = cacheFileObject.url 7 const url = cacheFileObject.url
@@ -22,25 +22,29 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject
22 } 22 }
23} 23}
24 24
25function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { 25function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }, t: Transaction) {
26 return sequelizeTypescript.transaction(async t => { 26 const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
27 const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
28 27
29 return VideoRedundancyModel.create(attributes, { transaction: t }) 28 return VideoRedundancyModel.create(attributes, { transaction: t })
30 })
31} 29}
32 30
33function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) { 31function updateCacheFile (
32 cacheFileObject: CacheFileObject,
33 redundancyModel: VideoRedundancyModel,
34 video: VideoModel,
35 byActor: { id?: number },
36 t: Transaction
37) {
34 if (redundancyModel.actorId !== byActor.id) { 38 if (redundancyModel.actorId !== byActor.id) {
35 throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') 39 throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.')
36 } 40 }
37 41
38 const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) 42 const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor)
39 43
40 redundancyModel.set('expires', attributes.expiresOn) 44 redundancyModel.set('expires', attributes.expiresOn)
41 redundancyModel.set('fileUrl', attributes.fileUrl) 45 redundancyModel.set('fileUrl', attributes.fileUrl)
42 46
43 return redundancyModel.save() 47 return redundancyModel.save({ transaction: t })
44} 48}
45 49
46export { 50export {
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index cff8dcfc6..ceb5413ca 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -95,7 +95,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate)
95 if (video.isOwned()) { 95 if (video.isOwned()) {
96 // Don't resend the activity to the sender 96 // Don't resend the activity to the sender
97 const exceptions = [ byActor ] 97 const exceptions = [ byActor ]
98 await forwardActivity(activity, undefined, exceptions) 98 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
99 } 99 }
100} 100}
101 101
@@ -104,12 +104,14 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate)
104 104
105 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) 105 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object })
106 106
107 await createCacheFile(cacheFile, video, byActor) 107 await sequelizeTypescript.transaction(async t => {
108 return createCacheFile(cacheFile, video, byActor, t)
109 })
108 110
109 if (video.isOwned()) { 111 if (video.isOwned()) {
110 // Don't resend the activity to the sender 112 // Don't resend the activity to the sender
111 const exceptions = [ byActor ] 113 const exceptions = [ byActor ]
112 await forwardActivity(activity, undefined, exceptions) 114 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
113 } 115 }
114} 116}
115 117
diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts
index 73ca0a17c..ff019cd8c 100644
--- a/server/lib/activitypub/process/process-undo.ts
+++ b/server/lib/activitypub/process/process-undo.ts
@@ -100,7 +100,7 @@ async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo
100 100
101 return sequelizeTypescript.transaction(async t => { 101 return sequelizeTypescript.transaction(async t => {
102 const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) 102 const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
103 if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) 103 if (!cacheFile) throw new Error('Unknown video cache ' + cacheFileObject.id)
104 104
105 if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') 105 if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.')
106 106
diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts
index ed3489ebf..e092a6729 100644
--- a/server/lib/activitypub/process/process-update.ts
+++ b/server/lib/activitypub/process/process-update.ts
@@ -12,6 +12,7 @@ import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-vali
12import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' 12import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file'
13import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' 13import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
14import { createCacheFile, updateCacheFile } from '../cache-file' 14import { createCacheFile, updateCacheFile } from '../cache-file'
15import { forwardVideoRelatedActivity } from '../send/utils'
15 16
16async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { 17async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) {
17 const objectType = activity.object.type 18 const objectType = activity.object.type
@@ -68,18 +69,29 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate)
68async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { 69async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) {
69 const cacheFileObject = activity.object as CacheFileObject 70 const cacheFileObject = activity.object as CacheFileObject
70 71
71 if (!isCacheFileObjectValid(cacheFileObject) === false) { 72 if (!isCacheFileObjectValid(cacheFileObject)) {
72 logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) 73 logger.debug('Cache file object sent by update is not valid.', { cacheFileObject })
73 return undefined 74 return undefined
74 } 75 }
75 76
76 const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) 77 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object })
77 if (!redundancyModel) { 78
78 const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.id }) 79 await sequelizeTypescript.transaction(async t => {
79 return createCacheFile(cacheFileObject, video, byActor) 80 const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id, t)
80 } 81
82 if (!redundancyModel) {
83 await createCacheFile(cacheFileObject, video, byActor, t)
84 } else {
85 await updateCacheFile(cacheFileObject, redundancyModel, video, byActor, t)
86 }
87 })
88
89 if (video.isOwned()) {
90 // Don't resend the activity to the sender
91 const exceptions = [ byActor ]
81 92
82 return updateCacheFile(cacheFileObject, redundancyModel, byActor) 93 await forwardVideoRelatedActivity(activity, undefined, exceptions, video)
94 }
83} 95}
84 96
85async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { 97async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) {
diff --git a/server/lib/activitypub/send/send-update.ts b/server/lib/activitypub/send/send-update.ts
index ec46789b7..a68f03edf 100644
--- a/server/lib/activitypub/send/send-update.ts
+++ b/server/lib/activitypub/send/send-update.ts
@@ -7,8 +7,8 @@ import { VideoModel } from '../../../models/video/video'
7import { VideoChannelModel } from '../../../models/video/video-channel' 7import { VideoChannelModel } from '../../../models/video/video-channel'
8import { VideoShareModel } from '../../../models/video/video-share' 8import { VideoShareModel } from '../../../models/video/video-share'
9import { getUpdateActivityPubUrl } from '../url' 9import { getUpdateActivityPubUrl } from '../url'
10import { broadcastToFollowers, sendVideoRelatedActivity, unicastTo } from './utils' 10import { broadcastToFollowers, sendVideoRelatedActivity } from './utils'
11import { audiencify, getActorsInvolvedInVideo, getAudience, getAudienceFromFollowersOf } from '../audience' 11import { audiencify, getActorsInvolvedInVideo, getAudience } from '../audience'
12import { logger } from '../../../helpers/logger' 12import { logger } from '../../../helpers/logger'
13import { VideoCaptionModel } from '../../../models/video/video-caption' 13import { VideoCaptionModel } from '../../../models/video/video-caption'
14import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' 14import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts
index 48c0e0a5c..db72ef23c 100644
--- a/server/lib/activitypub/videos.ts
+++ b/server/lib/activitypub/videos.ts
@@ -176,7 +176,7 @@ async function getOrCreateVideoAndAccountAndChannel (options: {
176 syncParam, 176 syncParam,
177 refreshViews 177 refreshViews
178 } 178 }
179 const p = retryTransactionWrapper(refreshVideoIfNeeded, refreshOptions) 179 const p = refreshVideoIfNeeded(refreshOptions)
180 if (syncParam.refreshVideo === true) videoFromDatabase = await p 180 if (syncParam.refreshVideo === true) videoFromDatabase = await p
181 181
182 return { video: videoFromDatabase } 182 return { video: videoFromDatabase }
@@ -245,29 +245,37 @@ async function updateVideoFromAP (options: {
245 generateThumbnailFromUrl(options.video, options.videoObject.icon) 245 generateThumbnailFromUrl(options.video, options.videoObject.icon)
246 .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) 246 .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err }))
247 247
248 // Remove old video files 248 {
249 const videoFileDestroyTasks: Bluebird<void>[] = [] 249 const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject)
250 for (const videoFile of options.video.VideoFiles) { 250 const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a))
251 videoFileDestroyTasks.push(videoFile.destroy(sequelizeOptions))
252 }
253 await Promise.all(videoFileDestroyTasks)
254 251
255 const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) 252 // Remove video files that do not exist anymore
256 const tasks = videoFileAttributes.map(f => VideoFileModel.create(f, sequelizeOptions)) 253 const destroyTasks = options.video.VideoFiles
257 await Promise.all(tasks) 254 .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f)))
255 .map(f => f.destroy(sequelizeOptions))
256 await Promise.all(destroyTasks)
258 257
259 // Update Tags 258 // Update or add other one
260 const tags = options.videoObject.tag.map(tag => tag.name) 259 const upsertTasks = videoFileAttributes.map(a => VideoFileModel.upsert(a, sequelizeOptions))
261 const tagInstances = await TagModel.findOrCreateTags(tags, t) 260 await Promise.all(upsertTasks)
262 await options.video.$set('Tags', tagInstances, sequelizeOptions) 261 }
263 262
264 // Update captions 263 {
265 await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) 264 // Update Tags
265 const tags = options.videoObject.tag.map(tag => tag.name)
266 const tagInstances = await TagModel.findOrCreateTags(tags, t)
267 await options.video.$set('Tags', tagInstances, sequelizeOptions)
268 }
266 269
267 const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { 270 {
268 return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) 271 // Update captions
269 }) 272 await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t)
270 await Promise.all(videoCaptionsPromises) 273
274 const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => {
275 return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t)
276 })
277 await Promise.all(videoCaptionsPromises)
278 }
271 }) 279 })
272 280
273 logger.info('Remote video with uuid %s updated', options.videoObject.uuid) 281 logger.info('Remote video with uuid %s updated', options.videoObject.uuid)
@@ -382,7 +390,7 @@ async function refreshVideoIfNeeded (options: {
382 channel: channelActor.VideoChannel, 390 channel: channelActor.VideoChannel,
383 updateViews: options.refreshViews 391 updateViews: options.refreshViews
384 } 392 }
385 await updateVideoFromAP(updateOptions) 393 await retryTransactionWrapper(updateVideoFromAP, updateOptions)
386 await syncVideoExternalAttributes(video, videoObject, options.syncParam) 394 await syncVideoExternalAttributes(video, videoObject, options.syncParam)
387 } catch (err) { 395 } catch (err) {
388 logger.warn('Cannot refresh video.', { err }) 396 logger.warn('Cannot refresh video.', { err })