}
const JOB_ATTEMPTS: { [id in JobType]: number } = {
- 'activitypub-http-broadcast': 5,
- 'activitypub-http-unicast': 5,
- 'activitypub-http-fetcher': 5,
+ 'activitypub-http-broadcast': 1,
+ 'activitypub-http-unicast': 1,
+ 'activitypub-http-fetcher': 2,
'activitypub-follow': 5,
'activitypub-cleaner': 1,
'video-file-import': 1,
// Excluded keys are jobs that can be configured by admins
const JOB_CONCURRENCY: { [id in Exclude<JobType, 'video-transcoding' | 'video-import'>]: number } = {
'activitypub-http-broadcast': 1,
- 'activitypub-http-unicast': 5,
+ 'activitypub-http-unicast': 10,
'activitypub-http-fetcher': 3,
'activitypub-cleaner': 1,
'activitypub-follow': 1,
import { Transaction } from 'sequelize'
+import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
import { getServerActor } from '@server/models/application/application'
import { ContextType } from '@shared/models/activitypub/context'
import { Activity, ActivityAudience } from '../../../../shared/models/activitypub'
function broadcastTo (uris: string[], data: any, byActor: MActorId, contextType?: ContextType) {
if (uris.length === 0) return undefined
- logger.debug('Creating broadcast job.', { uris })
+ const broadcastUris: string[] = []
+ const unicastUris: string[] = []
- const payload = {
- uris,
- signatureActorId: byActor.id,
- body: data,
- contextType
+ // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
+ for (const uri of uris) {
+ if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
+ unicastUris.push(uri)
+ } else {
+ broadcastUris.push(uri)
+ }
+ }
+
+ logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
+
+ if (broadcastUris.length !== 0) {
+ const payload = {
+ uris: broadcastUris,
+ signatureActorId: byActor.id,
+ body: data,
+ contextType
+ }
+
+ JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
}
- return JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
+ for (const unicastUri of unicastUris) {
+ const payload = {
+ uri: unicastUri,
+ signatureActorId: byActor.id,
+ body: data,
+ contextType
+ }
+
+ JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
+ }
}
function unicastTo (data: any, byActor: MActorId, toActorUrl: string, contextType?: ContextType) {
import { logger, loggerTagsFactory } from '@server/helpers/logger'
import { PeerTubeRequestError } from '@server/helpers/requests'
-import { ActorFollowScoreCache } from '@server/lib/files-cache'
import { VideoLoadByUrlType } from '@server/lib/model-loaders'
import { VideoModel } from '@server/models/video/video'
import { MVideoAccountLightBlacklistAllFiles, MVideoThumbnail } from '@server/types/models'
import { HttpStatusCode } from '@shared/models'
+import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
import { fetchRemoteVideo, SyncParam, syncVideoExternalAttributes } from './shared'
import { APVideoUpdater } from './updater'
await syncVideoExternalAttributes(video, videoObject, options.syncParam)
- ActorFollowScoreCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
+ ActorFollowHealthCache.Instance.addGoodServerId(video.VideoChannel.Actor.serverId)
return video
} catch (err) {
logger.warn('Cannot refresh video %s.', options.video.url, { err, ...lTags() })
- ActorFollowScoreCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
+ ActorFollowHealthCache.Instance.addBadServerId(video.VideoChannel.Actor.serverId)
// Don't refresh in loop
await video.setAsRefreshed()
-import { ACTOR_FOLLOW_SCORE } from '../../initializers/constants'
-import { logger } from '../../helpers/logger'
+import { ACTOR_FOLLOW_SCORE } from '../initializers/constants'
+import { logger } from '../helpers/logger'
// Cache follows scores, instead of writing them too often in database
// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
-class ActorFollowScoreCache {
+class ActorFollowHealthCache {
+
+ private static instance: ActorFollowHealthCache
- private static instance: ActorFollowScoreCache
private pendingFollowsScore: { [ url: string ]: number } = {}
+
private pendingBadServer = new Set<number>()
private pendingGoodServer = new Set<number>()
+ private badInboxes = new Set<string>()
+
private constructor () {}
static get Instance () {
return this.instance || (this.instance = new this())
}
- updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
+ updateActorFollowsHealth (goodInboxes: string[], badInboxes: string[]) {
+ this.badInboxes.clear()
+
if (goodInboxes.length === 0 && badInboxes.length === 0) return
logger.info(
if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
+ this.badInboxes.add(badInbox)
}
}
+ isBadInbox (inboxUrl: string) {
+ return this.badInboxes.has(inboxUrl)
+ }
+
addBadServerId (serverId: number) {
this.pendingBadServer.add(serverId)
}
}
export {
- ActorFollowScoreCache
+ ActorFollowHealthCache
}
-export * from './actor-follow-score-cache'
export * from './videos-preview-cache'
export * from './videos-caption-cache'
+export * from './videos-torrent-cache'
import { map } from 'bluebird'
import { Job } from 'bull'
+import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
import { ActivitypubHttpBroadcastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
import { BROADCAST_CONCURRENCY } from '../../../initializers/constants'
-import { ActorFollowScoreCache } from '../../files-cache'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
async function processActivityPubHttpBroadcast (job: Job) {
const badUrls: string[] = []
const goodUrls: string[] = []
- await map(payload.uris, uri => {
- return doRequest(uri, options)
- .then(() => goodUrls.push(uri))
- .catch(() => badUrls.push(uri))
+ await map(payload.uris, async uri => {
+ try {
+ await doRequest(uri, options)
+ goodUrls.push(uri)
+ } catch (err) {
+ logger.debug('HTTP broadcast to %s failed.', uri, { err })
+ badUrls.push(uri)
+ }
}, { concurrency: BROADCAST_CONCURRENCY })
- return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
+ return ActorFollowHealthCache.Instance.updateActorFollowsHealth(goodUrls, badUrls)
}
// ---------------------------------------------------------------------------
import { ActivitypubHttpUnicastPayload } from '@shared/models'
import { logger } from '../../../helpers/logger'
import { doRequest } from '../../../helpers/requests'
-import { ActorFollowScoreCache } from '../../files-cache'
+import { ActorFollowHealthCache } from '../../actor-follow-health-cache'
import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
async function processActivityPubHttpUnicast (job: Job) {
try {
await doRequest(uri, options)
- ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
+ ActorFollowHealthCache.Instance.updateActorFollowsHealth([ uri ], [])
} catch (err) {
- ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
+ ActorFollowHealthCache.Instance.updateActorFollowsHealth([], [ uri ])
throw err
}
import { logger } from '../../helpers/logger'
import { ACTOR_FOLLOW_SCORE, SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
import { ActorFollowModel } from '../../models/actor/actor-follow'
-import { ActorFollowScoreCache } from '../files-cache'
+import { ActorFollowHealthCache } from '../actor-follow-health-cache'
import { AbstractScheduler } from './abstract-scheduler'
export class ActorFollowScheduler extends AbstractScheduler {
}
private async processPendingScores () {
- const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScore()
- const badServerIds = ActorFollowScoreCache.Instance.getBadFollowingServerIds()
- const goodServerIds = ActorFollowScoreCache.Instance.getGoodFollowingServerIds()
+ const pendingScores = ActorFollowHealthCache.Instance.getPendingFollowsScore()
+ const badServerIds = ActorFollowHealthCache.Instance.getBadFollowingServerIds()
+ const goodServerIds = ActorFollowHealthCache.Instance.getGoodFollowingServerIds()
- ActorFollowScoreCache.Instance.clearPendingFollowsScore()
- ActorFollowScoreCache.Instance.clearBadFollowingServerIds()
- ActorFollowScoreCache.Instance.clearGoodFollowingServerIds()
+ ActorFollowHealthCache.Instance.clearPendingFollowsScore()
+ ActorFollowHealthCache.Instance.clearBadFollowingServerIds()
+ ActorFollowHealthCache.Instance.clearGoodFollowingServerIds()
for (const inbox of Object.keys(pendingScores)) {
await ActorFollowModel.updateScore(inbox, pendingScores[inbox])
import './logs'
import './reverse-proxy'
import './services'
+import './slow-follows'
import './stats'
import './tracker'
import './no-client'
--- /dev/null
+/* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
+
+import 'mocha'
+import * as chai from 'chai'
+import { cleanupTests, createMultipleServers, doubleFollow, PeerTubeServer, setAccessTokensToServers, waitJobs } from '@shared/extra-utils'
+import { Job } from '@shared/models'
+
+const expect = chai.expect
+
+describe('Test slow follows', function () {
+ let servers: PeerTubeServer[] = []
+
+ let afterFollows: Date
+
+ before(async function () {
+ this.timeout(60000)
+
+ servers = await createMultipleServers(3)
+
+ // Get the access tokens
+ await setAccessTokensToServers(servers)
+
+ await doubleFollow(servers[0], servers[1])
+ await doubleFollow(servers[0], servers[2])
+
+ afterFollows = new Date()
+
+ for (let i = 0; i < 5; i++) {
+ await servers[0].videos.quickUpload({ name: 'video ' + i })
+ }
+
+ await waitJobs(servers)
+ })
+
+ it('Should only have broadcast jobs', async function () {
+ const { data } = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
+
+ for (const job of data) {
+ expect(new Date(job.createdAt)).below(afterFollows)
+ }
+ })
+
+ it('Should process bad follower', async function () {
+ this.timeout(30000)
+
+ await servers[1].kill()
+
+ // Set server 2 as bad follower
+ await servers[0].videos.quickUpload({ name: 'video 6' })
+ await waitJobs(servers[0])
+
+ afterFollows = new Date()
+ const filter = (job: Job) => new Date(job.createdAt) > afterFollows
+
+ // Resend another broadcast job
+ await servers[0].videos.quickUpload({ name: 'video 7' })
+ await waitJobs(servers[0])
+
+ const resBroadcast = await servers[0].jobs.list({ jobType: 'activitypub-http-broadcast', sort: '-createdAt' })
+ const resUnicast = await servers[0].jobs.list({ jobType: 'activitypub-http-unicast', sort: '-createdAt' })
+
+ const broadcast = resBroadcast.data.filter(filter)
+ const unicast = resUnicast.data.filter(filter)
+
+ expect(unicast).to.have.lengthOf(2)
+ expect(broadcast).to.have.lengthOf(2)
+
+ for (const u of unicast) {
+ expect(u.data.uri).to.equal(servers[1].url + '/inbox')
+ }
+
+ for (const b of broadcast) {
+ expect(b.data.uris).to.have.lengthOf(1)
+ expect(b.data.uris[0]).to.equal(servers[2].url + '/inbox')
+ }
+ })
+
+ after(async function () {
+ await cleanupTests(servers)
+ })
+})