From c48e82b5e0478434de30626d14594a97f2402e7c Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Tue, 11 Sep 2018 16:27:07 +0200 Subject: Basic video redundancy implementation --- server/lib/activitypub/actor.ts | 6 +- server/lib/activitypub/cache-file.ts | 47 ++++++ server/lib/activitypub/process/process-create.ts | 21 ++- server/lib/activitypub/process/process-undo.ts | 44 +++++- server/lib/activitypub/process/process-update.ts | 34 ++++- server/lib/activitypub/send/send-accept.ts | 8 +- server/lib/activitypub/send/send-announce.ts | 33 ++--- server/lib/activitypub/send/send-create.ts | 68 +++++---- server/lib/activitypub/send/send-delete.ts | 25 ++-- server/lib/activitypub/send/send-follow.ts | 6 +- server/lib/activitypub/send/send-like.ts | 10 +- server/lib/activitypub/send/send-undo.ts | 63 +++++--- server/lib/activitypub/send/send-update.ts | 38 +++-- server/lib/activitypub/send/utils.ts | 8 +- server/lib/activitypub/url.ts | 10 +- server/lib/activitypub/videos.ts | 32 ++-- server/lib/redundancy.ts | 18 +++ .../lib/schedulers/videos-redundancy-scheduler.ts | 161 +++++++++++++++++++++ 18 files changed, 496 insertions(+), 136 deletions(-) create mode 100644 server/lib/activitypub/cache-file.ts create mode 100644 server/lib/redundancy.ts create mode 100644 server/lib/schedulers/videos-redundancy-scheduler.ts (limited to 'server/lib') diff --git a/server/lib/activitypub/actor.ts b/server/lib/activitypub/actor.ts index 1657262d7..3464add03 100644 --- a/server/lib/activitypub/actor.ts +++ b/server/lib/activitypub/actor.ts @@ -400,17 +400,15 @@ async function refreshActorIfNeeded (actor: ActorModel): Promise<{ actor: ActorM await actor.save({ transaction: t }) if (actor.Account) { - await actor.save({ transaction: t }) - actor.Account.set('name', result.name) actor.Account.set('description', result.summary) + await actor.Account.save({ transaction: t }) } else if (actor.VideoChannel) { - await actor.save({ transaction: t }) - actor.VideoChannel.set('name', result.name) actor.VideoChannel.set('description', result.summary) actor.VideoChannel.set('support', result.support) + await actor.VideoChannel.save({ transaction: t }) } diff --git a/server/lib/activitypub/cache-file.ts b/server/lib/activitypub/cache-file.ts new file mode 100644 index 000000000..7325ddcb6 --- /dev/null +++ b/server/lib/activitypub/cache-file.ts @@ -0,0 +1,47 @@ +import { CacheFileObject } from '../../../shared/index' +import { VideoModel } from '../../models/video/video' +import { ActorModel } from '../../models/activitypub/actor' +import { sequelizeTypescript } from '../../initializers' +import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' + +function cacheFileActivityObjectToDBAttributes (cacheFileObject: CacheFileObject, video: VideoModel, byActor: ActorModel) { + const url = cacheFileObject.url + + const videoFile = video.VideoFiles.find(f => { + return f.resolution === url.height && f.fps === url.fps + }) + + if (!videoFile) throw new Error(`Cannot find video file ${url.height} ${url.fps} of video ${video.url}`) + + return { + expiresOn: new Date(cacheFileObject.expires), + url: cacheFileObject.id, + fileUrl: cacheFileObject.url.href, + strategy: null, + videoFileId: videoFile.id, + actorId: byActor.id + } +} + +function createCacheFile (cacheFileObject: CacheFileObject, video: VideoModel, byActor: ActorModel) { + return sequelizeTypescript.transaction(async t => { + const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, video, byActor) + + return VideoRedundancyModel.create(attributes, { transaction: t }) + }) +} + +function updateCacheFile (cacheFileObject: CacheFileObject, redundancyModel: VideoRedundancyModel, byActor: ActorModel) { + const attributes = cacheFileActivityObjectToDBAttributes(cacheFileObject, redundancyModel.VideoFile.Video, byActor) + + redundancyModel.set('expires', attributes.expiresOn) + redundancyModel.set('fileUrl', attributes.fileUrl) + + return redundancyModel.save() +} + +export { + createCacheFile, + updateCacheFile, + cacheFileActivityObjectToDBAttributes +} diff --git a/server/lib/activitypub/process/process-create.ts b/server/lib/activitypub/process/process-create.ts index 16f426e23..32e555acf 100644 --- a/server/lib/activitypub/process/process-create.ts +++ b/server/lib/activitypub/process/process-create.ts @@ -1,4 +1,4 @@ -import { ActivityCreate, VideoAbuseState, VideoTorrentObject } from '../../../../shared' +import { ActivityCreate, CacheFileObject, VideoAbuseState, VideoTorrentObject } from '../../../../shared' import { DislikeObject, VideoAbuseObject, ViewObject } from '../../../../shared/models/activitypub/objects' import { VideoCommentObject } from '../../../../shared/models/activitypub/objects/video-comment-object' import { retryTransactionWrapper } from '../../../helpers/database-utils' @@ -12,6 +12,7 @@ import { addVideoComment, resolveThread } from '../video-comments' import { getOrCreateVideoAndAccountAndChannel } from '../videos' import { forwardActivity, forwardVideoRelatedActivity } from '../send/utils' import { Redis } from '../../redis' +import { createCacheFile } from '../cache-file' async function processCreateActivity (activity: ActivityCreate) { const activityObject = activity.object @@ -28,6 +29,8 @@ async function processCreateActivity (activity: ActivityCreate) { return retryTransactionWrapper(processCreateVideoAbuse, actor, activityObject as VideoAbuseObject) } else if (activityType === 'Note') { return retryTransactionWrapper(processCreateVideoComment, actor, activity) + } else if (activityType === 'CacheFile') { + return retryTransactionWrapper(processCacheFile, actor, activity) } logger.warn('Unknown activity object type %s when creating activity.', activityType, { activity: activity.id }) @@ -97,6 +100,20 @@ async function processCreateView (byActor: ActorModel, activity: ActivityCreate) } } +async function processCacheFile (byActor: ActorModel, activity: ActivityCreate) { + const cacheFile = activity.object as CacheFileObject + + const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFile.object) + + await createCacheFile(cacheFile, video, byActor) + + if (video.isOwned()) { + // Don't resend the activity to the sender + const exceptions = [ byActor ] + await forwardActivity(activity, undefined, exceptions) + } +} + async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateData: VideoAbuseObject) { logger.debug('Reporting remote abuse for video %s.', videoAbuseToCreateData.object) @@ -113,7 +130,7 @@ async function processCreateVideoAbuse (actor: ActorModel, videoAbuseToCreateDat state: VideoAbuseState.PENDING } - await VideoAbuseModel.create(videoAbuseData) + await VideoAbuseModel.create(videoAbuseData, { transaction: t }) logger.info('Remote abuse for video uuid %s created', videoAbuseToCreateData.object) }) diff --git a/server/lib/activitypub/process/process-undo.ts b/server/lib/activitypub/process/process-undo.ts index 1c1de8827..0eb5fa392 100644 --- a/server/lib/activitypub/process/process-undo.ts +++ b/server/lib/activitypub/process/process-undo.ts @@ -1,4 +1,4 @@ -import { ActivityAnnounce, ActivityFollow, ActivityLike, ActivityUndo } from '../../../../shared/models/activitypub' +import { ActivityAnnounce, ActivityFollow, ActivityLike, ActivityUndo, CacheFileObject } from '../../../../shared/models/activitypub' import { DislikeObject } from '../../../../shared/models/activitypub/objects' import { getActorUrl } from '../../../helpers/activitypub' import { retryTransactionWrapper } from '../../../helpers/database-utils' @@ -11,6 +11,7 @@ import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { forwardVideoRelatedActivity } from '../send/utils' import { getOrCreateVideoAndAccountAndChannel } from '../videos' import { VideoShareModel } from '../../../models/video/video-share' +import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' async function processUndoActivity (activity: ActivityUndo) { const activityToUndo = activity.object @@ -19,11 +20,21 @@ async function processUndoActivity (activity: ActivityUndo) { if (activityToUndo.type === 'Like') { return retryTransactionWrapper(processUndoLike, actorUrl, activity) - } else if (activityToUndo.type === 'Create' && activityToUndo.object.type === 'Dislike') { - return retryTransactionWrapper(processUndoDislike, actorUrl, activity) - } else if (activityToUndo.type === 'Follow') { + } + + if (activityToUndo.type === 'Create') { + if (activityToUndo.object.type === 'Dislike') { + return retryTransactionWrapper(processUndoDislike, actorUrl, activity) + } else if (activityToUndo.object.type === 'CacheFile') { + return retryTransactionWrapper(processUndoCacheFile, actorUrl, activity) + } + } + + if (activityToUndo.type === 'Follow') { return retryTransactionWrapper(processUndoFollow, actorUrl, activityToUndo) - } else if (activityToUndo.type === 'Announce') { + } + + if (activityToUndo.type === 'Announce') { return retryTransactionWrapper(processUndoAnnounce, actorUrl, activityToUndo) } @@ -88,6 +99,29 @@ async function processUndoDislike (actorUrl: string, activity: ActivityUndo) { }) } +async function processUndoCacheFile (actorUrl: string, activity: ActivityUndo) { + const cacheFileObject = activity.object.object as CacheFileObject + + const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFileObject.object) + + return sequelizeTypescript.transaction(async t => { + const byActor = await ActorModel.loadByUrl(actorUrl) + if (!byActor) throw new Error('Unknown actor ' + actorUrl) + + const cacheFile = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) + if (!cacheFile) throw new Error('Unknown video cache ' + cacheFile.url) + + await cacheFile.destroy() + + if (video.isOwned()) { + // Don't resend the activity to the sender + const exceptions = [ byActor ] + + await forwardVideoRelatedActivity(activity, t, exceptions, video) + } + }) +} + function processUndoFollow (actorUrl: string, followActivity: ActivityFollow) { return sequelizeTypescript.transaction(async t => { const follower = await ActorModel.loadByUrl(actorUrl, t) diff --git a/server/lib/activitypub/process/process-update.ts b/server/lib/activitypub/process/process-update.ts index d2ad738a2..d3af1a181 100644 --- a/server/lib/activitypub/process/process-update.ts +++ b/server/lib/activitypub/process/process-update.ts @@ -1,4 +1,4 @@ -import { ActivityUpdate, VideoTorrentObject } from '../../../../shared/models/activitypub' +import { ActivityUpdate, CacheFileObject, VideoTorrentObject } from '../../../../shared/models/activitypub' import { ActivityPubActor } from '../../../../shared/models/activitypub/activitypub-actor' import { resetSequelizeInstance, retryTransactionWrapper } from '../../../helpers/database-utils' import { logger } from '../../../helpers/logger' @@ -7,8 +7,11 @@ import { AccountModel } from '../../../models/account/account' import { ActorModel } from '../../../models/activitypub/actor' import { VideoChannelModel } from '../../../models/video/video-channel' import { fetchAvatarIfExists, getOrCreateActorAndServerAndModel, updateActorAvatarInstance, updateActorInstance } from '../actor' -import { getOrCreateVideoAndAccountAndChannel, getOrCreateVideoChannelFromVideoObject, updateVideoFromAP } from '../videos' +import { getOrCreateVideoAndAccountAndChannel, updateVideoFromAP, getOrCreateVideoChannelFromVideoObject } from '../videos' import { sanitizeAndCheckVideoTorrentObject } from '../../../helpers/custom-validators/activitypub/videos' +import { isCacheFileObjectValid } from '../../../helpers/custom-validators/activitypub/cache-file' +import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' +import { createCacheFile, updateCacheFile } from '../cache-file' async function processUpdateActivity (activity: ActivityUpdate) { const actor = await getOrCreateActorAndServerAndModel(activity.actor) @@ -16,10 +19,16 @@ async function processUpdateActivity (activity: ActivityUpdate) { if (objectType === 'Video') { return retryTransactionWrapper(processUpdateVideo, actor, activity) - } else if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') { + } + + if (objectType === 'Person' || objectType === 'Application' || objectType === 'Group') { return retryTransactionWrapper(processUpdateActor, actor, activity) } + if (objectType === 'CacheFile') { + return retryTransactionWrapper(processUpdateCacheFile, actor, activity) + } + return undefined } @@ -42,7 +51,24 @@ async function processUpdateVideo (actor: ActorModel, activity: ActivityUpdate) const { video } = await getOrCreateVideoAndAccountAndChannel(videoObject.id) const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) - return updateVideoFromAP(video, videoObject, actor, channelActor, activity.to) + return updateVideoFromAP(video, videoObject, actor.Account, channelActor.VideoChannel, activity.to) +} + +async function processUpdateCacheFile (byActor: ActorModel, activity: ActivityUpdate) { + const cacheFileObject = activity.object as CacheFileObject + + if (!isCacheFileObjectValid(cacheFileObject) === false) { + logger.debug('Cahe file object sent by update is not valid.', { cacheFileObject }) + return undefined + } + + const redundancyModel = await VideoRedundancyModel.loadByUrl(cacheFileObject.id) + if (!redundancyModel) { + const { video } = await getOrCreateVideoAndAccountAndChannel(cacheFileObject.id) + return createCacheFile(cacheFileObject, video, byActor) + } + + return updateCacheFile(cacheFileObject, redundancyModel, byActor) } async function processUpdateActor (actor: ActorModel, activity: ActivityUpdate) { diff --git a/server/lib/activitypub/send/send-accept.ts b/server/lib/activitypub/send/send-accept.ts index ef679707b..b6abde13d 100644 --- a/server/lib/activitypub/send/send-accept.ts +++ b/server/lib/activitypub/send/send-accept.ts @@ -3,7 +3,7 @@ import { ActorModel } from '../../../models/activitypub/actor' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { getActorFollowAcceptActivityPubUrl, getActorFollowActivityPubUrl } from '../url' import { unicastTo } from './utils' -import { followActivityData } from './send-follow' +import { buildFollowActivity } from './send-follow' import { logger } from '../../../helpers/logger' async function sendAccept (actorFollow: ActorFollowModel) { @@ -18,10 +18,10 @@ async function sendAccept (actorFollow: ActorFollowModel) { logger.info('Creating job to accept follower %s.', follower.url) const followUrl = getActorFollowActivityPubUrl(actorFollow) - const followData = followActivityData(followUrl, follower, me) + const followData = buildFollowActivity(followUrl, follower, me) const url = getActorFollowAcceptActivityPubUrl(actorFollow) - const data = acceptActivityData(url, me, followData) + const data = buildAcceptActivity(url, me, followData) return unicastTo(data, me, follower.inboxUrl) } @@ -34,7 +34,7 @@ export { // --------------------------------------------------------------------------- -function acceptActivityData (url: string, byActor: ActorModel, followActivityData: ActivityFollow): ActivityAccept { +function buildAcceptActivity (url: string, byActor: ActorModel, followActivityData: ActivityFollow): ActivityAccept { return { type: 'Accept', id: url, diff --git a/server/lib/activitypub/send/send-announce.ts b/server/lib/activitypub/send/send-announce.ts index 352813d73..f137217f8 100644 --- a/server/lib/activitypub/send/send-announce.ts +++ b/server/lib/activitypub/send/send-announce.ts @@ -4,45 +4,44 @@ import { ActorModel } from '../../../models/activitypub/actor' import { VideoModel } from '../../../models/video/video' import { VideoShareModel } from '../../../models/video/video-share' import { broadcastToFollowers } from './utils' -import { getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience } from '../audience' +import { audiencify, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience } from '../audience' import { logger } from '../../../helpers/logger' -async function buildVideoAnnounce (byActor: ActorModel, videoShare: VideoShareModel, video: VideoModel, t: Transaction) { +async function buildAnnounceWithVideoAudience (byActor: ActorModel, videoShare: VideoShareModel, video: VideoModel, t: Transaction) { const announcedObject = video.url - const accountsToForwardView = await getActorsInvolvedInVideo(video, t) - const audience = getObjectFollowersAudience(accountsToForwardView) - return announceActivityData(videoShare.url, byActor, announcedObject, audience) + const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) + const audience = getObjectFollowersAudience(actorsInvolvedInVideo) + + const activity = buildAnnounceActivity(videoShare.url, byActor, announcedObject, audience) + + return { activity, actorsInvolvedInVideo } } async function sendVideoAnnounce (byActor: ActorModel, videoShare: VideoShareModel, video: VideoModel, t: Transaction) { - const data = await buildVideoAnnounce(byActor, videoShare, video, t) + const { activity, actorsInvolvedInVideo } = await buildAnnounceWithVideoAudience(byActor, videoShare, video, t) logger.info('Creating job to send announce %s.', videoShare.url) - const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) const followersException = [ byActor ] - - return broadcastToFollowers(data, byActor, actorsInvolvedInVideo, t, followersException) + return broadcastToFollowers(activity, byActor, actorsInvolvedInVideo, t, followersException) } -function announceActivityData (url: string, byActor: ActorModel, object: string, audience?: ActivityAudience): ActivityAnnounce { +function buildAnnounceActivity (url: string, byActor: ActorModel, object: string, audience?: ActivityAudience): ActivityAnnounce { if (!audience) audience = getAudience(byActor) - return { - type: 'Announce', - to: audience.to, - cc: audience.cc, + return audiencify({ + type: 'Announce' as 'Announce', id: url, actor: byActor.url, object - } + }, audience) } // --------------------------------------------------------------------------- export { sendVideoAnnounce, - announceActivityData, - buildVideoAnnounce + buildAnnounceActivity, + buildAnnounceWithVideoAudience } diff --git a/server/lib/activitypub/send/send-create.ts b/server/lib/activitypub/send/send-create.ts index fc76cdd8a..6f89b1a22 100644 --- a/server/lib/activitypub/send/send-create.ts +++ b/server/lib/activitypub/send/send-create.ts @@ -17,6 +17,7 @@ import { getVideoCommentAudience } from '../audience' import { logger } from '../../../helpers/logger' +import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' async function sendCreateVideo (video: VideoModel, t: Transaction) { if (video.privacy === VideoPrivacy.PRIVATE) return undefined @@ -27,12 +28,12 @@ async function sendCreateVideo (video: VideoModel, t: Transaction) { const videoObject = video.toActivityPubObject() const audience = getAudience(byActor, video.privacy === VideoPrivacy.PUBLIC) - const data = createActivityData(video.url, byActor, videoObject, audience) + const createActivity = buildCreateActivity(video.url, byActor, videoObject, audience) - return broadcastToFollowers(data, byActor, [ byActor ], t) + return broadcastToFollowers(createActivity, byActor, [ byActor ], t) } -async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, video: VideoModel, t: Transaction) { +async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, video: VideoModel) { if (!video.VideoChannel.Account.Actor.serverId) return // Local const url = getVideoAbuseActivityPubUrl(videoAbuse) @@ -40,9 +41,23 @@ async function sendVideoAbuse (byActor: ActorModel, videoAbuse: VideoAbuseModel, logger.info('Creating job to send video abuse %s.', url) const audience = { to: [ video.VideoChannel.Account.Actor.url ], cc: [] } - const data = createActivityData(url, byActor, videoAbuse.toActivityPubObject(), audience) + const createActivity = buildCreateActivity(url, byActor, videoAbuse.toActivityPubObject(), audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(createActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) +} + +async function sendCreateCacheFile (byActor: ActorModel, fileRedundancy: VideoRedundancyModel) { + logger.info('Creating job to send file cache of %s.', fileRedundancy.url) + + const redundancyObject = fileRedundancy.toActivityPubObject() + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(fileRedundancy.VideoFile.Video.id) + const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, undefined) + + const audience = getVideoAudience(video, actorsInvolvedInVideo) + const createActivity = buildCreateActivity(fileRedundancy.url, byActor, redundancyObject, audience) + + return unicastTo(createActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateVideoComment (comment: VideoCommentModel, t: Transaction) { @@ -66,73 +81,73 @@ async function sendCreateVideoComment (comment: VideoCommentModel, t: Transactio audience = getObjectFollowersAudience(actorsInvolvedInComment.concat(parentsCommentActors)) } - const data = createActivityData(comment.url, byActor, commentObject, audience) + const createActivity = buildCreateActivity(comment.url, byActor, commentObject, audience) // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, parentsCommentActors, actorsException) + await broadcastToActors(createActivity, byActor, parentsCommentActors, actorsException) // Broadcast to our followers - await broadcastToFollowers(data, byActor, [ byActor ], t) + await broadcastToFollowers(createActivity, byActor, [ byActor ], t) // Send to actors involved in the comment - if (isOrigin) return broadcastToFollowers(data, byActor, actorsInvolvedInComment, t, actorsException) + if (isOrigin) return broadcastToFollowers(createActivity, byActor, actorsInvolvedInComment, t, actorsException) // Send to origin - return unicastTo(data, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(createActivity, byActor, comment.Video.VideoChannel.Account.Actor.sharedInboxUrl) } async function sendCreateView (byActor: ActorModel, video: VideoModel, t: Transaction) { logger.info('Creating job to send view of %s.', video.url) const url = getVideoViewActivityPubUrl(byActor, video) - const viewActivityData = createViewActivityData(byActor, video) + const viewActivity = buildViewActivity(byActor, video) const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) // Send to origin if (video.isOwned() === false) { const audience = getVideoAudience(video, actorsInvolvedInVideo) - const data = createActivityData(url, byActor, viewActivityData, audience) + const createActivity = buildCreateActivity(url, byActor, viewActivity, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(createActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } // Send to followers const audience = getObjectFollowersAudience(actorsInvolvedInVideo) - const data = createActivityData(url, byActor, viewActivityData, audience) + const createActivity = buildCreateActivity(url, byActor, viewActivity, audience) // Use the server actor to send the view const serverActor = await getServerActor() const actorsException = [ byActor ] - return broadcastToFollowers(data, serverActor, actorsInvolvedInVideo, t, actorsException) + return broadcastToFollowers(createActivity, serverActor, actorsInvolvedInVideo, t, actorsException) } async function sendCreateDislike (byActor: ActorModel, video: VideoModel, t: Transaction) { logger.info('Creating job to dislike %s.', video.url) const url = getVideoDislikeActivityPubUrl(byActor, video) - const dislikeActivityData = createDislikeActivityData(byActor, video) + const dislikeActivity = buildDislikeActivity(byActor, video) const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) // Send to origin if (video.isOwned() === false) { const audience = getVideoAudience(video, actorsInvolvedInVideo) - const data = createActivityData(url, byActor, dislikeActivityData, audience) + const createActivity = buildCreateActivity(url, byActor, dislikeActivity, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(createActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } // Send to followers const audience = getObjectFollowersAudience(actorsInvolvedInVideo) - const data = createActivityData(url, byActor, dislikeActivityData, audience) + const createActivity = buildCreateActivity(url, byActor, dislikeActivity, audience) const actorsException = [ byActor ] - return broadcastToFollowers(data, byActor, actorsInvolvedInVideo, t, actorsException) + return broadcastToFollowers(createActivity, byActor, actorsInvolvedInVideo, t, actorsException) } -function createActivityData (url: string, byActor: ActorModel, object: any, audience?: ActivityAudience): ActivityCreate { +function buildCreateActivity (url: string, byActor: ActorModel, object: any, audience?: ActivityAudience): ActivityCreate { if (!audience) audience = getAudience(byActor) return audiencify( @@ -146,7 +161,7 @@ function createActivityData (url: string, byActor: ActorModel, object: any, audi ) } -function createDislikeActivityData (byActor: ActorModel, video: VideoModel) { +function buildDislikeActivity (byActor: ActorModel, video: VideoModel) { return { type: 'Dislike', actor: byActor.url, @@ -154,7 +169,7 @@ function createDislikeActivityData (byActor: ActorModel, video: VideoModel) { } } -function createViewActivityData (byActor: ActorModel, video: VideoModel) { +function buildViewActivity (byActor: ActorModel, video: VideoModel) { return { type: 'View', actor: byActor.url, @@ -167,9 +182,10 @@ function createViewActivityData (byActor: ActorModel, video: VideoModel) { export { sendCreateVideo, sendVideoAbuse, - createActivityData, + buildCreateActivity, sendCreateView, sendCreateDislike, - createDislikeActivityData, - sendCreateVideoComment + buildDislikeActivity, + sendCreateVideoComment, + sendCreateCacheFile } diff --git a/server/lib/activitypub/send/send-delete.ts b/server/lib/activitypub/send/send-delete.ts index 3d1dfb699..479182543 100644 --- a/server/lib/activitypub/send/send-delete.ts +++ b/server/lib/activitypub/send/send-delete.ts @@ -15,24 +15,23 @@ async function sendDeleteVideo (video: VideoModel, t: Transaction) { const url = getDeleteActivityPubUrl(video.url) const byActor = video.VideoChannel.Account.Actor - const data = deleteActivityData(url, video.url, byActor) + const activity = buildDeleteActivity(url, video.url, byActor) - const actorsInvolved = await VideoShareModel.loadActorsByShare(video.id, t) - actorsInvolved.push(byActor) + const actorsInvolved = await getActorsInvolvedInVideo(video, t) - return broadcastToFollowers(data, byActor, actorsInvolved, t) + return broadcastToFollowers(activity, byActor, actorsInvolved, t) } async function sendDeleteActor (byActor: ActorModel, t: Transaction) { logger.info('Creating job to broadcast delete of actor %s.', byActor.url) const url = getDeleteActivityPubUrl(byActor.url) - const data = deleteActivityData(url, byActor.url, byActor) + const activity = buildDeleteActivity(url, byActor.url, byActor) const actorsInvolved = await VideoShareModel.loadActorsByVideoOwner(byActor.id, t) actorsInvolved.push(byActor) - return broadcastToFollowers(data, byActor, actorsInvolved, t) + return broadcastToFollowers(activity, byActor, actorsInvolved, t) } async function sendDeleteVideoComment (videoComment: VideoCommentModel, t: Transaction) { @@ -45,23 +44,23 @@ async function sendDeleteVideoComment (videoComment: VideoCommentModel, t: Trans const threadParentComments = await VideoCommentModel.listThreadParentComments(videoComment, t) const actorsInvolvedInComment = await getActorsInvolvedInVideo(videoComment.Video, t) - actorsInvolvedInComment.push(byActor) + actorsInvolvedInComment.push(byActor) // Add the actor that commented the video const audience = getVideoCommentAudience(videoComment, threadParentComments, actorsInvolvedInComment, isVideoOrigin) - const data = deleteActivityData(url, videoComment.url, byActor, audience) + const activity = buildDeleteActivity(url, videoComment.url, byActor, audience) // This was a reply, send it to the parent actors const actorsException = [ byActor ] - await broadcastToActors(data, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) + await broadcastToActors(activity, byActor, threadParentComments.map(c => c.Account.Actor), actorsException) // Broadcast to our followers - await broadcastToFollowers(data, byActor, [ byActor ], t) + await broadcastToFollowers(activity, byActor, [ byActor ], t) // Send to actors involved in the comment - if (isVideoOrigin) return broadcastToFollowers(data, byActor, actorsInvolvedInComment, t, actorsException) + if (isVideoOrigin) return broadcastToFollowers(activity, byActor, actorsInvolvedInComment, t, actorsException) // Send to origin - return unicastTo(data, byActor, videoComment.Video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(activity, byActor, videoComment.Video.VideoChannel.Account.Actor.sharedInboxUrl) } // --------------------------------------------------------------------------- @@ -74,7 +73,7 @@ export { // --------------------------------------------------------------------------- -function deleteActivityData (url: string, object: string, byActor: ActorModel, audience?: ActivityAudience): ActivityDelete { +function buildDeleteActivity (url: string, object: string, byActor: ActorModel, audience?: ActivityAudience): ActivityDelete { const activity = { type: 'Delete' as 'Delete', id: url, diff --git a/server/lib/activitypub/send/send-follow.ts b/server/lib/activitypub/send/send-follow.ts index 46d08c17b..170b46b48 100644 --- a/server/lib/activitypub/send/send-follow.ts +++ b/server/lib/activitypub/send/send-follow.ts @@ -15,12 +15,12 @@ function sendFollow (actorFollow: ActorFollowModel) { logger.info('Creating job to send follow request to %s.', following.url) const url = getActorFollowActivityPubUrl(actorFollow) - const data = followActivityData(url, me, following) + const data = buildFollowActivity(url, me, following) return unicastTo(data, me, following.inboxUrl) } -function followActivityData (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { +function buildFollowActivity (url: string, byActor: ActorModel, targetActor: ActorModel): ActivityFollow { return { type: 'Follow', id: url, @@ -33,5 +33,5 @@ function followActivityData (url: string, byActor: ActorModel, targetActor: Acto export { sendFollow, - followActivityData + buildFollowActivity } diff --git a/server/lib/activitypub/send/send-like.ts b/server/lib/activitypub/send/send-like.ts index 83225f5df..a5408ac6a 100644 --- a/server/lib/activitypub/send/send-like.ts +++ b/server/lib/activitypub/send/send-like.ts @@ -17,20 +17,20 @@ async function sendLike (byActor: ActorModel, video: VideoModel, t: Transaction) // Send to origin if (video.isOwned() === false) { const audience = getVideoAudience(video, accountsInvolvedInVideo) - const data = likeActivityData(url, byActor, video, audience) + const data = buildLikeActivity(url, byActor, video, audience) return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } // Send to followers const audience = getObjectFollowersAudience(accountsInvolvedInVideo) - const data = likeActivityData(url, byActor, video, audience) + const activity = buildLikeActivity(url, byActor, video, audience) const followersException = [ byActor ] - return broadcastToFollowers(data, byActor, accountsInvolvedInVideo, t, followersException) + return broadcastToFollowers(activity, byActor, accountsInvolvedInVideo, t, followersException) } -function likeActivityData (url: string, byActor: ActorModel, video: VideoModel, audience?: ActivityAudience): ActivityLike { +function buildLikeActivity (url: string, byActor: ActorModel, video: VideoModel, audience?: ActivityAudience): ActivityLike { if (!audience) audience = getAudience(byActor) return audiencify( @@ -48,5 +48,5 @@ function likeActivityData (url: string, byActor: ActorModel, video: VideoModel, export { sendLike, - likeActivityData + buildLikeActivity } diff --git a/server/lib/activitypub/send/send-undo.ts b/server/lib/activitypub/send/send-undo.ts index 30d0fd98b..a50673c79 100644 --- a/server/lib/activitypub/send/send-undo.ts +++ b/server/lib/activitypub/send/send-undo.ts @@ -13,12 +13,13 @@ import { VideoModel } from '../../../models/video/video' import { getActorFollowActivityPubUrl, getUndoActivityPubUrl, getVideoDislikeActivityPubUrl, getVideoLikeActivityPubUrl } from '../url' import { broadcastToFollowers, unicastTo } from './utils' import { audiencify, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience, getVideoAudience } from '../audience' -import { createActivityData, createDislikeActivityData } from './send-create' -import { followActivityData } from './send-follow' -import { likeActivityData } from './send-like' +import { buildCreateActivity, buildDislikeActivity } from './send-create' +import { buildFollowActivity } from './send-follow' +import { buildLikeActivity } from './send-like' import { VideoShareModel } from '../../../models/video/video-share' -import { buildVideoAnnounce } from './send-announce' +import { buildAnnounceWithVideoAudience } from './send-announce' import { logger } from '../../../helpers/logger' +import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { const me = actorFollow.ActorFollower @@ -32,10 +33,10 @@ async function sendUndoFollow (actorFollow: ActorFollowModel, t: Transaction) { const followUrl = getActorFollowActivityPubUrl(actorFollow) const undoUrl = getUndoActivityPubUrl(followUrl) - const object = followActivityData(followUrl, me, following) - const data = undoActivityData(undoUrl, me, object) + const followActivity = buildFollowActivity(followUrl, me, following) + const undoActivity = undoActivityData(undoUrl, me, followActivity) - return unicastTo(data, me, following.inboxUrl) + return unicastTo(undoActivity, me, following.inboxUrl) } async function sendUndoLike (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -45,21 +46,21 @@ async function sendUndoLike (byActor: ActorModel, video: VideoModel, t: Transact const undoUrl = getUndoActivityPubUrl(likeUrl) const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) - const object = likeActivityData(likeUrl, byActor, video) + const likeActivity = buildLikeActivity(likeUrl, byActor, video) // Send to origin if (video.isOwned() === false) { const audience = getVideoAudience(video, actorsInvolvedInVideo) - const data = undoActivityData(undoUrl, byActor, object, audience) + const undoActivity = undoActivityData(undoUrl, byActor, likeActivity, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(undoActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } const audience = getObjectFollowersAudience(actorsInvolvedInVideo) - const data = undoActivityData(undoUrl, byActor, object, audience) + const undoActivity = undoActivityData(undoUrl, byActor, likeActivity, audience) const followersException = [ byActor ] - return broadcastToFollowers(data, byActor, actorsInvolvedInVideo, t, followersException) + return broadcastToFollowers(undoActivity, byActor, actorsInvolvedInVideo, t, followersException) } async function sendUndoDislike (byActor: ActorModel, video: VideoModel, t: Transaction) { @@ -69,20 +70,20 @@ async function sendUndoDislike (byActor: ActorModel, video: VideoModel, t: Trans const undoUrl = getUndoActivityPubUrl(dislikeUrl) const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) - const dislikeActivity = createDislikeActivityData(byActor, video) - const object = createActivityData(dislikeUrl, byActor, dislikeActivity) + const dislikeActivity = buildDislikeActivity(byActor, video) + const createDislikeActivity = buildCreateActivity(dislikeUrl, byActor, dislikeActivity) if (video.isOwned() === false) { const audience = getVideoAudience(video, actorsInvolvedInVideo) - const data = undoActivityData(undoUrl, byActor, object, audience) + const undoActivity = undoActivityData(undoUrl, byActor, createDislikeActivity, audience) - return unicastTo(data, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) + return unicastTo(undoActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } - const data = undoActivityData(undoUrl, byActor, object) + const undoActivity = undoActivityData(undoUrl, byActor, createDislikeActivity) const followersException = [ byActor ] - return broadcastToFollowers(data, byActor, actorsInvolvedInVideo, t, followersException) + return broadcastToFollowers(undoActivity, byActor, actorsInvolvedInVideo, t, followersException) } async function sendUndoAnnounce (byActor: ActorModel, videoShare: VideoShareModel, video: VideoModel, t: Transaction) { @@ -90,12 +91,27 @@ async function sendUndoAnnounce (byActor: ActorModel, videoShare: VideoShareMode const undoUrl = getUndoActivityPubUrl(videoShare.url) - const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) - const object = await buildVideoAnnounce(byActor, videoShare, video, t) - const data = undoActivityData(undoUrl, byActor, object) + const { activity: announceActivity, actorsInvolvedInVideo } = await buildAnnounceWithVideoAudience(byActor, videoShare, video, t) + const undoActivity = undoActivityData(undoUrl, byActor, announceActivity) const followersException = [ byActor ] - return broadcastToFollowers(data, byActor, actorsInvolvedInVideo, t, followersException) + return broadcastToFollowers(undoActivity, byActor, actorsInvolvedInVideo, t, followersException) +} + +async function sendUndoCacheFile (byActor: ActorModel, redundancyModel: VideoRedundancyModel, t: Transaction) { + logger.info('Creating job to undo cache file %s.', redundancyModel.url) + + const undoUrl = getUndoActivityPubUrl(redundancyModel.url) + + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(redundancyModel.VideoFile.Video.id) + const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, t) + + const audience = getVideoAudience(video, actorsInvolvedInVideo) + const createActivity = buildCreateActivity(redundancyModel.url, byActor, redundancyModel.toActivityPubObject()) + + const undoActivity = undoActivityData(undoUrl, byActor, createActivity, audience) + + return unicastTo(undoActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } // --------------------------------------------------------------------------- @@ -104,7 +120,8 @@ export { sendUndoFollow, sendUndoLike, sendUndoDislike, - sendUndoAnnounce + sendUndoAnnounce, + sendUndoCacheFile } // --------------------------------------------------------------------------- diff --git a/server/lib/activitypub/send/send-update.ts b/server/lib/activitypub/send/send-update.ts index 6f1d80898..605473338 100644 --- a/server/lib/activitypub/send/send-update.ts +++ b/server/lib/activitypub/send/send-update.ts @@ -7,11 +7,11 @@ import { VideoModel } from '../../../models/video/video' import { VideoChannelModel } from '../../../models/video/video-channel' import { VideoShareModel } from '../../../models/video/video-share' import { getUpdateActivityPubUrl } from '../url' -import { broadcastToFollowers } from './utils' -import { audiencify, getAudience } from '../audience' +import { broadcastToFollowers, unicastTo } from './utils' +import { audiencify, getActorsInvolvedInVideo, getAudience, getObjectFollowersAudience } from '../audience' import { logger } from '../../../helpers/logger' -import { videoFeedsValidator } from '../../../middlewares/validators' import { VideoCaptionModel } from '../../../models/video/video-caption' +import { VideoRedundancyModel } from '../../../models/redundancy/video-redundancy' async function sendUpdateVideo (video: VideoModel, t: Transaction, overrodeByActor?: ActorModel) { logger.info('Creating job to update video %s.', video.url) @@ -26,12 +26,12 @@ async function sendUpdateVideo (video: VideoModel, t: Transaction, overrodeByAct const videoObject = video.toActivityPubObject() const audience = getAudience(byActor, video.privacy === VideoPrivacy.PUBLIC) - const data = updateActivityData(url, byActor, videoObject, audience) + const updateActivity = buildUpdateActivity(url, byActor, videoObject, audience) - const actorsInvolved = await VideoShareModel.loadActorsByShare(video.id, t) - actorsInvolved.push(byActor) + const actorsInvolved = await getActorsInvolvedInVideo(video, t) + if (overrodeByActor) actorsInvolved.push(overrodeByActor) - return broadcastToFollowers(data, byActor, actorsInvolved, t) + return broadcastToFollowers(updateActivity, byActor, actorsInvolved, t) } async function sendUpdateActor (accountOrChannel: AccountModel | VideoChannelModel, t: Transaction) { @@ -42,7 +42,7 @@ async function sendUpdateActor (accountOrChannel: AccountModel | VideoChannelMod const url = getUpdateActivityPubUrl(byActor.url, byActor.updatedAt.toISOString()) const accountOrChannelObject = accountOrChannel.toActivityPubObject() const audience = getAudience(byActor) - const data = updateActivityData(url, byActor, accountOrChannelObject, audience) + const updateActivity = buildUpdateActivity(url, byActor, accountOrChannelObject, audience) let actorsInvolved: ActorModel[] if (accountOrChannel instanceof AccountModel) { @@ -55,19 +55,35 @@ async function sendUpdateActor (accountOrChannel: AccountModel | VideoChannelMod actorsInvolved.push(byActor) - return broadcastToFollowers(data, byActor, actorsInvolved, t) + return broadcastToFollowers(updateActivity, byActor, actorsInvolved, t) +} + +async function sendUpdateCacheFile (byActor: ActorModel, redundancyModel: VideoRedundancyModel) { + logger.info('Creating job to update cache file %s.', redundancyModel.url) + + const url = getUpdateActivityPubUrl(redundancyModel.url, redundancyModel.updatedAt.toISOString()) + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(redundancyModel.VideoFile.Video.id) + + const redundancyObject = redundancyModel.toActivityPubObject() + + const accountsInvolvedInVideo = await getActorsInvolvedInVideo(video, undefined) + const audience = getObjectFollowersAudience(accountsInvolvedInVideo) + + const updateActivity = buildUpdateActivity(url, byActor, redundancyObject, audience) + return unicastTo(updateActivity, byActor, video.VideoChannel.Account.Actor.sharedInboxUrl) } // --------------------------------------------------------------------------- export { sendUpdateActor, - sendUpdateVideo + sendUpdateVideo, + sendUpdateCacheFile } // --------------------------------------------------------------------------- -function updateActivityData (url: string, byActor: ActorModel, object: any, audience?: ActivityAudience): ActivityUpdate { +function buildUpdateActivity (url: string, byActor: ActorModel, object: any, audience?: ActivityAudience): ActivityUpdate { if (!audience) audience = getAudience(byActor) return audiencify( diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index da437292e..c20c15633 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts @@ -59,11 +59,11 @@ async function forwardActivity ( async function broadcastToFollowers ( data: any, byActor: ActorModel, - toActorFollowers: ActorModel[], + toFollowersOf: ActorModel[], t: Transaction, actorsException: ActorModel[] = [] ) { - const uris = await computeFollowerUris(toActorFollowers, actorsException, t) + const uris = await computeFollowerUris(toFollowersOf, actorsException, t) return broadcastTo(uris, data, byActor) } @@ -115,8 +115,8 @@ export { // --------------------------------------------------------------------------- -async function computeFollowerUris (toActorFollower: ActorModel[], actorsException: ActorModel[], t: Transaction) { - const toActorFollowerIds = toActorFollower.map(a => a.id) +async function computeFollowerUris (toFollowersOf: ActorModel[], actorsException: ActorModel[], t: Transaction) { + const toActorFollowerIds = toFollowersOf.map(a => a.id) const result = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(toActorFollowerIds, t) const sharedInboxesException = await buildSharedInboxesException(actorsException) diff --git a/server/lib/activitypub/url.ts b/server/lib/activitypub/url.ts index 262463310..2e7c56955 100644 --- a/server/lib/activitypub/url.ts +++ b/server/lib/activitypub/url.ts @@ -4,11 +4,18 @@ import { ActorFollowModel } from '../../models/activitypub/actor-follow' import { VideoModel } from '../../models/video/video' import { VideoAbuseModel } from '../../models/video/video-abuse' import { VideoCommentModel } from '../../models/video/video-comment' +import { VideoFileModel } from '../../models/video/video-file' function getVideoActivityPubUrl (video: VideoModel) { return CONFIG.WEBSERVER.URL + '/videos/watch/' + video.uuid } +function getVideoCacheFileActivityPubUrl (videoFile: VideoFileModel) { + const suffixFPS = videoFile.fps ? '-' + videoFile.fps : '' + + return `${CONFIG.WEBSERVER.URL}/redundancy/videos/${videoFile.Video.uuid}/${videoFile.resolution}${suffixFPS}` +} + function getVideoCommentActivityPubUrl (video: VideoModel, videoComment: VideoCommentModel) { return CONFIG.WEBSERVER.URL + '/videos/watch/' + video.uuid + '/comments/' + videoComment.id } @@ -101,5 +108,6 @@ export { getVideoSharesActivityPubUrl, getVideoCommentsActivityPubUrl, getVideoLikesActivityPubUrl, - getVideoDislikesActivityPubUrl + getVideoDislikesActivityPubUrl, + getVideoCacheFileActivityPubUrl } diff --git a/server/lib/activitypub/videos.ts b/server/lib/activitypub/videos.ts index 6c2095897..783f78d3e 100644 --- a/server/lib/activitypub/videos.ts +++ b/server/lib/activitypub/videos.ts @@ -3,12 +3,12 @@ import * as sequelize from 'sequelize' import * as magnetUtil from 'magnet-uri' import { join } from 'path' import * as request from 'request' -import { ActivityIconObject, VideoState } from '../../../shared/index' +import { ActivityIconObject, ActivityVideoUrlObject, VideoState, ActivityUrlObject } from '../../../shared/index' import { VideoTorrentObject } from '../../../shared/models/activitypub/objects' import { VideoPrivacy } from '../../../shared/models/videos' import { sanitizeAndCheckVideoTorrentObject } from '../../helpers/custom-validators/activitypub/videos' import { isVideoFileInfoHashValid } from '../../helpers/custom-validators/videos' -import { resetSequelizeInstance, retryTransactionWrapper, updateInstanceWithAnother } from '../../helpers/database-utils' +import { resetSequelizeInstance, retryTransactionWrapper } from '../../helpers/database-utils' import { logger } from '../../helpers/logger' import { doRequest, doRequestAndSaveToFile } from '../../helpers/requests' import { ACTIVITY_PUB, CONFIG, REMOTE_SCHEME, sequelizeTypescript, VIDEO_MIMETYPE_EXT } from '../../initializers' @@ -17,7 +17,7 @@ import { TagModel } from '../../models/video/tag' import { VideoModel } from '../../models/video/video' import { VideoChannelModel } from '../../models/video/video-channel' import { VideoFileModel } from '../../models/video/video-file' -import { getOrCreateActorAndServerAndModel, updateActorAvatarInstance } from './actor' +import { getOrCreateActorAndServerAndModel } from './actor' import { addVideoComments } from './video-comments' import { crawlCollectionPage } from './crawl' import { sendCreateVideo, sendUpdateVideo } from './send' @@ -25,7 +25,6 @@ import { isArray } from '../../helpers/custom-validators/misc' import { VideoCaptionModel } from '../../models/video/video-caption' import { JobQueue } from '../job-queue' import { ActivitypubHttpFetcherPayload } from '../job-queue/handlers/activitypub-http-fetcher' -import { getUrlFromWebfinger } from '../../helpers/webfinger' import { createRates } from './video-rates' import { addVideoShares, shareVideoByServerAndChannel } from './share' import { AccountModel } from '../../models/account/account' @@ -137,10 +136,7 @@ async function videoActivityObjectToDBAttributes ( } function videoFileActivityUrlToDBAttributes (videoCreated: VideoModel, videoObject: VideoTorrentObject) { - const mimeTypes = Object.keys(VIDEO_MIMETYPE_EXT) - const fileUrls = videoObject.url.filter(u => { - return mimeTypes.indexOf(u.mimeType) !== -1 && u.mimeType.startsWith('video/') - }) + const fileUrls = videoObject.url.filter(u => isActivityVideoUrlObject(u)) as ActivityVideoUrlObject[] if (fileUrls.length === 0) { throw new Error('Cannot find video files for ' + videoCreated.url) @@ -331,8 +327,8 @@ async function refreshVideoIfNeeded (video: VideoModel): Promise { const channelActor = await getOrCreateVideoChannelFromVideoObject(videoObject) const account = await AccountModel.load(channelActor.VideoChannel.accountId) - return updateVideoFromAP(video, videoObject, account.Actor, channelActor) + return updateVideoFromAP(video, videoObject, account, channelActor.VideoChannel) } catch (err) { logger.warn('Cannot refresh video.', { err }) return video @@ -342,8 +338,8 @@ async function refreshVideoIfNeeded (video: VideoModel): Promise { async function updateVideoFromAP ( video: VideoModel, videoObject: VideoTorrentObject, - accountActor: ActorModel, - channelActor: ActorModel, + account: AccountModel, + channel: VideoChannelModel, overrideTo?: string[] ) { logger.debug('Updating remote video "%s".', videoObject.uuid) @@ -359,12 +355,12 @@ async function updateVideoFromAP ( // Check actor has the right to update the video const videoChannel = video.VideoChannel - if (videoChannel.Account.Actor.id !== accountActor.id) { - throw new Error('Account ' + accountActor.url + ' does not own video channel ' + videoChannel.Actor.url) + if (videoChannel.Account.id !== account.id) { + throw new Error('Account ' + account.Actor.url + ' does not own video channel ' + videoChannel.Actor.url) } const to = overrideTo ? overrideTo : videoObject.to - const videoData = await videoActivityObjectToDBAttributes(channelActor.VideoChannel, videoObject, to) + const videoData = await videoActivityObjectToDBAttributes(channel, videoObject, to) video.set('name', videoData.name) video.set('uuid', videoData.uuid) video.set('url', videoData.url) @@ -444,3 +440,11 @@ export { addVideoShares, createRates } + +// --------------------------------------------------------------------------- + +function isActivityVideoUrlObject (url: ActivityUrlObject): url is ActivityVideoUrlObject { + const mimeTypes = Object.keys(VIDEO_MIMETYPE_EXT) + + return mimeTypes.indexOf(url.mimeType) !== -1 && url.mimeType.startsWith('video/') +} diff --git a/server/lib/redundancy.ts b/server/lib/redundancy.ts new file mode 100644 index 000000000..78221cc3d --- /dev/null +++ b/server/lib/redundancy.ts @@ -0,0 +1,18 @@ +import { VideoRedundancyModel } from '../models/redundancy/video-redundancy' +import { sendUndoCacheFile } from './activitypub/send' +import { Transaction } from 'sequelize' +import { getServerActor } from '../helpers/utils' + +async function removeVideoRedundancy (videoRedundancy: VideoRedundancyModel, t?: Transaction) { + const serverActor = await getServerActor() + + await sendUndoCacheFile(serverActor, videoRedundancy, t) + + await videoRedundancy.destroy({ transaction: t }) +} + +// --------------------------------------------------------------------------- + +export { + removeVideoRedundancy +} diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts new file mode 100644 index 000000000..ee9ba1766 --- /dev/null +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts @@ -0,0 +1,161 @@ +import { AbstractScheduler } from './abstract-scheduler' +import { CONFIG, JOB_TTL, REDUNDANCY, SCHEDULER_INTERVALS_MS } from '../../initializers' +import { logger } from '../../helpers/logger' +import { VideoRedundancyStrategy } from '../../../shared/models/redundancy' +import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy' +import { VideoFileModel } from '../../models/video/video-file' +import { sortBy } from 'lodash' +import { downloadWebTorrentVideo } from '../../helpers/webtorrent' +import { join } from 'path' +import { rename } from 'fs-extra' +import { getServerActor } from '../../helpers/utils' +import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send' +import { VideoModel } from '../../models/video/video' +import { getVideoCacheFileActivityPubUrl } from '../activitypub/url' +import { removeVideoRedundancy } from '../redundancy' +import { isTestInstance } from '../../helpers/core-utils' + +export class VideosRedundancyScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + private executing = false + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.videosRedundancy + + private constructor () { + super() + } + + async execute () { + if (this.executing) return + + this.executing = true + + for (const obj of CONFIG.REDUNDANCY.VIDEOS) { + + try { + const videoToDuplicate = await this.findVideoToDuplicate(obj.strategy) + if (!videoToDuplicate) continue + + const videoFiles = videoToDuplicate.VideoFiles + videoFiles.forEach(f => f.Video = videoToDuplicate) + + const videosRedundancy = await VideoRedundancyModel.getVideoFiles(obj.strategy) + if (this.isTooHeavy(videosRedundancy, videoFiles, obj.size)) { + if (!isTestInstance()) logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url) + continue + } + + logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy) + + await this.createVideoRedundancy(obj.strategy, videoFiles) + } catch (err) { + logger.error('Cannot run videos redundancy %s.', obj.strategy, { err }) + } + } + + const expired = await VideoRedundancyModel.listAllExpired() + + for (const m of expired) { + logger.info('Removing expired video %s from our redundancy system.', this.buildEntryLogId(m)) + + try { + await m.destroy() + } catch (err) { + logger.error('Cannot remove %s video from our redundancy system.', this.buildEntryLogId(m)) + } + } + + this.executing = false + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + private findVideoToDuplicate (strategy: VideoRedundancyStrategy) { + if (strategy === 'most-views') return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR) + } + + private async createVideoRedundancy (strategy: VideoRedundancyStrategy, filesToDuplicate: VideoFileModel[]) { + const serverActor = await getServerActor() + + for (const file of filesToDuplicate) { + const existing = await VideoRedundancyModel.loadByFileId(file.id) + if (existing) { + logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', file.Video.url, file.resolution, strategy) + + existing.expiresOn = this.buildNewExpiration() + await existing.save() + + await sendUpdateCacheFile(serverActor, existing) + continue + } + + // We need more attributes and check if the video still exists + const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(file.Video.id) + if (!video) continue + + logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy) + + const { baseUrlHttp, baseUrlWs } = video.getBaseUrls() + const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs) + + const tmpPath = await downloadWebTorrentVideo({ magnetUri }, JOB_TTL['video-import']) + + const destPath = join(CONFIG.STORAGE.VIDEOS_DIR, video.getVideoFilename(file)) + await rename(tmpPath, destPath) + + const createdModel = await VideoRedundancyModel.create({ + expiresOn: new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS), + url: getVideoCacheFileActivityPubUrl(file), + fileUrl: video.getVideoFileUrl(file, CONFIG.WEBSERVER.URL), + strategy, + videoFileId: file.id, + actorId: serverActor.id + }) + createdModel.VideoFile = file + + await sendCreateCacheFile(serverActor, createdModel) + } + } + + // Unused, but could be useful in the future, with a custom strategy + private async purgeVideosIfNeeded (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSize: number) { + const sortedVideosRedundancy = sortBy(videosRedundancy, 'createdAt') + + while (this.isTooHeavy(sortedVideosRedundancy, filesToDuplicate, maxSize)) { + const toDelete = sortedVideosRedundancy.shift() + + const videoFile = toDelete.VideoFile + logger.info('Purging video %s (resolution %d) from our redundancy system.', videoFile.Video.url, videoFile.resolution) + + await removeVideoRedundancy(toDelete, undefined) + } + + return sortedVideosRedundancy + } + + private isTooHeavy (videosRedundancy: VideoRedundancyModel[], filesToDuplicate: VideoFileModel[], maxSizeArg: number) { + const maxSize = maxSizeArg - this.getTotalFileSizes(filesToDuplicate) + + const redundancyReducer = (previous: number, current: VideoRedundancyModel) => previous + current.VideoFile.size + const totalDuplicated = videosRedundancy.reduce(redundancyReducer, 0) + + return totalDuplicated > maxSize + } + + private buildNewExpiration () { + return new Date(Date.now() + REDUNDANCY.VIDEOS.EXPIRES_AFTER_MS) + } + + private buildEntryLogId (object: VideoRedundancyModel) { + return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}` + } + + private getTotalFileSizes (files: VideoFileModel[]) { + const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size + + return files.reduce(fileReducer, 0) + } +} -- cgit v1.2.3