From 2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b Mon Sep 17 00:00:00 2001 From: Chocobozzz Date: Thu, 20 Dec 2018 14:31:11 +0100 Subject: Optimize actor follow scores modifications --- server/initializers/constants.ts | 4 +- server/lib/cache/actor-follow-score-cache.ts | 46 ++++++++++++++++++++ server/lib/cache/index.ts | 1 + .../handlers/activitypub-http-broadcast.ts | 3 +- .../job-queue/handlers/activitypub-http-unicast.ts | 6 +-- server/lib/job-queue/job-queue.ts | 4 +- server/lib/schedulers/abstract-scheduler.ts | 18 +++++++- server/lib/schedulers/actor-follow-scheduler.ts | 47 ++++++++++++++++++++ .../lib/schedulers/bad-actor-follow-scheduler.ts | 30 ------------- server/lib/schedulers/remove-old-jobs-scheduler.ts | 6 +-- server/lib/schedulers/update-videos-scheduler.ts | 15 +------ .../lib/schedulers/videos-redundancy-scheduler.ts | 9 +--- .../lib/schedulers/youtube-dl-update-scheduler.ts | 2 +- server/models/activitypub/actor-follow.ts | 50 +++++++--------------- 14 files changed, 143 insertions(+), 98 deletions(-) create mode 100644 server/lib/cache/actor-follow-score-cache.ts create mode 100644 server/lib/schedulers/actor-follow-scheduler.ts delete mode 100644 server/lib/schedulers/bad-actor-follow-scheduler.ts (limited to 'server') diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts index b326a6c7b..1c27a9f6b 100644 --- a/server/initializers/constants.ts +++ b/server/initializers/constants.ts @@ -144,7 +144,7 @@ const VIDEO_IMPORT_TIMEOUT = 1000 * 3600 // 1 hour // 1 hour let SCHEDULER_INTERVALS_MS = { - badActorFollow: 60000 * 60, // 1 hour + actorFollowScores: 60000 * 60, // 1 hour removeOldJobs: 60000 * 60, // 1 hour updateVideos: 60000, // 1 minute youtubeDLUpdate: 60000 * 60 * 24 // 1 day @@ -675,7 +675,7 @@ if (isTestInstance() === true) { CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB - SCHEDULER_INTERVALS_MS.badActorFollow = 10000 + SCHEDULER_INTERVALS_MS.actorFollowScores = 1000 SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 SCHEDULER_INTERVALS_MS.updateVideos = 5000 REPEAT_JOBS['videos-views'] = { every: 5000 } diff --git a/server/lib/cache/actor-follow-score-cache.ts b/server/lib/cache/actor-follow-score-cache.ts new file mode 100644 index 000000000..d070bde09 --- /dev/null +++ b/server/lib/cache/actor-follow-score-cache.ts @@ -0,0 +1,46 @@ +import { ACTOR_FOLLOW_SCORE } from '../../initializers' +import { logger } from '../../helpers/logger' + +// Cache follows scores, instead of writing them too often in database +// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores +class ActorFollowScoreCache { + + private static instance: ActorFollowScoreCache + private pendingFollowsScore: { [ url: string ]: number } = {} + + private constructor () {} + + static get Instance () { + return this.instance || (this.instance = new this()) + } + + updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) { + if (goodInboxes.length === 0 && badInboxes.length === 0) return + + logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length) + + for (const goodInbox of goodInboxes) { + if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0 + + this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS + } + + for (const badInbox of badInboxes) { + if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0 + + this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY + } + } + + getPendingFollowsScoreCopy () { + return this.pendingFollowsScore + } + + clearPendingFollowsScore () { + this.pendingFollowsScore = {} + } +} + +export { + ActorFollowScoreCache +} diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts index 54eb983fa..e921d04a7 100644 --- a/server/lib/cache/index.ts +++ b/server/lib/cache/index.ts @@ -1,2 +1,3 @@ +export * from './actor-follow-score-cache' export * from './videos-preview-cache' export * from './videos-caption-cache' diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index abbd89b3b..9493945ff 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts @@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests' import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpBroadcastPayload = { uris: string[] @@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) { .catch(() => badUrls.push(uri)) }, { concurrency: BROADCAST_CONCURRENCY }) - return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) + return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls) } // --------------------------------------------------------------------------- diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index d36479032..3973dcdc8 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts @@ -1,9 +1,9 @@ import * as Bull from 'bull' import { logger } from '../../../helpers/logger' import { doRequest } from '../../../helpers/requests' -import { ActorFollowModel } from '../../../models/activitypub/actor-follow' import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' import { JOB_REQUEST_TIMEOUT } from '../../../initializers' +import { ActorFollowScoreCache } from '../../cache' export type ActivitypubHttpUnicastPayload = { uri: string @@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) { try { await doRequest(options) - ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], []) } catch (err) { - ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) + ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ]) throw err } diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index e34be7dcd..ba9cbe0d9 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -165,10 +165,10 @@ class JobQueue { return total } - removeOldJobs () { + async removeOldJobs () { for (const key of Object.keys(this.queues)) { const queue = this.queues[key] - queue.clean(JOB_COMPLETED_LIFETIME, 'completed') + await queue.clean(JOB_COMPLETED_LIFETIME, 'completed') } } diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts index b9d0a4d17..86ea7aa38 100644 --- a/server/lib/schedulers/abstract-scheduler.ts +++ b/server/lib/schedulers/abstract-scheduler.ts @@ -1,8 +1,11 @@ +import { logger } from '../../helpers/logger' + export abstract class AbstractScheduler { protected abstract schedulerIntervalMs: number private interval: NodeJS.Timer + private isRunning = false enable () { if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') @@ -14,5 +17,18 @@ export abstract class AbstractScheduler { clearInterval(this.interval) } - abstract execute () + async execute () { + if (this.isRunning === true) return + this.isRunning = true + + try { + await this.internalExecute() + } catch (err) { + logger.error('Cannot execute %s scheduler.', this.constructor.name, { err }) + } finally { + this.isRunning = false + } + } + + protected abstract internalExecute (): Promise } diff --git a/server/lib/schedulers/actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts new file mode 100644 index 000000000..3967be7f8 --- /dev/null +++ b/server/lib/schedulers/actor-follow-scheduler.ts @@ -0,0 +1,47 @@ +import { isTestInstance } from '../../helpers/core-utils' +import { logger } from '../../helpers/logger' +import { ActorFollowModel } from '../../models/activitypub/actor-follow' +import { AbstractScheduler } from './abstract-scheduler' +import { SCHEDULER_INTERVALS_MS } from '../../initializers' +import { ActorFollowScoreCache } from '../cache' + +export class ActorFollowScheduler extends AbstractScheduler { + + private static instance: AbstractScheduler + + protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores + + private constructor () { + super() + } + + protected async internalExecute () { + await this.processPendingScores() + + await this.removeBadActorFollows() + } + + private async processPendingScores () { + const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy() + + ActorFollowScoreCache.Instance.clearPendingFollowsScore() + + for (const inbox of Object.keys(pendingScores)) { + await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox]) + } + } + + private async removeBadActorFollows () { + if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') + + try { + await ActorFollowModel.removeBadActorFollows() + } catch (err) { + logger.error('Error in bad actor follows scheduler.', { err }) + } + } + + static get Instance () { + return this.instance || (this.instance = new this()) + } +} diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/bad-actor-follow-scheduler.ts deleted file mode 100644 index 617149aaf..000000000 --- a/server/lib/schedulers/bad-actor-follow-scheduler.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { isTestInstance } from '../../helpers/core-utils' -import { logger } from '../../helpers/logger' -import { ActorFollowModel } from '../../models/activitypub/actor-follow' -import { AbstractScheduler } from './abstract-scheduler' -import { SCHEDULER_INTERVALS_MS } from '../../initializers' - -export class BadActorFollowScheduler extends AbstractScheduler { - - private static instance: AbstractScheduler - - protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow - - private constructor () { - super() - } - - async execute () { - if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') - - try { - await ActorFollowModel.removeBadActorFollows() - } catch (err) { - logger.error('Error in bad actor follows scheduler.', { err }) - } - } - - static get Instance () { - return this.instance || (this.instance = new this()) - } -} diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts index a29a6b800..4a4341ba9 100644 --- a/server/lib/schedulers/remove-old-jobs-scheduler.ts +++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts @@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler { super() } - async execute () { - if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') + protected internalExecute () { + if (!isTestInstance()) logger.info('Removing old jobs in scheduler.') - JobQueue.Instance.removeOldJobs() + return JobQueue.Instance.removeOldJobs() } static get Instance () { diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts index fd2edfd17..21f071f9e 100644 --- a/server/lib/schedulers/update-videos-scheduler.ts +++ b/server/lib/schedulers/update-videos-scheduler.ts @@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler { protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos - private isRunning = false - private constructor () { super() } - async execute () { - if (this.isRunning === true) return - this.isRunning = true - - try { - await retryTransactionWrapper(this.updateVideos.bind(this)) - } catch (err) { - logger.error('Cannot execute update videos scheduler.', { err }) - } finally { - this.isRunning = false - } + protected async internalExecute () { + return retryTransactionWrapper(this.updateVideos.bind(this)) } private async updateVideos () { diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts index 15e094d39..f643ee226 100644 --- a/server/lib/schedulers/videos-redundancy-scheduler.ts +++ b/server/lib/schedulers/videos-redundancy-scheduler.ts @@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub' export class VideosRedundancyScheduler extends AbstractScheduler { private static instance: AbstractScheduler - private executing = false protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL @@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler { super() } - async execute () { - if (this.executing) return - - this.executing = true - + protected async internalExecute () { for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) @@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler { await this.extendsLocalExpiration() await this.purgeRemoteExpired() - - this.executing = false } static get Instance () { diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts index 461cd045e..aa027116d 100644 --- a/server/lib/schedulers/youtube-dl-update-scheduler.ts +++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts @@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler { super() } - execute () { + protected internalExecute () { return updateYoutubeDLBinary() } diff --git a/server/models/activitypub/actor-follow.ts b/server/models/activitypub/actor-follow.ts index 0a6935083..994f791de 100644 --- a/server/models/activitypub/actor-follow.ts +++ b/server/models/activitypub/actor-follow.ts @@ -127,22 +127,6 @@ export class ActorFollowModel extends Model { if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved) } - static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) { - if (goodInboxes.length === 0 && badInboxes.length === 0) return - - logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length) - - if (goodInboxes.length !== 0) { - ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t) - .catch(err => logger.error('Cannot increment scores of good actor follows.', { err })) - } - - if (badInboxes.length !== 0) { - ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t) - .catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err })) - } - } - static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) { const query = { where: { @@ -464,6 +448,22 @@ export class ActorFollowModel extends Model { } } + static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) { + const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + + 'WHERE id IN (' + + 'SELECT "actorFollow"."id" FROM "actorFollow" ' + + 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + + `WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` + + ')' + + const options = { + type: Sequelize.QueryTypes.BULKUPDATE, + transaction: t + } + + return ActorFollowModel.sequelize.query(query, options) + } + private static async createListAcceptedFollowForApiQuery ( type: 'followers' | 'following', actorIds: number[], @@ -518,24 +518,6 @@ export class ActorFollowModel extends Model { } } - private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) { - const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',') - - const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` + - 'WHERE id IN (' + - 'SELECT "actorFollow"."id" FROM "actorFollow" ' + - 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' + - 'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' + - ')' - - const options = t ? { - type: Sequelize.QueryTypes.BULKUPDATE, - transaction: t - } : undefined - - return ActorFollowModel.sequelize.query(query, options) - } - private static listBadActorFollows () { const query = { where: { -- cgit v1.2.3