]> git.immae.eu Git - github/Chocobozzz/PeerTube.git/commitdiff
Process slow followers in unicast job queue
authorChocobozzz <me@florianbigard.com>
Wed, 13 Oct 2021 09:47:32 +0000 (11:47 +0200)
committerChocobozzz <me@florianbigard.com>
Wed, 13 Oct 2021 09:47:32 +0000 (11:47 +0200)
server/initializers/constants.ts
server/lib/activitypub/send/utils.ts
server/lib/activitypub/videos/refresh.ts
server/lib/actor-follow-health-cache.ts [moved from server/lib/files-cache/actor-follow-score-cache.ts with 79% similarity]
server/lib/files-cache/index.ts
server/lib/job-queue/handlers/activitypub-http-broadcast.ts
server/lib/job-queue/handlers/activitypub-http-unicast.ts
server/lib/schedulers/actor-follow-scheduler.ts
server/tests/api/server/index.ts
server/tests/api/server/slow-follows.ts [new file with mode: 0644]

index facd3b721fc5cc96359d7efa42ff9e2567ddf042..9896e1efba793c952303119b2041dc082e77d1fc 100644 (file)
@@ -134,9 +134,9 @@ const REMOTE_SCHEME = {
 }
 
 const JOB_ATTEMPTS: { [id in JobType]: number } = {
-  'activitypub-http-broadcast': 5,
-  'activitypub-http-unicast': 5,
-  'activitypub-http-fetcher': 5,
+  'activitypub-http-broadcast': 1,
+  'activitypub-http-unicast': 1,
+  'activitypub-http-fetcher': 2,
   'activitypub-follow': 5,
   'activitypub-cleaner': 1,
   'video-file-import': 1,
@@ -153,7 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = {
 // Excluded keys are jobs that can be configured by admins
 const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
   'activitypub-http-broadcast': 1,
-  'activitypub-http-unicast': 5,
+  'activitypub-http-unicast': 10,
   'activitypub-http-fetcher': 3,
   'activitypub-cleaner': 1,
   'activitypub-follow': 1,
index 7cd8030e1ff4d27eb1e9c52de716ae5e15e457b8..7729703b8ec5b6e4b55a9f2b6a5f2866c499bc20 100644 (file)
@@ -1,4 +1,5 @@
 import { Transaction } from 'sequelize'
+import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
 import { getServerActor } from '@server/models/application/application'
 import { ContextType } from '@shared/models/activitypub/context'
 import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
@@ -119,16 +120,41 @@ async function broadcastToActors (
 function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
   if (uris.length === 0) return undefined
 
-  logger.debug('Creating broadcast job.', { uris })
+  const broadcastUris: string[] = []
+  const unicastUris: string[] = []
 
-  const payload = {
-    uris,
-    signatureActorId: byActor.id,
-    body: data,
-    contextType
+  // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
+  for (const uri of uris) {
+    if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
+      unicastUris.push(uri)
+    } else {
+      broadcastUris.push(uri)
+    }
+  }
+
+  logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
+
+  if (broadcastUris.length !== 0) {
+    const payload = {
+      uris: broadcastUris,
+      signatureActorId: byActor.id,
+      body: data,
+      contextType
+    }
+
+    JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
   }
 
-  return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
+  for (const unicastUri of unicastUris) {
+    const payload = {
+      uri: unicastUri,
+      signatureActorId: byActor.id,
+      body: data,
+      contextType
+    }
+
+    JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+  }
 }
 
 function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
index 3af08acf4d0b5642a9b23e1c1498cfa1ab8929db..9f952a2182770dbb053b9cb7934f6e3c197f8dda 100644 (file)
@@ -1,10 +1,10 @@
 import { logger, loggerTagsFactory } from '@server/helpers/logger'
 import { PeerTubeRequestError } from '@server/helpers/requests'
-import { ActorFollowScoreCache } from '@server/lib/files-cache'
 import { VideoLoadByUrlType } from '@server/lib/model-loaders'
 import { VideoModel } from '@server/models/video/video'
 import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
 import { HttpStatusCode } from '@shared/models'
+import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
 import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
 import { APVideoUpdater } from './updater'
 
@@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: {
 
     await syncVideoExternalAttributes(video, videoObject, options.syncParam)
 
-    ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
+    ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
 
     return video
   } catch (err) {
@@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: {
 
     logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
 
-    ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
+    ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
 
     // Don't refresh in loop
     await video.setAsRefreshed()
similarity index 79%
rename from server/lib/files-cache/actor-follow-score-cache.ts
rename to server/lib/actor-follow-health-cache.ts
index 465080e72cf5ac58e5cf6bdb5ba10396a2b63a33..ab8cc98df810f0a69473c0d7a533afd8c5d7563b 100644 (file)
@@ -1,22 +1,28 @@
-import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants'
-import { logger } from '../../helpers/logger'
+import { ACTOR_FOLLOW_SCORE } from '../initializers/constants'
+import { logger } from '../helpers/logger'
 
 // Cache follows scores, instead of writing them too often in database
 // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
-class ActorFollowScoreCache {
+class ActorFollowHealthCache {
+
+  private static instance: ActorFollowHealthCache
 
-  private static instance: ActorFollowScoreCache
   private pendingFollowsScore: { [ url: string ]: number } = {}
+
   private pendingBadServer = new Set<number>()
   private pendingGoodServer = new Set<number>()
 
+  private badInboxes = new Set<string>()
+
   private constructor () {}
 
   static get Instance () {
     return this.instance || (this.instance = new this())
   }
 
-  updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
+  updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) {
+    this.badInboxes.clear()
+
     if (goodInboxes.length === 0 && badInboxes.length === 0) return
 
     logger.info(
@@ -34,9 +40,14 @@ class ActorFollowScoreCache {
       if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
 
       this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
+      this.badInboxes.add(badInbox)
     }
   }
 
+  isBadInbox (inboxUrl: string) {
+    return this.badInboxes.has(inboxUrl)
+  }
+
   addBadServerId (serverId: number) {
     this.pendingBadServer.add(serverId)
   }
@@ -71,5 +82,5 @@ class ActorFollowScoreCache {
 }
 
 export {
-  ActorFollowScoreCache
+  ActorFollowHealthCache
 }
index e921d04a719f168c6d2028c1ef663f368b92355f..e5853f7d64ab90da73d008db251e39c1706385a6 100644 (file)
@@ -1,3 +1,3 @@
-export * from './actor-follow-score-cache'
 export * from './videos-preview-cache'
 export * from './videos-caption-cache'
+export * from './videos-torrent-cache'
index 9b0bb6574aba65366ffc3d46efb2f573e6376e66..fbf01d2768f92ccd2ce2e6d2754d448fbecc9203 100644 (file)
@@ -1,10 +1,10 @@
 import { map } from 'bluebird'
 import { Job } from 'bull'
+import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
 import { ActivitypubHttpBroadcastPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
 import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
-import { ActorFollowScoreCache } from '../../files-cache'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
 
 async function processActivityPubHttpBroadcast (job: Job) {
@@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) {
   const badUrls: string[] = []
   const goodUrls: string[] = []
 
-  await map(payload.uris, uri => {
-    return doRequest(uri, options)
-      .then(() => goodUrls.push(uri))
-      .catch(() => badUrls.push(uri))
+  await map(payload.uris, async uri => {
+    try {
+      await doRequest(uri, options)
+      goodUrls.push(uri)
+    } catch (err) {
+      logger.debug('HTTP broadcast to %s failed.', uri, { err })
+      badUrls.push(uri)
+    }
   }, { concurrency: BROADCAST_CONCURRENCY })
 
-  return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
+  return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
 }
 
 // ---------------------------------------------------------------------------
index 9be50837f28246919f8e86696c696d8b49bddad7..673583d2b05d6c58626f9dcc1c2c343ede81d680 100644 (file)
@@ -2,7 +2,7 @@ import { Job } from 'bull'
 import { ActivitypubHttpUnicastPayload } from '@shared/models'
 import { logger } from '../../../helpers/logger'
 import { doRequest } from '../../../helpers/requests'
-import { ActorFollowScoreCache } from '../../files-cache'
+import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
 import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
 
 async function processActivityPubHttpUnicast (job: Job) {
@@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) {
 
   try {
     await doRequest(uri, options)
-    ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
+    ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
   } catch (err) {
-    ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
+    ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
 
     throw err
   }
index 1b80316e98ac242d98629bf2ea12aba46089c66e..a5a377573e4c205a890d09bc5b56c8538e4a3b25 100644 (file)
@@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils'
 import { logger } from '../../helpers/logger'
 import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
 import { ActorFollowModel } from '../../models/actor/actor-follow'
-import { ActorFollowScoreCache } from '../files-cache'
+import { ActorFollowHealthCache } from '../actor-follow-health-cache'
 import { AbstractScheduler } from './abstract-scheduler'
 
 export class ActorFollowScheduler extends AbstractScheduler {
@@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler {
   }
 
   private async processPendingScores () {
-    const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore()
-    const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds()
-    const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds()
+    const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
+    const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
+    const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
 
-    ActorFollowScoreCache.Instance.clearPendingFollowsScore()
-    ActorFollowScoreCache.Instance.clearBadFollowingServerIds()
-    ActorFollowScoreCache.Instance.clearGoodFollowingServerIds()
+    ActorFollowHealthCache.Instance.clearPendingFollowsScore()
+    ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
+    ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
 
     for (const inbox of Object.keys(pendingScores)) {
       await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
index b16a22ee7ff4b66c5599d192d2833ee413bca374..8136fc3c638263c265d544d09f398b1f1cb4b0b3 100644 (file)
@@ -11,6 +11,7 @@ import './jobs'
 import './logs'
 import './reverse-proxy'
 import './services'
+import './slow-follows'
 import './stats'
 import './tracker'
 import './no-client'
diff --git a/server/tests/api/server/slow-follows.ts b/server/tests/api/server/slow-follows.ts
new file mode 100644 (file)
index 0000000..2bef0c9
--- /dev/null
@@ -0,0 +1,81 @@
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils'
+import { Job } from '@shared/models'
+
+const expect = chai.expect
+
+describe('Test slow follows', function () {
+  let servers: PeerTubeServer[] = []
+
+  let afterFollows: Date
+
+  before(async function () {
+    this.timeout(60000)
+
+    servers = await createMultipleServers(3)
+
+    // Get the access tokens
+    await setAccessTokensToServers(servers)
+
+    await doubleFollow(servers[0], servers[1])
+    await doubleFollow(servers[0], servers[2])
+
+    afterFollows = new Date()
+
+    for (let i = 0; i < 5; i++) {
+      await servers[0].videos.quickUpload({ name: 'video ' + i })
+    }
+
+    await waitJobs(servers)
+  })
+
+  it('Should only have broadcast jobs', async function () {
+    const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
+
+    for (const job of data) {
+      expect(new Date(job.createdAt)).below(afterFollows)
+    }
+  })
+
+  it('Should process bad follower', async function () {
+    this.timeout(30000)
+
+    await servers[1].kill()
+
+    // Set server 2 as bad follower
+    await servers[0].videos.quickUpload({ name: 'video 6' })
+    await waitJobs(servers[0])
+
+    afterFollows = new Date()
+    const filter = (job: Job) => new Date(job.createdAt) > afterFollows
+
+    // Resend another broadcast job
+    await servers[0].videos.quickUpload({ name: 'video 7' })
+    await waitJobs(servers[0])
+
+    const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' })
+    const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
+
+    const broadcast = resBroadcast.data.filter(filter)
+    const unicast = resUnicast.data.filter(filter)
+
+    expect(unicast).to.have.lengthOf(2)
+    expect(broadcast).to.have.lengthOf(2)
+
+    for (const u of unicast) {
+      expect(u.data.uri).to.equal(servers[1].url + '/inbox')
+    }
+
+    for (const b of broadcast) {
+      expect(b.data.uris).to.have.lengthOf(1)
+      expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox')
+    }
+  })
+
+  after(async function () {
+    await cleanupTests(servers)
+  })
+})