]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/blobdiff - server/lib/job-queue/handlers/activitypub-cleaner.ts
Fix server lint
[github/Chocobozzz/PeerTube.git] / server / lib / job-queue / handlers / activitypub-cleaner.ts
index 1caca1dcc04bb386d9c4efa7751bea6f85b964f9..a25f00b0a8491dad5ead3ac7013940d00a0e4a56 100644 (file)
@@ -1,6 +1,5 @@
-import * as Bluebird from 'bluebird'
-import * as Bull from 'bull'
-import { checkUrlsSameHost } from '@server/helpers/activitypub'
+import { map } from 'bluebird'
+import { Job } from 'bullmq'
 import {
   isAnnounceActivityValid,
   isDislikeActivityValid,
@@ -8,62 +7,57 @@ import {
 } from '@server/helpers/custom-validators/activitypub/activity'
 import { sanitizeAndCheckVideoCommentObject } from '@server/helpers/custom-validators/activitypub/video-comments'
 import { doJSONRequest, PeerTubeRequestError } from '@server/helpers/requests'
-import { AP_CLEANER_CONCURRENCY } from '@server/initializers/constants'
+import { AP_CLEANER } from '@server/initializers/constants'
+import { checkUrlsSameHost } from '@server/lib/activitypub/url'
+import { Redis } from '@server/lib/redis'
 import { VideoModel } from '@server/models/video/video'
 import { VideoCommentModel } from '@server/models/video/video-comment'
 import { VideoShareModel } from '@server/models/video/video-share'
-import { HttpStatusCode } from '@shared/core-utils'
-import { logger } from '../../../helpers/logger'
+import { HttpStatusCode } from '@shared/models'
+import { logger, loggerTagsFactory } from '../../../helpers/logger'
 import { AccountVideoRateModel } from '../../../models/account/account-video-rate'
 
+const lTags = loggerTagsFactory('ap-cleaner')
+
 // Job to clean remote interactions off local videos
 
-async function processActivityPubCleaner (_job: Bull.Job) {
-  logger.info('Processing ActivityPub cleaner.')
+async function processActivityPubCleaner (_job: Job) {
+  logger.info('Processing ActivityPub cleaner.', lTags())
 
   {
     const rateUrls = await AccountVideoRateModel.listRemoteRateUrlsOfLocalVideos()
     const { bodyValidator, deleter, updater } = rateOptionsFactory()
 
-    await Bluebird.map(rateUrls, async rateUrl => {
-      try {
-        const result = await updateObjectIfNeeded(rateUrl, bodyValidator, updater, deleter)
+    await map(rateUrls, async rateUrl => {
+      // TODO: remove when https://github.com/mastodon/mastodon/issues/13571 is fixed
+      if (rateUrl.includes('#')) return
+
+      const result = await updateObjectIfNeeded({ url: rateUrl, bodyValidator, updater, deleter })
 
-        if (result?.status === 'deleted') {
-          const { videoId, type } = result.data
+      if (result?.status === 'deleted') {
+        const { videoId, type } = result.data
 
-          await VideoModel.updateRatesOf(videoId, type, undefined)
-        }
-      } catch (err) {
-        logger.warn('Cannot update/delete remote AP rate %s.', rateUrl, { err })
+        await VideoModel.syncLocalRates(videoId, type, undefined)
       }
-    }, { concurrency: AP_CLEANER_CONCURRENCY })
+    }, { concurrency: AP_CLEANER.CONCURRENCY })
   }
 
   {
     const shareUrls = await VideoShareModel.listRemoteShareUrlsOfLocalVideos()
     const { bodyValidator, deleter, updater } = shareOptionsFactory()
 
-    await Bluebird.map(shareUrls, async shareUrl => {
-      try {
-        await updateObjectIfNeeded(shareUrl, bodyValidator, updater, deleter)
-      } catch (err) {
-        logger.warn('Cannot update/delete remote AP share %s.', shareUrl, { err })
-      }
-    }, { concurrency: AP_CLEANER_CONCURRENCY })
+    await map(shareUrls, async shareUrl => {
+      await updateObjectIfNeeded({ url: shareUrl, bodyValidator, updater, deleter })
+    }, { concurrency: AP_CLEANER.CONCURRENCY })
   }
 
   {
     const commentUrls = await VideoCommentModel.listRemoteCommentUrlsOfLocalVideos()
     const { bodyValidator, deleter, updater } = commentOptionsFactory()
 
-    await Bluebird.map(commentUrls, async commentUrl => {
-      try {
-        await updateObjectIfNeeded(commentUrl, bodyValidator, updater, deleter)
-      } catch (err) {
-        logger.warn('Cannot update/delete remote AP comment %s.', commentUrl, { err })
-      }
-    }, { concurrency: AP_CLEANER_CONCURRENCY })
+    await map(commentUrls, async commentUrl => {
+      await updateObjectIfNeeded({ url: commentUrl, bodyValidator, updater, deleter })
+    }, { concurrency: AP_CLEANER.CONCURRENCY })
   }
 }
 
@@ -75,14 +69,16 @@ export {
 
 // ---------------------------------------------------------------------------
 
-async function updateObjectIfNeeded <T> (
-  url: string,
-  bodyValidator: (body: any) => boolean,
-  updater: (url: string, newUrl: string) => Promise<T>,
-  deleter: (url: string) => Promise<T>
+async function updateObjectIfNeeded <T> (options: {
+  url: string
+  bodyValidator: (body: any) => boolean
+  updater: (url: string, newUrl: string) => Promise<T>
+  deleter: (url: string) => Promise<T> }
 ): Promise<{ data: T, status: 'deleted' | 'updated' } | null> {
+  const { url, bodyValidator, updater, deleter } = options
+
   const on404OrTombstone = async () => {
-    logger.info('Removing remote AP object %s.', url)
+    logger.info('Removing remote AP object %s.', url, lTags(url))
     const data = await deleter(url)
 
     return { status: 'deleted' as 'deleted', data }
@@ -92,7 +88,7 @@ async function updateObjectIfNeeded <T> (
     const { body } = await doJSONRequest<any>(url, { activityPub: true })
 
     // If not same id, check same host and update
-    if (!body || !body.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
+    if (!body?.id || !bodyValidator(body)) throw new Error(`Body or body id of ${url} is invalid`)
 
     if (body.type === 'Tombstone') {
       return on404OrTombstone()
@@ -104,7 +100,7 @@ async function updateObjectIfNeeded <T> (
         throw new Error(`New url ${newUrl} has not the same host than old url ${url}`)
       }
 
-      logger.info('Updating remote AP object %s.', url)
+      logger.info('Updating remote AP object %s.', url, lTags(url))
       const data = await updater(url, newUrl)
 
       return { status: 'updated', data }
@@ -117,7 +113,15 @@ async function updateObjectIfNeeded <T> (
       return on404OrTombstone()
     }
 
-    throw err
+    logger.debug('Remote AP object %s is unavailable.', url, lTags(url))
+
+    const unavailability = await Redis.Instance.addAPUnavailability(url)
+    if (unavailability >= AP_CLEANER.UNAVAILABLE_TRESHOLD) {
+      logger.info('Removing unavailable AP resource %s.', url, lTags(url))
+      return on404OrTombstone()
+    }
+
+    return null
   }
 }