aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/activitypub/process
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-09-11 16:27:07 +0200
committerChocobozzz <me@florianbigard.com>2018-09-13 14:05:49 +0200
commitc48e82b5e0478434de30626d14594a97f2402e7c (patch)
treea78e5272bd0fe4f5b41831e571e02d05f1515b82 /server/lib/activitypub/process
parenta651038487faa838bda3ce04695b08bc65baff70 (diff)
downloadPeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.tar.gz
PeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.tar.zst
PeerTube-c48e82b5e0478434de30626d14594a97f2402e7c.zip
Basic video redundancy implementation
Diffstat (limited to 'server/lib/activitypub/process')
-rw-r--r--server/lib/activitypub/process/process-create.ts21
-rw-r--r--server/lib/activitypub/process/process-undo.ts44
-rw-r--r--server/lib/activitypub/process/process-update.ts34
3 files changed, 88 insertions, 11 deletions
diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts
index 16f426e23..32e555acf 100644
--- a/server/lib/activitypub/process/process-create.ts
+++ b/server/lib/activitypub/process/process-create.ts
@@ -1,4 +1,4 @@
1import { ActivityCreate, VideoAbuseState, VideoTorrentObject } from '../../../../shared' 1import { ActivityCreate, CacheFileObject, VideoAbuseState, VideoTorrentObject } from '../../../../shared'
2import { DislikeObject, VideoAbuseObject, ViewObject } from '../../../../shared/models/activitypub/objects' 2import { DislikeObject, VideoAbuseObject, ViewObject } from '../../../../shared/models/activitypub/objects'
3import { VideoCommentObject } from '../../../../shared/models/activitypub/objects/video-comment-object' 3import { VideoCommentObject } from '../../../../shared/models/activitypub/objects/video-comment-object'
4import { retryTransactionWrapper } from '../../../helpers/database-utils' 4import { retryTransactionWrapper } from '../../../helpers/database-utils'
@@ -12,6 +12,7 @@ import { addVideoComment, resolveThread } from '../video-comments'
12import { getOrCreateVideoAndAccountAndChannel } from '../videos' 12import { getOrCreateVideoAndAccountAndChannel } from '../videos'
13import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' 13import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils'
14import { Redis } from '../../redis' 14import { Redis } from '../../redis'
15import { createCacheFile } from '../cache-file'
15 16
16async function processCreateActivity (activity: ActivityCreate) { 17async function processCreateActivity (activity: ActivityCreate) {
17 const activityObject = activity.object 18 const activityObject = activity.object
@@ -28,6 +29,8 @@ async function processCreateActivity (activity: ActivityCreate) {
28 return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) 29 return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject)
29 } else if (activityType === 'Note') { 30 } else if (activityType === 'Note') {
30 return retryTransactionWrapper(processCreateVideoComment, actor, activity) 31 return retryTransactionWrapper(processCreateVideoComment, actor, activity)
32 } else if (activityType === 'CacheFile') {
33 return retryTransactionWrapper(processCacheFile, actor, activity)
31 } 34 }
32 35
33 logger.warn('Unknown activity object type %s when creating activity.', activityType, { activity: activity.id }) 36 logger.warn('Unknown activity object type %s when creating activity.', activityType, { activity: activity.id })
@@ -97,6 +100,20 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate)
97 } 100 }
98} 101}
99 102
103async function processCacheFile (byActor: ActorModel, activity: ActivityCreate) {
104 const cacheFile = activity.object as CacheFileObject
105
106 const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFile.object)
107
108 await createCacheFile(cacheFile, video, byActor)
109
110 if (video.isOwned()) {
111 // Don't resend the activity to the sender
112 const exceptions = [ byActor ]
113 await forwardActivity(activity, undefined, exceptions)
114 }
115}
116
100async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateData: VideoAbuseObject) { 117async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateData: VideoAbuseObject) {
101 logger.debug('Reporting remote abuse for video %s.', videoAbuseToCreateData.object) 118 logger.debug('Reporting remote abuse for video %s.', videoAbuseToCreateData.object)
102 119
@@ -113,7 +130,7 @@ async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateDat
113 state: VideoAbuseState.PENDING 130 state: VideoAbuseState.PENDING
114 } 131 }
115 132
116 await VideoAbuseModel.create(videoAbuseData) 133 await VideoAbuseModel.create(videoAbuseData, { transaction: t })
117 134
118 logger.info('Remote abuse for video uuid %s created', videoAbuseToCreateData.object) 135 logger.info('Remote abuse for video uuid %s created', videoAbuseToCreateData.object)
119 }) 136 })
diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts
index 1c1de8827..0eb5fa392 100644
--- a/server/lib/activitypub/process/process-undo.ts
+++ b/server/lib/activitypub/process/process-undo.ts
@@ -1,4 +1,4 @@
1import { ActivityAnnounce, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' 1import { ActivityAnnounce, ActivityFollow, ActivityLike, ActivityUndo, CacheFileObject } from '../../../../shared/models/activitypub'
2import { DislikeObject } from '../../../../shared/models/activitypub/objects' 2import { DislikeObject } from '../../../../shared/models/activitypub/objects'
3import { getActorUrl } from '../../../helpers/activitypub' 3import { getActorUrl } from '../../../helpers/activitypub'
4import { retryTransactionWrapper } from '../../../helpers/database-utils' 4import { retryTransactionWrapper } from '../../../helpers/database-utils'
@@ -11,6 +11,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
11import { forwardVideoRelatedActivity } from '../send/utils' 11import { forwardVideoRelatedActivity } from '../send/utils'
12import { getOrCreateVideoAndAccountAndChannel } from '../videos' 12import { getOrCreateVideoAndAccountAndChannel } from '../videos'
13import { VideoShareModel } from '../../../models/video/video-share' 13import { VideoShareModel } from '../../../models/video/video-share'
14import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
14 15
15async function processUndoActivity (activity: ActivityUndo) { 16async function processUndoActivity (activity: ActivityUndo) {
16 const activityToUndo = activity.object 17 const activityToUndo = activity.object
@@ -19,11 +20,21 @@ async function processUndoActivity (activity: ActivityUndo) {
19 20
20 if (activityToUndo.type === 'Like') { 21 if (activityToUndo.type === 'Like') {
21 return retryTransactionWrapper(processUndoLike, actorUrl, activity) 22 return retryTransactionWrapper(processUndoLike, actorUrl, activity)
22 } else if (activityToUndo.type === 'Create' && activityToUndo.object.type === 'Dislike') { 23 }
23 return retryTransactionWrapper(processUndoDislike, actorUrl, activity) 24
24 } else if (activityToUndo.type === 'Follow') { 25 if (activityToUndo.type === 'Create') {
26 if (activityToUndo.object.type === 'Dislike') {
27 return retryTransactionWrapper(processUndoDislike, actorUrl, activity)
28 } else if (activityToUndo.object.type === 'CacheFile') {
29 return retryTransactionWrapper(processUndoCacheFile, actorUrl, activity)
30 }
31 }
32
33 if (activityToUndo.type === 'Follow') {
25 return retryTransactionWrapper(processUndoFollow, actorUrl, activityToUndo) 34 return retryTransactionWrapper(processUndoFollow, actorUrl, activityToUndo)
26 } else if (activityToUndo.type === 'Announce') { 35 }
36
37 if (activityToUndo.type === 'Announce') {
27 return retryTransactionWrapper(processUndoAnnounce, actorUrl, activityToUndo) 38 return retryTransactionWrapper(processUndoAnnounce, actorUrl, activityToUndo)
28 } 39 }
29 40
@@ -88,6 +99,29 @@ async function processUndoDislike (actorUrl: string, activity: ActivityUndo) {
88 }) 99 })
89} 100}
90 101
102async function processUndoCacheFile (actorUrl: string, activity: ActivityUndo) {
103 const cacheFileObject = activity.object.object as CacheFileObject
104
105 const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFileObject.object)
106
107 return sequelizeTypescript.transaction(async t => {
108 const byActor = await ActorModel.loadByUrl(actorUrl)
109 if (!byActor) throw new Error('Unknown actor ' + actorUrl)
110
111 const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
112 if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url)
113
114 await cacheFile.destroy()
115
116 if (video.isOwned()) {
117 // Don't resend the activity to the sender
118 const exceptions = [ byActor ]
119
120 await forwardVideoRelatedActivity(activity, t, exceptions, video)
121 }
122 })
123}
124
91function processUndoFollow (actorUrl: string, followActivity: ActivityFollow) { 125function processUndoFollow (actorUrl: string, followActivity: ActivityFollow) {
92 return sequelizeTypescript.transaction(async t => { 126 return sequelizeTypescript.transaction(async t => {
93 const follower = await ActorModel.loadByUrl(actorUrl, t) 127 const follower = await ActorModel.loadByUrl(actorUrl, t)
diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts
index d2ad738a2..d3af1a181 100644
--- a/server/lib/activitypub/process/process-update.ts
+++ b/server/lib/activitypub/process/process-update.ts
@@ -1,4 +1,4 @@
1import { ActivityUpdate, VideoTorrentObject } from '../../../../shared/models/activitypub' 1import { ActivityUpdate, CacheFileObject, VideoTorrentObject } from '../../../../shared/models/activitypub'
2import { ActivityPubActor } from '../../../../shared/models/activitypub/activitypub-actor' 2import { ActivityPubActor } from '../../../../shared/models/activitypub/activitypub-actor'
3import { resetSequelizeInstance, retryTransactionWrapper } from '../../../helpers/database-utils' 3import { resetSequelizeInstance, retryTransactionWrapper } from '../../../helpers/database-utils'
4import { logger } from '../../../helpers/logger' 4import { logger } from '../../../helpers/logger'
@@ -7,8 +7,11 @@ import { AccountModel } from '../../../models/account/account'
7import { ActorModel } from '../../../models/activitypub/actor' 7import { ActorModel } from '../../../models/activitypub/actor'
8import { VideoChannelModel } from '../../../models/video/video-channel' 8import { VideoChannelModel } from '../../../models/video/video-channel'
9import { fetchAvatarIfExists, getOrCreateActorAndServerAndModel, updateActorAvatarInstance, updateActorInstance } from '../actor' 9import { fetchAvatarIfExists, getOrCreateActorAndServerAndModel, updateActorAvatarInstance, updateActorInstance } from '../actor'
10import { getOrCreateVideoAndAccountAndChannel, getOrCreateVideoChannelFromVideoObject, updateVideoFromAP } from '../videos' 10import { getOrCreateVideoAndAccountAndChannel, updateVideoFromAP, getOrCreateVideoChannelFromVideoObject } from '../videos'
11import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-validators/activitypub/videos' 11import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-validators/activitypub/videos'
12import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file'
13import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy'
14import { createCacheFile, updateCacheFile } from '../cache-file'
12 15
13async function processUpdateActivity (activity: ActivityUpdate) { 16async function processUpdateActivity (activity: ActivityUpdate) {
14 const actor = await getOrCreateActorAndServerAndModel(activity.actor) 17 const actor = await getOrCreateActorAndServerAndModel(activity.actor)
@@ -16,10 +19,16 @@ async function processUpdateActivity (activity: ActivityUpdate) {
16 19
17 if (objectType === 'Video') { 20 if (objectType === 'Video') {
18 return retryTransactionWrapper(processUpdateVideo, actor, activity) 21 return retryTransactionWrapper(processUpdateVideo, actor, activity)
19 } else if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') { 22 }
23
24 if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') {
20 return retryTransactionWrapper(processUpdateActor, actor, activity) 25 return retryTransactionWrapper(processUpdateActor, actor, activity)
21 } 26 }
22 27
28 if (objectType === 'CacheFile') {
29 return retryTransactionWrapper(processUpdateCacheFile, actor, activity)
30 }
31
23 return undefined 32 return undefined
24} 33}
25 34
@@ -42,7 +51,24 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate)
42 const { video } = await getOrCreateVideoAndAccountAndChannel(videoObject.id) 51 const { video } = await getOrCreateVideoAndAccountAndChannel(videoObject.id)
43 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) 52 const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject)
44 53
45 return updateVideoFromAP(video, videoObject, actor, channelActor, activity.to) 54 return updateVideoFromAP(video, videoObject, actor.Account, channelActor.VideoChannel, activity.to)
55}
56
57async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) {
58 const cacheFileObject = activity.object as CacheFileObject
59
60 if (!isCacheFileObjectValid(cacheFileObject) === false) {
61 logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject })
62 return undefined
63 }
64
65 const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id)
66 if (!redundancyModel) {
67 const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFileObject.id)
68 return createCacheFile(cacheFileObject, video, byActor)
69 }
70
71 return updateCacheFile(cacheFileObject, redundancyModel, byActor)
46} 72}
47 73
48async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { 74async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) {