From 9db437c8155f3563a33e22ed2896072a9f1fbdb0 Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Wed, 13 Oct 2021 11:47:32 +0200 Subject: [PATCH] Process slow followers in unicast job queue --- server/initializers/constants.ts | 8 +- server/lib/activitypub/send/utils.ts | 40 +++++++-- server/lib/activitypub/videos/refresh.ts | 6 +- ...-cache.ts => actor-follow-health-cache.ts} | 23 ++++-- server/lib/files-cache/index.ts | 2 +- .../handlers/activitypub-http-broadcast.ts | 16 ++-- .../handlers/activitypub-http-unicast.ts | 6 +- .../lib/schedulers/actor-follow-scheduler.ts | 14 ++-- server/tests/api/server/index.ts | 1 + server/tests/api/server/slow-follows.ts | 81 +++++++++++++++++++ 10 files changed, 160 insertions(+), 37 deletions(-) rename server/lib/{files-cache/actor-follow-score-cache.ts => actor-follow-health-cache.ts} (79%) create mode 100644 server/tests/api/server/slow-follows.ts diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index facd3b721..9896e1efb 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -134,9 +134,9 @@ const REMOTE_SCHEME = { } const JOB_ATTEMPTS: { [id in JobType]: number } = { - 'activitypub-http-broadcast': 5, - 'activitypub-http-unicast': 5, - 'activitypub-http-fetcher': 5, + 'activitypub-http-broadcast': 1, + 'activitypub-http-unicast': 1, + 'activitypub-http-fetcher': 2, 'activitypub-follow': 5, 'activitypub-cleaner': 1, 'video-file-import': 1, @@ -153,7 +153,7 @@ const JOB_ATTEMPTS: { [id in JobType]: number } = { // Excluded keys are jobs that can be configured by admins const JOB_CONCURRENCY: { [id in Exclude]: number } = { 'activitypub-http-broadcast': 1, - 'activitypub-http-unicast': 5, + 'activitypub-http-unicast': 10, 'activitypub-http-fetcher': 3, 'activitypub-cleaner': 1, 'activitypub-follow': 1, diff --git a/server/lib/activitypub/send/utils.ts b/server/lib/activitypub/send/utils.ts index 7cd8030e1..7729703b8 100644 --- a/server/lib/activitypub/send/utils.ts +++ b/server/lib/activitypub/send/utils.ts @@ -1,4 +1,5 @@ import { Transaction } from 'sequelize' +import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { getServerActor } from '@server/models/application/application' import { ContextType } from '@shared/models/activitypub/context' import { Activity, ActivityAudience } from '../../../../shared/models/activitypub' @@ -119,16 +120,41 @@ async function broadcastToActors ( function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) { if (uris.length === 0) return undefined - logger.debug('Creating broadcast job.', { uris }) + const broadcastUris: string[] = [] + const unicastUris: string[] = [] - const payload = { - uris, - signatureActorId: byActor.id, - body: data, - contextType + // Bad URIs could be slow to respond, prefer to process them in a dedicated queue + for (const uri of uris) { + if (ActorFollowHealthCache.Instance.isBadInbox(uri)) { + unicastUris.push(uri) + } else { + broadcastUris.push(uri) + } + } + + logger.debug('Creating broadcast job.', { broadcastUris, unicastUris }) + + if (broadcastUris.length !== 0) { + const payload = { + uris: broadcastUris, + signatureActorId: byActor.id, + body: data, + contextType + } + + JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) } - return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }) + for (const unicastUri of unicastUris) { + const payload = { + uri: unicastUri, + signatureActorId: byActor.id, + body: data, + contextType + } + + JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload }) + } } function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) { diff --git a/server/lib/activitypub/videos/refresh.ts b/server/lib/activitypub/videos/refresh.ts index 3af08acf4..9f952a218 100644 --- a/server/lib/activitypub/videos/refresh.ts +++ b/server/lib/activitypub/videos/refresh.ts @@ -1,10 +1,10 @@ import { logger, loggerTagsFactory } from '@server/helpers/logger' import { PeerTubeRequestError } from '@server/helpers/requests' -import { ActorFollowScoreCache } from '@server/lib/files-cache' import { VideoLoadByUrlType } from '@server/lib/model-loaders' import { VideoModel } from '@server/models/video/video' import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models' import { HttpStatusCode } from '@shared/models' +import { ActorFollowHealthCache } from '../../actor-follow-health-cache' import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared' import { APVideoUpdater } from './updater' @@ -39,7 +39,7 @@ async function refreshVideoIfNeeded (options: { await syncVideoExternalAttributes(video, videoObject, options.syncParam) - ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) + ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId) return video } catch (err) { @@ -53,7 +53,7 @@ async function refreshVideoIfNeeded (options: { logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() }) - ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) + ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId) // Don't refresh in loop await video.setAsRefreshed() diff --git a/server/lib/files-cache/actor-follow-score-cache.ts b/server/lib/actor-follow-health-cache.ts similarity index 79% rename from server/lib/files-cache/actor-follow-score-cache.ts rename to server/lib/actor-follow-health-cache.ts index 465080e72..ab8cc98df 100644 --- a/server/lib/files-cache/actor-follow-score-cache.ts +++ b/server/lib/actor-follow-health-cache.ts @@ -1,22 +1,28 @@ -import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants' -import { logger } from '../../helpers/logger' +import { ACTOR_FOLLOW_SCORE } from '../initializers/constants' +import { logger } from '../helpers/logger' // Cache follows scores, instead of writing them too often in database // Keep data in memory, we don't really need Redis here as we don't really care to loose some scores -class ActorFollowScoreCache { +class ActorFollowHealthCache { + + private static instance: ActorFollowHealthCache - private static instance: ActorFollowScoreCache private pendingFollowsScore: { [ url: string ]: number } = {} + private pendingBadServer = new Set() private pendingGoodServer = new Set() + private badInboxes = new Set() + private constructor () {} static get Instance () { return this.instance || (this.instance = new this()) } - updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { + updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) { + this.badInboxes.clear() + if (goodInboxes.length === 0 && badInboxes.length === 0) return logger.info( @@ -34,9 +40,14 @@ class ActorFollowScoreCache { if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY + this.badInboxes.add(badInbox) } } + isBadInbox (inboxUrl: string) { + return this.badInboxes.has(inboxUrl) + } + addBadServerId (serverId: number) { this.pendingBadServer.add(serverId) } @@ -71,5 +82,5 @@ class ActorFollowScoreCache { } export { - ActorFollowScoreCache + ActorFollowHealthCache } diff --git a/server/lib/files-cache/index.ts b/server/lib/files-cache/index.ts index e921d04a7..e5853f7d6 100644 --- a/server/lib/files-cache/index.ts +++ b/server/lib/files-cache/index.ts @@ -1,3 +1,3 @@ -export * from './actor-follow-score-cache' export * from './videos-preview-cache' export * from './videos-caption-cache' +export * from './videos-torrent-cache' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 9b0bb6574..fbf01d276 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -1,10 +1,10 @@ import { map } from 'bluebird' import { Job } from 'bull' +import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache' import { ActivitypubHttpBroadcastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' import { BROADCAST_CONCURRENCY } from '../../../initializers/constants' -import { ActorFollowScoreCache } from '../../files-cache' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' async function processActivityPubHttpBroadcast (job: Job) { @@ -25,13 +25,17 @@ async function processActivityPubHttpBroadcast (job: Job) { const badUrls: string[] = [] const goodUrls: string[] = [] - await map(payload.uris, uri => { - return doRequest(uri, options) - .then(() => goodUrls.push(uri)) - .catch(() => badUrls.push(uri)) + await map(payload.uris, async uri => { + try { + await doRequest(uri, options) + goodUrls.push(uri) + } catch (err) { + logger.debug('HTTP broadcast to %s failed.', uri, { err }) + badUrls.push(uri) + } }, { concurrency: BROADCAST_CONCURRENCY }) - return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) + return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index 9be50837f..673583d2b 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -2,7 +2,7 @@ import { Job } from 'bull' import { ActivitypubHttpUnicastPayload } from '@shared/models' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' -import { ActorFollowScoreCache } from '../../files-cache' +import { ActorFollowHealthCache } from '../../actor-follow-health-cache' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' async function processActivityPubHttpUnicast (job: Job) { @@ -23,9 +23,9 @@ async function processActivityPubHttpUnicast (job: Job) { try { await doRequest(uri, options) - ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) + ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], []) } catch (err) { - ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) + ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ]) throw err } diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts index 1b80316e9..a5a377573 100644 --- a/server/lib/schedulers/actor-follow-scheduler.ts +++ b/server/lib/schedulers/actor-follow-scheduler.ts @@ -2,7 +2,7 @@ import { isTestInstance } from '../../helpers/core-utils' import { logger } from '../../helpers/logger' import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants' import { ActorFollowModel } from '../../models/actor/actor-follow' -import { ActorFollowScoreCache } from '../files-cache' +import { ActorFollowHealthCache } from '../actor-follow-health-cache' import { AbstractScheduler } from './abstract-scheduler' export class ActorFollowScheduler extends AbstractScheduler { @@ -22,13 +22,13 @@ export class ActorFollowScheduler extends AbstractScheduler { } private async processPendingScores () { - const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore() - const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds() - const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds() + const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore() + const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds() + const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds() - ActorFollowScoreCache.Instance.clearPendingFollowsScore() - ActorFollowScoreCache.Instance.clearBadFollowingServerIds() - ActorFollowScoreCache.Instance.clearGoodFollowingServerIds() + ActorFollowHealthCache.Instance.clearPendingFollowsScore() + ActorFollowHealthCache.Instance.clearBadFollowingServerIds() + ActorFollowHealthCache.Instance.clearGoodFollowingServerIds() for (const inbox of Object.keys(pendingScores)) { await ActorFollowModel.updateScore(inbox, pendingScores[inbox]) diff --git a/server/tests/api/server/index.ts b/server/tests/api/server/index.ts index b16a22ee7..8136fc3c6 100644 --- a/server/tests/api/server/index.ts +++ b/server/tests/api/server/index.ts @@ -11,6 +11,7 @@ import './jobs' import './logs' import './reverse-proxy' import './services' +import './slow-follows' import './stats' import './tracker' import './no-client' diff --git a/server/tests/api/server/slow-follows.ts b/server/tests/api/server/slow-follows.ts new file mode 100644 index 000000000..2bef0c9f2 --- /dev/null +++ b/server/tests/api/server/slow-follows.ts @@ -0,0 +1,81 @@ +/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */ + +import 'mocha' +import * as chai from 'chai' +import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils' +import { Job } from '@shared/models' + +const expect = chai.expect + +describe('Test slow follows', function () { + let servers: PeerTubeServer[] = [] + + let afterFollows: Date + + before(async function () { + this.timeout(60000) + + servers = await createMultipleServers(3) + + // Get the access tokens + await setAccessTokensToServers(servers) + + await doubleFollow(servers[0], servers[1]) + await doubleFollow(servers[0], servers[2]) + + afterFollows = new Date() + + for (let i = 0; i < 5; i++) { + await servers[0].videos.quickUpload({ name: 'video ' + i }) + } + + await waitJobs(servers) + }) + + it('Should only have broadcast jobs', async function () { + const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' }) + + for (const job of data) { + expect(new Date(job.createdAt)).below(afterFollows) + } + }) + + it('Should process bad follower', async function () { + this.timeout(30000) + + await servers[1].kill() + + // Set server 2 as bad follower + await servers[0].videos.quickUpload({ name: 'video 6' }) + await waitJobs(servers[0]) + + afterFollows = new Date() + const filter = (job: Job) => new Date(job.createdAt) > afterFollows + + // Resend another broadcast job + await servers[0].videos.quickUpload({ name: 'video 7' }) + await waitJobs(servers[0]) + + const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' }) + const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' }) + + const broadcast = resBroadcast.data.filter(filter) + const unicast = resUnicast.data.filter(filter) + + expect(unicast).to.have.lengthOf(2) + expect(broadcast).to.have.lengthOf(2) + + for (const u of unicast) { + expect(u.data.uri).to.equal(servers[1].url + '/inbox') + } + + for (const b of broadcast) { + expect(b.data.uris).to.have.lengthOf(1) + expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox') + } + }) + + after(async function () { + await cleanupTests(servers) + }) +}) -- 2.41.0