diff options
author | Chocobozzz <me@florianbigard.com> | 2018-09-24 13:07:33 +0200 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2018-09-24 13:38:39 +0200 |
commit | e5565833f62b97f62ea75eba5b479963ae78b873 (patch) | |
tree | 835793ce464f9666b0ceae79f3d278cc4e007b32 /server/lib/activitypub | |
parent | d1a63fc7ac58a1db00d8ca4f43aadba02eb9b084 (diff) | |
download | PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.gz PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.tar.zst PeerTube-e5565833f62b97f62ea75eba5b479963ae78b873.zip |
Improve redundancy: add 'min_lifetime' configuration
Diffstat (limited to 'server/lib/activitypub')
-rw-r--r-- | server/lib/activitypub/audience.ts | 7 | ||||
-rw-r--r-- | server/lib/activitypub/cache-file.ts | 22 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-create.ts | 8 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-undo.ts | 2 | ||||
-rw-r--r-- | server/lib/activitypub/process/process-update.ts | 28 | ||||
-rw-r--r-- | server/lib/activitypub/send/send-update.ts | 4 | ||||
-rw-r--r-- | server/lib/activitypub/videos.ts | 50 |
7 files changed, 76 insertions, 45 deletions
diff --git a/server/lib/activitypub/audience.ts b/server/lib/activitypub/audience.ts index a86428461..10277eca7 100644 --- a/server/lib/activitypub/audience.ts +++ b/server/lib/activitypub/audience.ts | |||
@@ -50,7 +50,12 @@ function getAudienceFromFollowersOf (actorsInvolvedInObject: ActorModel[]): Acti | |||
50 | 50 | ||
51 | async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { | 51 | async function getActorsInvolvedInVideo (video: VideoModel, t: Transaction) { |
52 | const actors = await VideoShareModel.loadActorsByShare(video.id, t) | 52 | const actors = await VideoShareModel.loadActorsByShare(video.id, t) |
53 | actors.push(video.VideoChannel.Account.Actor) | 53 | |
54 | const videoActor = video.VideoChannel && video.VideoChannel.Account | ||
55 | ? video.VideoChannel.Account.Actor | ||
56 | : await ActorModel.loadAccountActorByVideoId(video.id, t) | ||
57 | |||
58 | actors.push(videoActor) | ||
54 | 59 | ||
55 | return actors | 60 | return actors |
56 | } | 61 | } |
diff --git a/server/lib/activitypub/cache-file.ts b/server/lib/activitypub/cache-file.ts index 87f8a4162..5286d8e6d 100644 --- a/server/lib/activitypub/cache-file.ts +++ b/server/lib/activitypub/cache-file.ts | |||
@@ -1,7 +1,7 @@ | |||
1 | import { CacheFileObject } from '../../../shared/index' | 1 | import { CacheFileObject } from '../../../shared/index' |
2 | import { VideoModel } from '../../models/video/video' | 2 | import { VideoModel } from '../../models/video/video' |
3 | import { sequelizeTypescript } from '../../initializers' | ||
4 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' | 3 | import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' |
4 | import { Transaction } from 'sequelize' | ||
5 | 5 | ||
6 | function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { | 6 | function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { |
7 | const url = cacheFileObject.url | 7 | const url = cacheFileObject.url |
@@ -22,25 +22,29 @@ function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject | |||
22 | } | 22 | } |
23 | } | 23 | } |
24 | 24 | ||
25 | function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }) { | 25 | function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: { id?: number }, t: Transaction) { |
26 | return sequelizeTypescript.transaction(async t => { | 26 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) |
27 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) | ||
28 | 27 | ||
29 | return VideoRedundancyModel.create(attributes, { transaction: t }) | 28 | return VideoRedundancyModel.create(attributes, { transaction: t }) |
30 | }) | ||
31 | } | 29 | } |
32 | 30 | ||
33 | function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: { id?: number }) { | 31 | function updateCacheFile ( |
32 | cacheFileObject: CacheFileObject, | ||
33 | redundancyModel: VideoRedundancyModel, | ||
34 | video: VideoModel, | ||
35 | byActor: { id?: number }, | ||
36 | t: Transaction | ||
37 | ) { | ||
34 | if (redundancyModel.actorId !== byActor.id) { | 38 | if (redundancyModel.actorId !== byActor.id) { |
35 | throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') | 39 | throw new Error('Cannot update redundancy ' + redundancyModel.url + ' of another actor.') |
36 | } | 40 | } |
37 | 41 | ||
38 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) | 42 | const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) |
39 | 43 | ||
40 | redundancyModel.set('expires', attributes.expiresOn) | 44 | redundancyModel.set('expires', attributes.expiresOn) |
41 | redundancyModel.set('fileUrl', attributes.fileUrl) | 45 | redundancyModel.set('fileUrl', attributes.fileUrl) |
42 | 46 | ||
43 | return redundancyModel.save() | 47 | return redundancyModel.save({ transaction: t }) |
44 | } | 48 | } |
45 | 49 | ||
46 | export { | 50 | export { |
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index cff8dcfc6..ceb5413ca 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts | |||
@@ -95,7 +95,7 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) | |||
95 | if (video.isOwned()) { | 95 | if (video.isOwned()) { |
96 | // Don't resend the activity to the sender | 96 | // Don't resend the activity to the sender |
97 | const exceptions = [ byActor ] | 97 | const exceptions = [ byActor ] |
98 | await forwardActivity(activity, undefined, exceptions) | 98 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
99 | } | 99 | } |
100 | } | 100 | } |
101 | 101 | ||
@@ -104,12 +104,14 @@ async function processCacheFile (byActor: ActorModel, activity: ActivityCreate) | |||
104 | 104 | ||
105 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) | 105 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFile.object }) |
106 | 106 | ||
107 | await createCacheFile(cacheFile, video, byActor) | 107 | await sequelizeTypescript.transaction(async t => { |
108 | return createCacheFile(cacheFile, video, byActor, t) | ||
109 | }) | ||
108 | 110 | ||
109 | if (video.isOwned()) { | 111 | if (video.isOwned()) { |
110 | // Don't resend the activity to the sender | 112 | // Don't resend the activity to the sender |
111 | const exceptions = [ byActor ] | 113 | const exceptions = [ byActor ] |
112 | await forwardActivity(activity, undefined, exceptions) | 114 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
113 | } | 115 | } |
114 | } | 116 | } |
115 | 117 | ||
diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts index 73ca0a17c..ff019cd8c 100644 --- a/server/lib/activitypub/process/process-undo.ts +++ b/server/lib/activitypub/process/process-undo.ts | |||
@@ -100,7 +100,7 @@ async function processUndoCacheFile (byActor: ActorModel, activity: ActivityUndo | |||
100 | 100 | ||
101 | return sequelizeTypescript.transaction(async t => { | 101 | return sequelizeTypescript.transaction(async t => { |
102 | const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) | 102 | const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) |
103 | if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) | 103 | if (!cacheFile) throw new Error('Unknown video cache ' + cacheFileObject.id) |
104 | 104 | ||
105 | if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') | 105 | if (cacheFile.actorId !== byActor.id) throw new Error('Cannot delete redundancy ' + cacheFile.url + ' of another actor.') |
106 | 106 | ||
diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts index ed3489ebf..e092a6729 100644 --- a/server/lib/activitypub/process/process-update.ts +++ b/server/lib/activitypub/process/process-update.ts | |||
@@ -12,6 +12,7 @@ import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-vali | |||
12 | import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' | 12 | import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' |
13 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' | 13 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' |
14 | import { createCacheFile, updateCacheFile } from '../cache-file' | 14 | import { createCacheFile, updateCacheFile } from '../cache-file' |
15 | import { forwardVideoRelatedActivity } from '../send/utils' | ||
15 | 16 | ||
16 | async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { | 17 | async function processUpdateActivity (activity: ActivityUpdate, byActor: ActorModel) { |
17 | const objectType = activity.object.type | 18 | const objectType = activity.object.type |
@@ -68,18 +69,29 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) | |||
68 | async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { | 69 | async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { |
69 | const cacheFileObject = activity.object as CacheFileObject | 70 | const cacheFileObject = activity.object as CacheFileObject |
70 | 71 | ||
71 | if (!isCacheFileObjectValid(cacheFileObject) === false) { | 72 | if (!isCacheFileObjectValid(cacheFileObject)) { |
72 | logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) | 73 | logger.debug('Cache file object sent by update is not valid.', { cacheFileObject }) |
73 | return undefined | 74 | return undefined |
74 | } | 75 | } |
75 | 76 | ||
76 | const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) | 77 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.object }) |
77 | if (!redundancyModel) { | 78 | |
78 | const { video } = await getOrCreateVideoAndAccountAndChannel({ videoObject: cacheFileObject.id }) | 79 | await sequelizeTypescript.transaction(async t => { |
79 | return createCacheFile(cacheFileObject, video, byActor) | 80 | const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id, t) |
80 | } | 81 | |
82 | if (!redundancyModel) { | ||
83 | await createCacheFile(cacheFileObject, video, byActor, t) | ||
84 | } else { | ||
85 | await updateCacheFile(cacheFileObject, redundancyModel, video, byActor, t) | ||
86 | } | ||
87 | }) | ||
88 | |||
89 | if (video.isOwned()) { | ||
90 | // Don't resend the activity to the sender | ||
91 | const exceptions = [ byActor ] | ||
81 | 92 | ||
82 | return updateCacheFile(cacheFileObject, redundancyModel, byActor) | 93 | await forwardVideoRelatedActivity(activity, undefined, exceptions, video) |
94 | } | ||
83 | } | 95 | } |
84 | 96 | ||
85 | async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { | 97 | async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { |
diff --git a/server/lib/activitypub/send/send-update.ts b/server/lib/activitypub/send/send-update.ts index ec46789b7..a68f03edf 100644 --- a/server/lib/activitypub/send/send-update.ts +++ b/server/lib/activitypub/send/send-update.ts | |||
@@ -7,8 +7,8 @@ import { VideoModel } from '../../../models/video/video' | |||
7 | import { VideoChannelModel } from '../../../models/video/video-channel' | 7 | import { VideoChannelModel } from '../../../models/video/video-channel' |
8 | import { VideoShareModel } from '../../../models/video/video-share' | 8 | import { VideoShareModel } from '../../../models/video/video-share' |
9 | import { getUpdateActivityPubUrl } from '../url' | 9 | import { getUpdateActivityPubUrl } from '../url' |
10 | import { broadcastToFollowers, sendVideoRelatedActivity, unicastTo } from './utils' | 10 | import { broadcastToFollowers, sendVideoRelatedActivity } from './utils' |
11 | import { audiencify, getActorsInvolvedInVideo, getAudience, getAudienceFromFollowersOf } from '../audience' | 11 | import { audiencify, getActorsInvolvedInVideo, getAudience } from '../audience' |
12 | import { logger } from '../../../helpers/logger' | 12 | import { logger } from '../../../helpers/logger' |
13 | import { VideoCaptionModel } from '../../../models/video/video-caption' | 13 | import { VideoCaptionModel } from '../../../models/video/video-caption' |
14 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' | 14 | import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' |
diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 48c0e0a5c..db72ef23c 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts | |||
@@ -176,7 +176,7 @@ async function getOrCreateVideoAndAccountAndChannel (options: { | |||
176 | syncParam, | 176 | syncParam, |
177 | refreshViews | 177 | refreshViews |
178 | } | 178 | } |
179 | const p = retryTransactionWrapper(refreshVideoIfNeeded, refreshOptions) | 179 | const p = refreshVideoIfNeeded(refreshOptions) |
180 | if (syncParam.refreshVideo === true) videoFromDatabase = await p | 180 | if (syncParam.refreshVideo === true) videoFromDatabase = await p |
181 | 181 | ||
182 | return { video: videoFromDatabase } | 182 | return { video: videoFromDatabase } |
@@ -245,29 +245,37 @@ async function updateVideoFromAP (options: { | |||
245 | generateThumbnailFromUrl(options.video, options.videoObject.icon) | 245 | generateThumbnailFromUrl(options.video, options.videoObject.icon) |
246 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) | 246 | .catch(err => logger.warn('Cannot generate thumbnail of %s.', options.videoObject.id, { err })) |
247 | 247 | ||
248 | // Remove old video files | 248 | { |
249 | const videoFileDestroyTasks: Bluebird<void>[] = [] | 249 | const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) |
250 | for (const videoFile of options.video.VideoFiles) { | 250 | const newVideoFiles = videoFileAttributes.map(a => new VideoFileModel(a)) |
251 | videoFileDestroyTasks.push(videoFile.destroy(sequelizeOptions)) | ||
252 | } | ||
253 | await Promise.all(videoFileDestroyTasks) | ||
254 | 251 | ||
255 | const videoFileAttributes = videoFileActivityUrlToDBAttributes(options.video, options.videoObject) | 252 | // Remove video files that do not exist anymore |
256 | const tasks = videoFileAttributes.map(f => VideoFileModel.create(f, sequelizeOptions)) | 253 | const destroyTasks = options.video.VideoFiles |
257 | await Promise.all(tasks) | 254 | .filter(f => !newVideoFiles.find(newFile => newFile.hasSameUniqueKeysThan(f))) |
255 | .map(f => f.destroy(sequelizeOptions)) | ||
256 | await Promise.all(destroyTasks) | ||
258 | 257 | ||
259 | // Update Tags | 258 | // Update or add other one |
260 | const tags = options.videoObject.tag.map(tag => tag.name) | 259 | const upsertTasks = videoFileAttributes.map(a => VideoFileModel.upsert(a, sequelizeOptions)) |
261 | const tagInstances = await TagModel.findOrCreateTags(tags, t) | 260 | await Promise.all(upsertTasks) |
262 | await options.video.$set('Tags', tagInstances, sequelizeOptions) | 261 | } |
263 | 262 | ||
264 | // Update captions | 263 | { |
265 | await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) | 264 | // Update Tags |
265 | const tags = options.videoObject.tag.map(tag => tag.name) | ||
266 | const tagInstances = await TagModel.findOrCreateTags(tags, t) | ||
267 | await options.video.$set('Tags', tagInstances, sequelizeOptions) | ||
268 | } | ||
266 | 269 | ||
267 | const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { | 270 | { |
268 | return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) | 271 | // Update captions |
269 | }) | 272 | await VideoCaptionModel.deleteAllCaptionsOfRemoteVideo(options.video.id, t) |
270 | await Promise.all(videoCaptionsPromises) | 273 | |
274 | const videoCaptionsPromises = options.videoObject.subtitleLanguage.map(c => { | ||
275 | return VideoCaptionModel.insertOrReplaceLanguage(options.video.id, c.identifier, t) | ||
276 | }) | ||
277 | await Promise.all(videoCaptionsPromises) | ||
278 | } | ||
271 | }) | 279 | }) |
272 | 280 | ||
273 | logger.info('Remote video with uuid %s updated', options.videoObject.uuid) | 281 | logger.info('Remote video with uuid %s updated', options.videoObject.uuid) |
@@ -382,7 +390,7 @@ async function refreshVideoIfNeeded (options: { | |||
382 | channel: channelActor.VideoChannel, | 390 | channel: channelActor.VideoChannel, |
383 | updateViews: options.refreshViews | 391 | updateViews: options.refreshViews |
384 | } | 392 | } |
385 | await updateVideoFromAP(updateOptions) | 393 | await retryTransactionWrapper(updateVideoFromAP, updateOptions) |
386 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) | 394 | await syncVideoExternalAttributes(video, videoObject, options.syncParam) |
387 | } catch (err) { | 395 | } catch (err) { |
388 | logger.warn('Cannot refresh video.', { err }) | 396 | logger.warn('Cannot refresh video.', { err }) |