From 2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 20 Dec 2018 14:31:11 +0100 Subject: Optimize actor follow scores modifications --- server/lib/cache/actor-follow-score-cache.ts | 46 +++++++++++++++++++++ server/lib/cache/index.ts | 1 + .../handlers/activitypub-http-broadcast.ts | 3 +- .../job-queue/handlers/activitypub-http-unicast.ts | 6 +-- server/lib/job-queue/job-queue.ts | 4 +- server/lib/schedulers/abstract-scheduler.ts | 18 ++++++++- server/lib/schedulers/actor-follow-scheduler.ts | 47 ++++++++++++++++++++++ .../lib/schedulers/bad-actor-follow-scheduler.ts | 30 -------------- server/lib/schedulers/remove-old-jobs-scheduler.ts | 6 +-- server/lib/schedulers/update-videos-scheduler.ts | 15 +------ .../lib/schedulers/videos-redundancy-scheduler.ts | 9 +---- .../lib/schedulers/youtube-dl-update-scheduler.ts | 2 +- 12 files changed, 125 insertions(+), 62 deletions(-) create mode 100644 server/lib/cache/actor-follow-score-cache.ts create mode 100644 server/lib/schedulers/actor-follow-scheduler.ts delete mode 100644 server/lib/schedulers/bad-actor-follow-scheduler.ts (limited to 'server/lib') diff --git a/server/lib/cache/actor-follow-score-cache.ts b/server/lib/cache/actor-follow-score-cache.ts new file mode 100644 index 000000000..d070bde09 --- /dev/null +++ b/server/lib/cache/actor-follow-score-cache.ts @@ -0,0 +1,46 @@ +import { ACTOR_FOLLOW_SCORE } from '../../initializers' +import { logger } from '../../helpers/logger' + +// Cache follows scores, instead of writing them too often in database +// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores +class ActorFollowScoreCache { + + private static instance: ActorFollowScoreCache + private pendingFollowsScore: { [ url: string ]: number } = {} + + private constructor () {} + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { + if (goodInboxes.length === 0 && badInboxes.length === 0) return + + logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length) + + for (const goodInbox of goodInboxes) { + if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0 + + this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS + } + + for (const badInbox of badInboxes) { + if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 + + this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY + } + } + + getPendingFollowsScoreCopy () { + return this.pendingFollowsScore + } + + clearPendingFollowsScore () { + this.pendingFollowsScore = {} + } +} + +export { + ActorFollowScoreCache +} diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts index 54eb983fa..e921d04a7 100644 --- a/server/lib/cache/index.ts +++ b/server/lib/cache/index.ts @@ -1,2 +1,3 @@ +export * from './actor-follow-score-cache' export * from './videos-preview-cache' export * from './videos-caption-cache' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index abbd89b3b..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpBroadcastPayload = { uris: string[] @@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { .catch(() => badUrls.push(uri)) }, { concurrency: BROADCAST_CONCURRENCY }) - return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) + return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index d36479032..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,9 +1,9 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpUnicastPayload = { uri: string @@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { try { await doRequest(options) - ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) } catch (err) { - ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) throw err } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e34be7dcd..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -165,10 +165,10 @@ class JobQueue { return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts index b9d0a4d17..86ea7aa38 100644 --- a/server/lib/schedulers/abstract-scheduler.ts +++ b/server/lib/schedulers/abstract-scheduler.ts @@ -1,8 +1,11 @@ +import { logger } from '../../helpers/logger' + export abstract class AbstractScheduler { protected abstract schedulerIntervalMs: number private interval: NodeJS.Timer + private isRunning = false enable () { if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') @@ -14,5 +17,18 @@ export abstract class AbstractScheduler { clearInterval(this.interval) } - abstract execute () + async execute () { + if (this.isRunning === true) return + this.isRunning = true + + try { + await this.internalExecute() + } catch (err) { + logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) + } finally { + this.isRunning = false + } + } + + protected abstract internalExecute (): Promise } diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts new file mode 100644 index 000000000..3967be7f8 --- /dev/null +++ b/server/lib/schedulers/actor-follow-scheduler.ts @@ -0,0 +1,47 @@ +import { isTestInstance } from '../../helpers/core-utils' +import { logger } from '../../helpers/logger' +import { ActorFollowModel } from '../../models/activitypub/actor-follow' +import { AbstractScheduler } from './abstract-scheduler' +import { SCHEDULER_INTERVALS_MS } from '../../initializers' +import { ActorFollowScoreCache } from '../cache' + +export class ActorFollowScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores + + private constructor () { + super() + } + + protected async internalExecute () { + await this.processPendingScores() + + await this.removeBadActorFollows() + } + + private async processPendingScores () { + const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() + + ActorFollowScoreCache.Instance.clearPendingFollowsScore() + + for (const inbox of Object.keys(pendingScores)) { + await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) + } + } + + private async removeBadActorFollows () { + if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') + + try { + await ActorFollowModel.removeBadActorFollows() + } catch (err) { + logger.error('Error in bad actor follows scheduler.', { err }) + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/bad-actor-follow-scheduler.ts deleted file mode 100644 index 617149aaf..000000000 --- a/server/lib/schedulers/bad-actor-follow-scheduler.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { isTestInstance } from '../../helpers/core-utils' -import { logger } from '../../helpers/logger' -import { ActorFollowModel } from '../../models/activitypub/actor-follow' -import { AbstractScheduler } from './abstract-scheduler' -import { SCHEDULER_INTERVALS_MS } from '../../initializers' - -export class BadActorFollowScheduler extends AbstractScheduler { - - private static instance: AbstractScheduler - - protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow - - private constructor () { - super() - } - - async execute () { - if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') - - try { - await ActorFollowModel.removeBadActorFollows() - } catch (err) { - logger.error('Error in bad actor follows scheduler.', { err }) - } - } - - static get Instance () { - return this.instance || (this.instance = new this()) - } -} diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts index a29a6b800..4a4341ba9 100644 --- a/server/lib/schedulers/remove-old-jobs-scheduler.ts +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts @@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler { super() } - async execute () { - if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') + protected internalExecute () { + if (!isTestInstance()) logger.info('Removing old jobs in scheduler.') - JobQueue.Instance.removeOldJobs() + return JobQueue.Instance.removeOldJobs() } static get Instance () { diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts index fd2edfd17..21f071f9e 100644 --- a/server/lib/schedulers/update-videos-scheduler.ts +++ b/server/lib/schedulers/update-videos-scheduler.ts @@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler { protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos - private isRunning = false - private constructor () { super() } - async execute () { - if (this.isRunning === true) return - this.isRunning = true - - try { - await retryTransactionWrapper(this.updateVideos.bind(this)) - } catch (err) { - logger.error('Cannot execute update videos scheduler.', { err }) - } finally { - this.isRunning = false - } + protected async internalExecute () { + return retryTransactionWrapper(this.updateVideos.bind(this)) } private async updateVideos () { diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 15e094d39..f643ee226 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts @@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' export class VideosRedundancyScheduler extends AbstractScheduler { private static instance: AbstractScheduler - private executing = false protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL @@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { super() } - async execute () { - if (this.executing) return - - this.executing = true - + protected async internalExecute () { for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) @@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler { await this.extendsLocalExpiration() await this.purgeRemoteExpired() - - this.executing = false } static get Instance () { diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts index 461cd045e..aa027116d 100644 --- a/server/lib/schedulers/youtube-dl-update-scheduler.ts +++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts @@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler { super() } - execute () { + protected internalExecute () { return updateYoutubeDLBinary() } -- cgit v1.2.3