]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Add ability to cleanup remote AP interactions
authorChocobozzz <me@florianbigard.com>
Fri, 26 Feb 2021 15:26:27 +0000 (16:26 +0100)
committerChocobozzz <me@florianbigard.com>
Wed, 3 Mar 2021 09:08:08 +0000 (10:08 +0100)
24 files changed:
client/src/app/+admin/system/jobs/jobs.component.ts
config/default.yaml
config/production.yaml.example
server/helpers/custom-validators/activitypub/activity.ts
server/helpers/custom-validators/activitypub/rate.ts
server/helpers/custom-validators/activitypub/share.ts [new file with mode: 0644]
server/initializers/checker-before-init.ts
server/initializers/config.ts
server/initializers/constants.ts
server/lib/activitypub/video-comments.ts
server/lib/job-queue/handlers/activitypub-cleaner.ts [new file with mode: 0644]
server/lib/job-queue/handlers/actor-keys.ts
server/lib/job-queue/job-queue.ts
server/middlewares/validators/videos/video-rates.ts
server/models/account/account-video-rate.ts
server/models/video/video-comment.ts
server/models/video/video-share.ts
server/models/video/video.ts
server/tests/api/activitypub/cleaner.ts [new file with mode: 0644]
server/tests/api/activitypub/index.ts
shared/extra-utils/miscs/sql.ts
shared/extra-utils/server/jobs.ts
shared/extra-utils/videos/videos.ts
shared/models/server/job.model.ts

index 9f5c044064e2888fb60a87f320d11d1fd55d5f0a..43578eedd434a4f9c92cd50754b7c21ec73e5a27 100644 (file)
@@ -28,6 +28,7 @@ export class JobsComponent extends RestTable implements OnInit {
     'activitypub-http-fetcher',
     'activitypub-http-unicast',
     'activitypub-refresher',
+    'activitypub-cleaner',
     'actor-keys',
     'email',
     'video-file-import',
index 2d8afe1c38a59dc2db17972ea1286783d7896bfb..a09d20b9d6d9e61a3188b2057e79db2659893877 100644 (file)
@@ -192,6 +192,12 @@ federation:
   videos:
     federate_unlisted: false
 
+    # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments)
+    # It removes objects that do not exist anymore, and potentially fix their URLs
+    # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted
+    # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes
+    cleanup_remote_interactions: false
+
 cache:
   previews:
     size: 500 # Max number of previews you want to cache
index 2794c543c066eb597669d117438b26917ec0d87b..31c0e6b9618f4262d4d29d912eb2740f05b57b89 100644 (file)
@@ -190,6 +190,12 @@ federation:
   videos:
     federate_unlisted: false
 
+    # Add a weekly job that cleans up remote AP interactions on local videos (shares, rates and comments)
+    # It removes objects that do not exist anymore, and potentially fix their URLs
+    # This setting is opt-in because due to an old bug in PeerTube, remote rates sent by instance before PeerTube 3.0 will be deleted
+    # We still suggest you to enable this setting even if your users will loose most of their video's likes/dislikes
+    cleanup_remote_interactions: false
+
 
 ###############################################################################
 #
index 8b8c0685fda5fe016b9cc81fcdb76c36b27e6230..da79b2782cf8d676e9e1c68be51c7ef2d6845c95 100644 (file)
@@ -1,15 +1,16 @@
 import validator from 'validator'
 import { Activity, ActivityType } from '../../../../shared/models/activitypub'
+import { exists } from '../misc'
 import { sanitizeAndCheckActorObject } from './actor'
+import { isCacheFileObjectValid } from './cache-file'
+import { isFlagActivityValid } from './flag'
 import { isActivityPubUrlValid, isBaseActivityValid, isObjectValid } from './misc'
-import { isDislikeActivityValid } from './rate'
+import { isPlaylistObjectValid } from './playlist'
+import { isDislikeActivityValid, isLikeActivityValid } from './rate'
+import { isShareActivityValid } from './share'
 import { sanitizeAndCheckVideoCommentObject } from './video-comments'
 import { sanitizeAndCheckVideoTorrentObject } from './videos'
 import { isViewActivityValid } from './view'
-import { exists } from '../misc'
-import { isCacheFileObjectValid } from './cache-file'
-import { isFlagActivityValid } from './flag'
-import { isPlaylistObjectValid } from './playlist'
 
 function isRootActivityValid (activity: any) {
   return isCollection(activity) || isActivity(activity)
@@ -70,8 +71,11 @@ function checkFlagActivity (activity: any) {
 }
 
 function checkDislikeActivity (activity: any) {
-  return isBaseActivityValid(activity, 'Dislike') &&
-    isDislikeActivityValid(activity)
+  return isDislikeActivityValid(activity)
+}
+
+function checkLikeActivity (activity: any) {
+  return isLikeActivityValid(activity)
 }
 
 function checkCreateActivity (activity: any) {
@@ -118,8 +122,7 @@ function checkRejectActivity (activity: any) {
 }
 
 function checkAnnounceActivity (activity: any) {
-  return isBaseActivityValid(activity, 'Announce') &&
-    isObjectValid(activity.object)
+  return isShareActivityValid(activity)
 }
 
 function checkUndoActivity (activity: any) {
@@ -132,8 +135,3 @@ function checkUndoActivity (activity: any) {
       checkCreateActivity(activity.object)
     )
 }
-
-function checkLikeActivity (activity: any) {
-  return isBaseActivityValid(activity, 'Like') &&
-    isObjectValid(activity.object)
-}
index ba68e8074f2e5ffd3987ff0397f5293f8315cca3..aafdda4436edc68986c5f6b1c7670f9343a712f0 100644 (file)
@@ -1,13 +1,18 @@
-import { isActivityPubUrlValid, isObjectValid } from './misc'
+import { isBaseActivityValid, isObjectValid } from './misc'
+
+function isLikeActivityValid (activity: any) {
+  return isBaseActivityValid(activity, 'Like') &&
+    isObjectValid(activity.object)
+}
 
 function isDislikeActivityValid (activity: any) {
-  return activity.type === 'Dislike' &&
-    isActivityPubUrlValid(activity.actor) &&
+  return isBaseActivityValid(activity, 'Dislike') &&
     isObjectValid(activity.object)
 }
 
 // ---------------------------------------------------------------------------
 
 export {
-  isDislikeActivityValid
+  isDislikeActivityValid,
+  isLikeActivityValid
 }
diff --git a/server/helpers/custom-validators/activitypub/share.ts b/server/helpers/custom-validators/activitypub/share.ts
new file mode 100644 (file)
index 0000000..fb5e4c0
--- /dev/null
@@ -0,0 +1,11 @@
+import { isBaseActivityValid, isObjectValid } from './misc'
+
+function isShareActivityValid (activity: any) {
+  return isBaseActivityValid(activity, 'Announce') &&
+    isObjectValid(activity.object)
+}
+// ---------------------------------------------------------------------------
+
+export {
+  isShareActivityValid
+}
index 2578de5edd5cc00d7257b21ceac02b9005361004..565e0d1fa15bd1d4ac4fabb240d87a66b074a7a5 100644 (file)
@@ -36,7 +36,7 @@ function checkMissedConfig () {
     'rates_limit.login.window', 'rates_limit.login.max', 'rates_limit.ask_send_email.window', 'rates_limit.ask_send_email.max',
     'theme.default',
     'remote_redundancy.videos.accept_from',
-    'federation.videos.federate_unlisted',
+    'federation.videos.federate_unlisted', 'federation.videos.cleanup_remote_interactions',
     'search.remote_uri.users', 'search.remote_uri.anonymous', 'search.search_index.enabled', 'search.search_index.url',
     'search.search_index.disable_local_search', 'search.search_index.is_default_search',
     'live.enabled', 'live.allow_replay', 'live.max_duration', 'live.max_user_lives', 'live.max_instance_lives',
index 21ca785848cd9fce9dc15dbc49234ab6d037f0c6..c16b63c33903ec1952a1aaa733a681dedc3d200b 100644 (file)
@@ -159,7 +159,8 @@ const CONFIG = {
   },
   FEDERATION: {
     VIDEOS: {
-      FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted')
+      FEDERATE_UNLISTED: config.get<boolean>('federation.videos.federate_unlisted'),
+      CLEANUP_REMOTE_INTERACTIONS: config.get<boolean>('federation.videos.cleanup_remote_interactions')
     }
   },
   ADMIN: {
index 74192d590271fce20ca2b8cfdc4a128b104b0a87..083a298899846f3d89d176d89e01437ef4334509 100644 (file)
@@ -137,6 +137,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'activitypub-http-unicast': 5,
   'activitypub-http-fetcher': 5,
   'activitypub-follow': 5,
+  'activitypub-cleaner': 1,
   'video-file-import': 1,
   'video-transcoding': 1,
   'video-import': 1,
@@ -147,10 +148,12 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
   'video-redundancy': 1,
   'video-live-ending': 1
 }
-const JOB_CONCURRENCY: { [id in JobType]?: number } = {
+// Excluded keys are jobs that can be configured by admins
+const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
   'activitypub-http-broadcast': 1,
   'activitypub-http-unicast': 5,
   'activitypub-http-fetcher': 1,
+  'activitypub-cleaner': 1,
   'activitypub-follow': 1,
   'video-file-import': 1,
   'email': 5,
@@ -165,6 +168,7 @@ const JOB_TTL: { [id in JobType]: number } = {
   'activitypub-http-unicast': 60000 * 10, // 10 minutes
   'activitypub-http-fetcher': 1000 * 3600 * 10, // 10 hours
   'activitypub-follow': 60000 * 10, // 10 minutes
+  'activitypub-cleaner': 1000 * 3600, // 1 hour
   'video-file-import': 1000 * 3600, // 1 hour
   'video-transcoding': 1000 * 3600 * 48, // 2 days, transcoding could be long
   'video-import': 1000 * 3600 * 2, // 2 hours
@@ -178,6 +182,9 @@ const JOB_TTL: { [id in JobType]: number } = {
 const REPEAT_JOBS: { [ id: string ]: EveryRepeatOptions | CronRepeatOptions } = {
   'videos-views': {
     cron: randomInt(1, 20) + ' * * * *' // Between 1-20 minutes past the hour
+  },
+  'activitypub-cleaner': {
+    cron: '30 5 * * ' + randomInt(0, 7) // 1 time per week (random day) at 5:30 AM
   }
 }
 const JOB_PRIORITY = {
@@ -188,6 +195,7 @@ const JOB_PRIORITY = {
 }
 
 const BROADCAST_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-http-broadcast job
+const AP_CLEANER_CONCURRENCY = 10 // How many requests in parallel we do in activitypub-cleaner job
 const CRAWL_REQUEST_CONCURRENCY = 1 // How many requests in parallel to fetch remote data (likes, shares...)
 const JOB_REQUEST_TIMEOUT = 7000 // 7 seconds
 const JOB_COMPLETED_LIFETIME = 60000 * 60 * 24 * 2 // 2 days
@@ -756,6 +764,7 @@ if (isTestInstance() === true) {
   SCHEDULER_INTERVALS_MS.autoFollowIndexInstances = 5000
   SCHEDULER_INTERVALS_MS.updateInboxStats = 5000
   REPEAT_JOBS['videos-views'] = { every: 5000 }
+  REPEAT_JOBS['activitypub-cleaner'] = { every: 5000 }
 
   REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR = 1
 
@@ -815,6 +824,7 @@ export {
   REDUNDANCY,
   JOB_CONCURRENCY,
   JOB_ATTEMPTS,
+  AP_CLEANER_CONCURRENCY,
   LAST_MIGRATION_VERSION,
   OAUTH_LIFETIME,
   CUSTOM_HTML_TAG_COMMENTS,
index 902d877c466dc0b61d5382ed8ddbe083b01f12bf..d025ed7f12eb348bec081647b68894a1fd164525 100644 (file)
@@ -41,10 +41,10 @@ async function resolveThread (params: ResolveThreadParams): ResolveThreadResult
       return await tryResolveThreadFromVideo(params)
     }
   } catch (err) {
-    logger.debug('Cannot get or create account and video and channel for reply %s, fetch comment', url, { err })
+    logger.debug('Cannot resolve thread from video %s, maybe because it was not a video', url, { err })
   }
 
-  return resolveParentComment(params)
+  return resolveRemoteParentComment(params)
 }
 
 export {
@@ -119,7 +119,7 @@ async function tryResolveThreadFromVideo (params: ResolveThreadParams) {
   return { video, comment: resultComment, commentCreated }
 }
 
-async function resolveParentComment (params: ResolveThreadParams) {
+async function resolveRemoteParentComment (params: ResolveThreadParams) {
   const { url, comments } = params
 
   if (comments.length > ACTIVITY_PUB.MAX_RECURSION_COMMENTS) {
@@ -133,7 +133,7 @@ async function resolveParentComment (params: ResolveThreadParams) {
   })
 
   if (sanitizeAndCheckVideoCommentObject(body) === false) {
-    throw new Error('Remote video comment JSON is not valid:' + JSON.stringify(body))
+    throw new Error(`Remote video comment JSON ${url} is not valid:` + JSON.stringify(body))
   }
 
   const actorUrl = body.attributedTo
diff --git a/server/lib/job-queue/handlers/activitypub-cleaner.ts b/server/lib/job-queue/handlers/activitypub-cleaner.ts
new file mode 100644 (file)
index 0000000..b58bbc9
--- /dev/null
@@ -0,0 +1,194 @@
+import * as Bluebird from 'bluebird'
+import * as Bull from 'bull'
+import { checkUrlsSameHost } from '@server/helpers/activitypub'
+import { isDislikeActivityValid, isLikeActivityValid } from '@server/helpers/custom-validators/activitypub/rate'
+import { isShareActivityValid } from '@server/helpers/custom-validators/activitypub/share'
+import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
+import { doRequest } from '@server/helpers/requests'
+import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants'
+import { VideoModel } from '@server/models/video/video'
+import { VideoCommentModel } from '@server/models/video/video-comment'
+import { VideoShareModel } from '@server/models/video/video-share'
+import { HttpStatusCode } from '@shared/core-utils'
+import { logger } from '../../../helpers/logger'
+import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
+
+// Job to clean remote interactions off local videos
+
+async function processActivityPubCleaner (_job: Bull.Job) {
+  logger.info('Processing ActivityPub cleaner.')
+
+  {
+    const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
+    const { bodyValidator, deleter, updater } = rateOptionsFactory()
+
+    await Bluebird.map(rateUrls, async rateUrl => {
+      try {
+        const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
+
+        if (result?.status === 'deleted') {
+          const { videoId, type } = result.data
+
+          await VideoModel.updateRatesOf(videoId, type, undefined)
+        }
+      } catch (err) {
+        logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
+      }
+    }, { concurrency: AP_CLEANER_CONCURRENCY })
+  }
+
+  {
+    const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
+    const { bodyValidator, deleter, updater } = shareOptionsFactory()
+
+    await Bluebird.map(shareUrls, async shareUrl => {
+      try {
+        await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
+      } catch (err) {
+        logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
+      }
+    }, { concurrency: AP_CLEANER_CONCURRENCY })
+  }
+
+  {
+    const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
+    const { bodyValidator, deleter, updater } = commentOptionsFactory()
+
+    await Bluebird.map(commentUrls, async commentUrl => {
+      try {
+        await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
+      } catch (err) {
+        logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
+      }
+    }, { concurrency: AP_CLEANER_CONCURRENCY })
+  }
+}
+
+// ---------------------------------------------------------------------------
+
+export {
+  processActivityPubCleaner
+}
+
+// ---------------------------------------------------------------------------
+
+async function updateObjectIfNeeded <T> (
+  url: string,
+  bodyValidator: (body: any) => boolean,
+  updater: (url: string, newUrl: string) => Promise<T>,
+  deleter: (url: string) => Promise<T>
+): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
+  // Fetch url
+  const { response, body } = await doRequest<any>({
+    uri: url,
+    json: true,
+    activityPub: true
+  })
+
+  // Does not exist anymore, remove entry
+  if (response.statusCode === HttpStatusCode.NOT_FOUND_404) {
+    logger.info('Removing remote AP object %s.', url)
+    const data = await deleter(url)
+
+    return { status: 'deleted', data }
+  }
+
+  // If not same id, check same host and update
+  if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
+
+  if (body.type === 'Tombstone') {
+    logger.info('Removing remote AP object %s.', url)
+    const data = await deleter(url)
+
+    return { status: 'deleted', data }
+  }
+
+  const newUrl = body.id
+  if (newUrl !== url) {
+    if (checkUrlsSameHost(newUrl, url) !== true) {
+      throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
+    }
+
+    logger.info('Updating remote AP object %s.', url)
+    const data = await updater(url, newUrl)
+
+    return { status: 'updated', data }
+  }
+
+  return null
+}
+
+function rateOptionsFactory () {
+  return {
+    bodyValidator: (body: any) => isLikeActivityValid(body) || isDislikeActivityValid(body),
+
+    updater: async (url: string, newUrl: string) => {
+      const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
+      rate.url = newUrl
+
+      const videoId = rate.videoId
+      const type = rate.type
+
+      await rate.save()
+
+      return { videoId, type }
+    },
+
+    deleter: async (url) => {
+      const rate = await AccountVideoRateModel.loadByUrl(url, undefined)
+
+      const videoId = rate.videoId
+      const type = rate.type
+
+      await rate.destroy()
+
+      return { videoId, type }
+    }
+  }
+}
+
+function shareOptionsFactory () {
+  return {
+    bodyValidator: (body: any) => isShareActivityValid(body),
+
+    updater: async (url: string, newUrl: string) => {
+      const share = await VideoShareModel.loadByUrl(url, undefined)
+      share.url = newUrl
+
+      await share.save()
+
+      return undefined
+    },
+
+    deleter: async (url) => {
+      const share = await VideoShareModel.loadByUrl(url, undefined)
+
+      await share.destroy()
+
+      return undefined
+    }
+  }
+}
+
+function commentOptionsFactory () {
+  return {
+    bodyValidator: (body: any) => sanitizeAndCheckVideoCommentObject(body),
+
+    updater: async (url: string, newUrl: string) => {
+      const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
+      comment.url = newUrl
+
+      await comment.save()
+
+      return undefined
+    },
+
+    deleter: async (url) => {
+      const comment = await VideoCommentModel.loadByUrlAndPopulateAccountAndVideo(url)
+
+      await comment.destroy()
+
+      return undefined
+    }
+  }
+}
index 8da549640c0413f3f17bff8f087ab81e7107ccb2..125307843814fc0dee9aa8a0ea570941b7efee32 100644 (file)
@@ -6,7 +6,7 @@ import { logger } from '../../../helpers/logger'
 
 async function processActorKeys (job: Bull.Job) {
   const payload = job.data as ActorKeysPayload
-  logger.info('Processing email in job %d.', job.id)
+  logger.info('Processing actor keys in job %d.', job.id)
 
   const actor = await ActorModel.load(payload.actorId)
 
index efda2e038ed1d43584434287c21e76367092a9fd..42e8347b1f663650c5aed7042c5e02cbd1b9ffcc 100644 (file)
@@ -21,6 +21,7 @@ import {
 import { logger } from '../../helpers/logger'
 import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
 import { Redis } from '../redis'
+import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
 import { processActivityPubFollow } from './handlers/activitypub-follow'
 import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
 import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
@@ -38,6 +39,7 @@ type CreateJobArgument =
   { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
   { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
   { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
+  { type: 'activitypub-http-cleaner', payload: {} } |
   { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
   { type: 'video-file-import', payload: VideoFileImportPayload } |
   { type: 'video-transcoding', payload: VideoTranscodingPayload } |
@@ -58,6 +60,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
   'activitypub-http-broadcast': processActivityPubHttpBroadcast,
   'activitypub-http-unicast': processActivityPubHttpUnicast,
   'activitypub-http-fetcher': processActivityPubHttpFetcher,
+  'activitypub-cleaner': processActivityPubCleaner,
   'activitypub-follow': processActivityPubFollow,
   'video-file-import': processVideoFileImport,
   'video-transcoding': processVideoTranscoding,
@@ -75,6 +78,7 @@ const jobTypes: JobType[] = [
   'activitypub-http-broadcast',
   'activitypub-http-fetcher',
   'activitypub-http-unicast',
+  'activitypub-cleaner',
   'email',
   'video-transcoding',
   'video-file-import',
@@ -233,6 +237,12 @@ class JobQueue {
     this.queues['videos-views'].add({}, {
       repeat: REPEAT_JOBS['videos-views']
     }).catch(err => logger.error('Cannot add repeatable job.', { err }))
+
+    if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
+      this.queues['activitypub-cleaner'].add({}, {
+        repeat: REPEAT_JOBS['activitypub-cleaner']
+      }).catch(err => logger.error('Cannot add repeatable job.', { err }))
+    }
   }
 
   private filterJobTypes (jobType?: JobType) {
index 7dcba15f1354fa971b048e0b6aed139fd40dce9a..01bdef25fe3870da940a359829735300aa6e2b1e 100644 (file)
@@ -1,6 +1,6 @@
 import * as express from 'express'
 import { body, param, query } from 'express-validator'
-import { isIdOrUUIDValid } from '../../../helpers/custom-validators/misc'
+import { isIdOrUUIDValid, isIdValid } from '../../../helpers/custom-validators/misc'
 import { isRatingValid } from '../../../helpers/custom-validators/video-rates'
 import { isVideoRatingTypeValid } from '../../../helpers/custom-validators/videos'
 import { logger } from '../../../helpers/logger'
@@ -28,14 +28,14 @@ const videoUpdateRateValidator = [
 const getAccountVideoRateValidatorFactory = function (rateType: VideoRateType) {
   return [
     param('name').custom(isAccountNameValid).withMessage('Should have a valid account name'),
-    param('videoId').custom(isIdOrUUIDValid).not().isEmpty().withMessage('Should have a valid videoId'),
+    param('videoId').custom(isIdValid).not().isEmpty().withMessage('Should have a valid videoId'),
 
     async (req: express.Request, res: express.Response, next: express.NextFunction) => {
       logger.debug('Checking videoCommentGetValidator parameters.', { parameters: req.params })
 
       if (areValidationErrors(req, res)) return
 
-      const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, req.params.videoId)
+      const rate = await AccountVideoRateModel.loadLocalAndPopulateVideo(rateType, req.params.name, +req.params.videoId)
       if (!rate) {
         return res.status(HttpStatusCode.NOT_FOUND_404)
                   .json({ error: 'Video rate not found' })
index d9c52949114becc9b603f7cb0874aa947df6cbce..801f76bbae6615a52ecee09e44f9724fc9385376 100644 (file)
@@ -146,10 +146,22 @@ export class AccountVideoRateModel extends Model {
     return AccountVideoRateModel.findAndCountAll(query)
   }
 
+  static listRemoteRateUrlsOfLocalVideos () {
+    const query = `SELECT "accountVideoRate".url FROM "accountVideoRate" ` +
+      `INNER JOIN account ON account.id = "accountVideoRate"."accountId" ` +
+      `INNER JOIN actor ON actor.id = account."actorId" AND actor."serverId" IS NOT NULL ` +
+      `INNER JOIN video ON video.id = "accountVideoRate"."videoId" AND video.remote IS FALSE`
+
+    return AccountVideoRateModel.sequelize.query<{ url: string }>(query, {
+      type: QueryTypes.SELECT,
+      raw: true
+    }).then(rows => rows.map(r => r.url))
+  }
+
   static loadLocalAndPopulateVideo (
     rateType: VideoRateType,
     accountName: string,
-    videoId: number | string,
+    videoId: number,
     t?: Transaction
   ): Promise<MAccountVideoRateAccountVideo> {
     const options: FindOptions = {
@@ -241,21 +253,7 @@ export class AccountVideoRateModel extends Model {
 
       await AccountVideoRateModel.destroy(query)
 
-      const field = type === 'like'
-        ? 'likes'
-        : 'dislikes'
-
-      const rawQuery = `UPDATE "video" SET "${field}" = ` +
-        '(' +
-          'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' +
-        ') ' +
-        'WHERE "video"."id" = :videoId'
-
-      return AccountVideoRateModel.sequelize.query(rawQuery, {
-        transaction: t,
-        replacements: { videoId, rateType: type },
-        type: QueryTypes.UPDATE
-      })
+      return VideoModel.updateRatesOf(videoId, type, t)
     })
   }
 
index dc7556d441a476bc279ff6e850e4182a84d7750f..151c2bc81a793fd11ec3622ec1cc363fe7fcbf3a 100644 (file)
@@ -1,5 +1,5 @@
 import { uniq } from 'lodash'
-import { FindAndCountOptions, FindOptions, Op, Order, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize'
+import { FindAndCountOptions, FindOptions, Op, Order, QueryTypes, ScopeOptions, Sequelize, Transaction, WhereOptions } from 'sequelize'
 import {
   AllowNull,
   BelongsTo,
@@ -696,6 +696,18 @@ export class VideoCommentModel extends Model {
     }
   }
 
+  static listRemoteCommentUrlsOfLocalVideos () {
+    const query = `SELECT "videoComment".url FROM "videoComment" ` +
+      `INNER JOIN account ON account.id = "videoComment"."accountId" ` +
+      `INNER JOIN actor ON actor.id = "account"."actorId" AND actor."serverId" IS NOT NULL ` +
+      `INNER JOIN video ON video.id = "videoComment"."videoId" AND video.remote IS FALSE`
+
+    return VideoCommentModel.sequelize.query<{ url: string }>(query, {
+      type: QueryTypes.SELECT,
+      raw: true
+    }).then(rows => rows.map(r => r.url))
+  }
+
   static cleanOldCommentsOf (videoId: number, beforeUpdatedAt: Date) {
     const query = {
       where: {
index b7f5f3fa3c54fa4da41bef2a0bd035a1b7cfdaf3..5059c1fa69cb55893117b7f1b89075d0771b1d7b 100644 (file)
@@ -1,4 +1,4 @@
-import { literal, Op, Transaction } from 'sequelize'
+import { literal, Op, QueryTypes, Transaction } from 'sequelize'
 import { AllowNull, BelongsTo, Column, CreatedAt, DataType, ForeignKey, Is, Model, Scopes, Table, UpdatedAt } from 'sequelize-typescript'
 import { isActivityPubUrlValid } from '../../helpers/custom-validators/activitypub/misc'
 import { CONSTRAINTS_FIELDS } from '../../initializers/constants'
@@ -185,6 +185,17 @@ export class VideoShareModel extends Model {
     return VideoShareModel.findAndCountAll(query)
   }
 
+  static listRemoteShareUrlsOfLocalVideos () {
+    const query = `SELECT "videoShare".url FROM "videoShare" ` +
+      `INNER JOIN actor ON actor.id = "videoShare"."actorId" AND actor."serverId" IS NOT NULL ` +
+      `INNER JOIN video ON video.id = "videoShare"."videoId" AND video.remote IS FALSE`
+
+    return VideoShareModel.sequelize.query<{ url: string }>(query, {
+      type: QueryTypes.SELECT,
+      raw: true
+    }).then(rows => rows.map(r => r.url))
+  }
+
   static cleanOldSharesOf (videoId: number, beforeUpdatedAt: Date) {
     const query = {
       where: {
index 8894843e0de04f89d50634ac28e5471315c3d262..b4c7da655bf190dbc0c772f2bfd8813ea030779a 100644 (file)
@@ -34,7 +34,7 @@ import { ModelCache } from '@server/models/model-cache'
 import { VideoFile } from '@shared/models/videos/video-file.model'
 import { ResultList, UserRight, VideoPrivacy, VideoState } from '../../../shared'
 import { VideoObject } from '../../../shared/models/activitypub/objects'
-import { Video, VideoDetails } from '../../../shared/models/videos'
+import { Video, VideoDetails, VideoRateType } from '../../../shared/models/videos'
 import { ThumbnailType } from '../../../shared/models/videos/thumbnail.type'
 import { VideoFilter } from '../../../shared/models/videos/video-query.type'
 import { VideoStreamingPlaylistType } from '../../../shared/models/videos/video-streaming-playlist.type'
@@ -1509,6 +1509,24 @@ export class VideoModel extends Model {
     })
   }
 
+  static updateRatesOf (videoId: number, type: VideoRateType, t: Transaction) {
+    const field = type === 'like'
+      ? 'likes'
+      : 'dislikes'
+
+    const rawQuery = `UPDATE "video" SET "${field}" = ` +
+      '(' +
+      'SELECT COUNT(id) FROM "accountVideoRate" WHERE "accountVideoRate"."videoId" = "video"."id" AND type = :rateType' +
+      ') ' +
+      'WHERE "video"."id" = :videoId'
+
+    return AccountVideoRateModel.sequelize.query(rawQuery, {
+      transaction: t,
+      replacements: { videoId, rateType: type },
+      type: QueryTypes.UPDATE
+    })
+  }
+
   static checkVideoHasInstanceFollow (videoId: number, followerActorId: number) {
     // Instances only share videos
     const query = 'SELECT 1 FROM "videoShare" ' +
diff --git a/server/tests/api/activitypub/cleaner.ts b/server/tests/api/activitypub/cleaner.ts
new file mode 100644 (file)
index 0000000..75ef56c
--- /dev/null
@@ -0,0 +1,283 @@
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import {
+  cleanupTests,
+  closeAllSequelize,
+  deleteAll,
+  doubleFollow,
+  getCount,
+  selectQuery,
+  setVideoField,
+  updateQuery,
+  wait
+} from '../../../../shared/extra-utils'
+import { flushAndRunMultipleServers, ServerInfo, setAccessTokensToServers } from '../../../../shared/extra-utils/index'
+import { waitJobs } from '../../../../shared/extra-utils/server/jobs'
+import { addVideoCommentThread, getVideoCommentThreads } from '../../../../shared/extra-utils/videos/video-comments'
+import { getVideo, rateVideo, uploadVideoAndGetId } from '../../../../shared/extra-utils/videos/videos'
+
+const expect = chai.expect
+
+describe('Test AP cleaner', function () {
+  let servers: ServerInfo[] = []
+  let videoUUID1: string
+  let videoUUID2: string
+  let videoUUID3: string
+
+  let videoUUIDs: string[]
+
+  before(async function () {
+    this.timeout(120000)
+
+    const config = {
+      federation: {
+        videos: { cleanup_remote_interactions: true }
+      }
+    }
+    servers = await flushAndRunMultipleServers(3, config)
+
+    // Get the access tokens
+    await setAccessTokensToServers(servers)
+
+    await Promise.all([
+      doubleFollow(servers[0], servers[1]),
+      doubleFollow(servers[1], servers[2]),
+      doubleFollow(servers[0], servers[2])
+    ])
+
+    // Update 1 local share, check 6 shares
+
+    // Create 1 comment per video
+    // Update 1 remote URL and 1 local URL on
+
+    videoUUID1 = (await uploadVideoAndGetId({ server: servers[0], videoName: 'server 1' })).uuid
+    videoUUID2 = (await uploadVideoAndGetId({ server: servers[1], videoName: 'server 2' })).uuid
+    videoUUID3 = (await uploadVideoAndGetId({ server: servers[2], videoName: 'server 3' })).uuid
+
+    videoUUIDs = [ videoUUID1, videoUUID2, videoUUID3 ]
+
+    await waitJobs(servers)
+
+    for (const server of servers) {
+      for (const uuid of videoUUIDs) {
+        await rateVideo(server.url, server.accessToken, uuid, 'like')
+        await addVideoCommentThread(server.url, server.accessToken, uuid, 'comment')
+      }
+    }
+
+    await waitJobs(servers)
+  })
+
+  it('Should have the correct likes', async function () {
+    for (const server of servers) {
+      for (const uuid of videoUUIDs) {
+        const res = await getVideo(server.url, uuid)
+        expect(res.body.likes).to.equal(3)
+        expect(res.body.dislikes).to.equal(0)
+      }
+    }
+  })
+
+  it('Should destroy server 3 internal likes and correctly clean them', async function () {
+    this.timeout(20000)
+
+    await deleteAll(servers[2].internalServerNumber, 'accountVideoRate')
+    for (const uuid of videoUUIDs) {
+      await setVideoField(servers[2].internalServerNumber, uuid, 'likes', '0')
+    }
+
+    await wait(5000)
+    await waitJobs(servers)
+
+    // Updated rates of my video
+    {
+      const res = await getVideo(servers[0].url, videoUUID1)
+      expect(res.body.likes).to.equal(2)
+      expect(res.body.dislikes).to.equal(0)
+    }
+
+    // Did not update rates of a remote video
+    {
+      const res = await getVideo(servers[0].url, videoUUID2)
+      expect(res.body.likes).to.equal(3)
+      expect(res.body.dislikes).to.equal(0)
+    }
+  })
+
+  it('Should update rates to dislikes', async function () {
+    this.timeout(20000)
+
+    for (const server of servers) {
+      for (const uuid of videoUUIDs) {
+        await rateVideo(server.url, server.accessToken, uuid, 'dislike')
+      }
+    }
+
+    await waitJobs(servers)
+
+    for (const server of servers) {
+      for (const uuid of videoUUIDs) {
+        const res = await getVideo(server.url, uuid)
+        expect(res.body.likes).to.equal(0)
+        expect(res.body.dislikes).to.equal(3)
+      }
+    }
+  })
+
+  it('Should destroy server 3 internal dislikes and correctly clean them', async function () {
+    this.timeout(20000)
+
+    await deleteAll(servers[2].internalServerNumber, 'accountVideoRate')
+
+    for (const uuid of videoUUIDs) {
+      await setVideoField(servers[2].internalServerNumber, uuid, 'dislikes', '0')
+    }
+
+    await wait(5000)
+    await waitJobs(servers)
+
+    // Updated rates of my video
+    {
+      const res = await getVideo(servers[0].url, videoUUID1)
+      expect(res.body.likes).to.equal(0)
+      expect(res.body.dislikes).to.equal(2)
+    }
+
+    // Did not update rates of a remote video
+    {
+      const res = await getVideo(servers[0].url, videoUUID2)
+      expect(res.body.likes).to.equal(0)
+      expect(res.body.dislikes).to.equal(3)
+    }
+  })
+
+  it('Should destroy server 3 internal shares and correctly clean them', async function () {
+    this.timeout(20000)
+
+    const preCount = await getCount(servers[0].internalServerNumber, 'videoShare')
+    expect(preCount).to.equal(6)
+
+    await deleteAll(servers[2].internalServerNumber, 'videoShare')
+    await wait(5000)
+    await waitJobs(servers)
+
+    // Still 6 because we don't have remote shares on local videos
+    const postCount = await getCount(servers[0].internalServerNumber, 'videoShare')
+    expect(postCount).to.equal(6)
+  })
+
+  it('Should destroy server 3 internal comments and correctly clean them', async function () {
+    this.timeout(20000)
+
+    {
+      const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5)
+      expect(res.body.total).to.equal(3)
+    }
+
+    await deleteAll(servers[2].internalServerNumber, 'videoComment')
+
+    await wait(5000)
+    await waitJobs(servers)
+
+    {
+      const res = await getVideoCommentThreads(servers[0].url, videoUUID1, 0, 5)
+      expect(res.body.total).to.equal(2)
+    }
+  })
+
+  it('Should correctly update rate URLs', async function () {
+    this.timeout(30000)
+
+    async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') {
+      const query = `SELECT "videoId", "accountVideoRate".url FROM "accountVideoRate" ` +
+        `INNER JOIN video ON "accountVideoRate"."videoId" = video.id AND remote IS ${remote} WHERE "accountVideoRate"."url" LIKE '${like}'`
+      const res = await selectQuery(servers[0].internalServerNumber, query)
+
+      for (const rate of res) {
+        const matcher = new RegExp(`^${ofServerUrl}/accounts/root/dislikes/\\d+${urlSuffix}$`)
+        expect(rate.url).to.match(matcher)
+      }
+    }
+
+    async function checkLocal () {
+      const startsWith = 'http://' + servers[0].host + '%'
+      // On local videos
+      await check(startsWith, servers[0].url, '', 'false')
+      // On remote videos
+      await check(startsWith, servers[0].url, '', 'true')
+    }
+
+    async function checkRemote (suffix: string) {
+      const startsWith = 'http://' + servers[1].host + '%'
+      // On local videos
+      await check(startsWith, servers[1].url, suffix, 'false')
+      // On remote videos, we should not update URLs so no suffix
+      await check(startsWith, servers[1].url, '', 'true')
+    }
+
+    await checkLocal()
+    await checkRemote('')
+
+    {
+      const query = `UPDATE "accountVideoRate" SET url = url || 'stan'`
+      await updateQuery(servers[1].internalServerNumber, query)
+
+      await wait(5000)
+      await waitJobs(servers)
+    }
+
+    await checkLocal()
+    await checkRemote('stan')
+  })
+
+  it('Should correctly update comment URLs', async function () {
+    this.timeout(30000)
+
+    async function check (like: string, ofServerUrl: string, urlSuffix: string, remote: 'true' | 'false') {
+      const query = `SELECT "videoId", "videoComment".url, uuid as "videoUUID" FROM "videoComment" ` +
+        `INNER JOIN video ON "videoComment"."videoId" = video.id AND remote IS ${remote} WHERE "videoComment"."url" LIKE '${like}'`
+
+      const res = await selectQuery(servers[0].internalServerNumber, query)
+
+      for (const comment of res) {
+        const matcher = new RegExp(`${ofServerUrl}/videos/watch/${comment.videoUUID}/comments/\\d+${urlSuffix}`)
+        expect(comment.url).to.match(matcher)
+      }
+    }
+
+    async function checkLocal () {
+      const startsWith = 'http://' + servers[0].host + '%'
+      // On local videos
+      await check(startsWith, servers[0].url, '', 'false')
+      // On remote videos
+      await check(startsWith, servers[0].url, '', 'true')
+    }
+
+    async function checkRemote (suffix: string) {
+      const startsWith = 'http://' + servers[1].host + '%'
+      // On local videos
+      await check(startsWith, servers[1].url, suffix, 'false')
+      // On remote videos, we should not update URLs so no suffix
+      await check(startsWith, servers[1].url, '', 'true')
+    }
+
+    {
+      const query = `UPDATE "videoComment" SET url = url || 'kyle'`
+      await updateQuery(servers[1].internalServerNumber, query)
+
+      await wait(5000)
+      await waitJobs(servers)
+    }
+
+    await checkLocal()
+    await checkRemote('kyle')
+  })
+
+  after(async function () {
+    await cleanupTests(servers)
+
+    await closeAllSequelize(servers)
+  })
+})
index 92bd6f6607a01101350836ec49cfc8ac55dbba4e..324b444e4c1c26665e2414a2c6dbc410c0778b7d 100644 (file)
@@ -1,3 +1,4 @@
+import './cleaner'
 import './client'
 import './fetch'
 import './refresher'
index e68812e1b4363df75b55e1d37021b26f8a638684..740f0c2d6d817c2dbe78c9d4f1bdd8adeab5596b 100644 (file)
@@ -24,6 +24,25 @@ function getSequelize (internalServerNumber: number) {
   return seq
 }
 
+function deleteAll (internalServerNumber: number, table: string) {
+  const seq = getSequelize(internalServerNumber)
+
+  const options = { type: QueryTypes.DELETE }
+
+  return seq.query(`DELETE FROM "${table}"`, options)
+}
+
+async function getCount (internalServerNumber: number, table: string) {
+  const seq = getSequelize(internalServerNumber)
+
+  const options = { type: QueryTypes.SELECT as QueryTypes.SELECT }
+
+  const [ { total } ] = await seq.query<{ total: string }>(`SELECT COUNT(*) as total FROM "${table}"`, options)
+  if (total === null) return 0
+
+  return parseInt(total, 10)
+}
+
 function setActorField (internalServerNumber: number, to: string, field: string, value: string) {
   const seq = getSequelize(internalServerNumber)
 
@@ -63,6 +82,20 @@ async function countVideoViewsOf (internalServerNumber: number, uuid: string) {
   return parseInt(total + '', 10)
 }
 
+function selectQuery (internalServerNumber: number, query: string) {
+  const seq = getSequelize(internalServerNumber)
+  const options = { type: QueryTypes.SELECT as QueryTypes.SELECT }
+
+  return seq.query<any>(query, options)
+}
+
+function updateQuery (internalServerNumber: number, query: string) {
+  const seq = getSequelize(internalServerNumber)
+  const options = { type: QueryTypes.UPDATE as QueryTypes.UPDATE }
+
+  return seq.query(query, options)
+}
+
 async function closeAllSequelize (servers: ServerInfo[]) {
   for (const server of servers) {
     if (sequelizes[server.internalServerNumber]) {
@@ -95,6 +128,10 @@ export {
   setActorField,
   countVideoViewsOf,
   setPluginVersion,
+  selectQuery,
+  deleteAll,
+  updateQuery,
   setActorFollowScores,
-  closeAllSequelize
+  closeAllSequelize,
+  getCount
 }
index 97971f960186693245b7ca2869c37e20febf0135..704929bd4f499560f7d7eacdff60a10f214ff434 100644 (file)
@@ -63,6 +63,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
   else servers = serversArg as ServerInfo[]
 
   const states: JobState[] = [ 'waiting', 'active', 'delayed' ]
+  const repeatableJobs = [ 'videos-views', 'activitypub-cleaner' ]
   let pendingRequests: boolean
 
   function tasksBuilder () {
@@ -79,7 +80,7 @@ async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
           count: 10,
           sort: '-createdAt'
         }).then(res => res.body.data)
-          .then((jobs: Job[]) => jobs.filter(j => j.type !== 'videos-views'))
+          .then((jobs: Job[]) => jobs.filter(j => !repeatableJobs.includes(j.type)))
           .then(jobs => {
             if (jobs.length !== 0) {
               pendingRequests = true
index 0b6a540462cba3c1aa60d67704a151e2409895fd..67fe82d41883f743cb319b0af743823b003f453b 100644 (file)
@@ -498,7 +498,7 @@ function updateVideo (
   })
 }
 
-function rateVideo (url: string, accessToken: string, id: number, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) {
+function rateVideo (url: string, accessToken: string, id: number | string, rating: string, specialStatus = HttpStatusCode.NO_CONTENT_204) {
   const path = '/api/v1/videos/' + id + '/rate'
 
   return request(url)
index c693827b0a874674ccb82283de40406da2232fce..83ef844570354407b81437c6732bf64dcd284c82 100644 (file)
@@ -8,6 +8,7 @@ export type JobType =
   | 'activitypub-http-unicast'
   | 'activitypub-http-broadcast'
   | 'activitypub-http-fetcher'
+  | 'activitypub-cleaner'
   | 'activitypub-follow'
   | 'video-file-import'
   | 'video-transcoding'