aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2018-12-20 14:31:11 +0100
committerChocobozzz <me@florianbigard.com>2018-12-20 14:31:11 +0100
commit2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b (patch)
tree9b83f018403c30421c1b3ca4439c4dddc5077881 /server
parent4707f410ae44b55e17e1758693dd21cff03b7ef1 (diff)
downloadPeerTube-2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b.tar.gz
PeerTube-2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b.tar.zst
PeerTube-2f5c6b2fc6e60502c2a8df4dc9029c1d87ebe30b.zip
Optimize actor follow scores modifications
Diffstat (limited to 'server')
-rw-r--r--server/initializers/constants.ts4
-rw-r--r--server/lib/cache/actor-follow-score-cache.ts46
-rw-r--r--server/lib/cache/index.ts1
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-broadcast.ts3
-rw-r--r--server/lib/job-queue/handlers/activitypub-http-unicast.ts6
-rw-r--r--server/lib/job-queue/job-queue.ts4
-rw-r--r--server/lib/schedulers/abstract-scheduler.ts18
-rw-r--r--server/lib/schedulers/actor-follow-scheduler.ts (renamed from server/lib/schedulers/bad-actor-follow-scheduler.ts)23
-rw-r--r--server/lib/schedulers/remove-old-jobs-scheduler.ts6
-rw-r--r--server/lib/schedulers/update-videos-scheduler.ts15
-rw-r--r--server/lib/schedulers/videos-redundancy-scheduler.ts9
-rw-r--r--server/lib/schedulers/youtube-dl-update-scheduler.ts2
-rw-r--r--server/models/activitypub/actor-follow.ts50
13 files changed, 116 insertions, 71 deletions
diff --git a/server/initializers/constants.ts b/server/initializers/constants.ts
index b326a6c7b..1c27a9f6b 100644
--- a/server/initializers/constants.ts
+++ b/server/initializers/constants.ts
@@ -144,7 +144,7 @@ const VIDEO_IMPORT_TIMEOUT = 1000 * 3600 // 1 hour
144 144
145// 1 hour 145// 1 hour
146let SCHEDULER_INTERVALS_MS = { 146let SCHEDULER_INTERVALS_MS = {
147 badActorFollow: 60000 * 60, // 1 hour 147 actorFollowScores: 60000 * 60, // 1 hour
148 removeOldJobs: 60000 * 60, // 1 hour 148 removeOldJobs: 60000 * 60, // 1 hour
149 updateVideos: 60000, // 1 minute 149 updateVideos: 60000, // 1 minute
150 youtubeDLUpdate: 60000 * 60 * 24 // 1 day 150 youtubeDLUpdate: 60000 * 60 * 24 // 1 day
@@ -675,7 +675,7 @@ if (isTestInstance() === true) {
675 675
676 CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB 676 CONSTRAINTS_FIELDS.ACTORS.AVATAR.FILE_SIZE.max = 100 * 1024 // 100KB
677 677
678 SCHEDULER_INTERVALS_MS.badActorFollow = 10000 678 SCHEDULER_INTERVALS_MS.actorFollowScores = 1000
679 SCHEDULER_INTERVALS_MS.removeOldJobs = 10000 679 SCHEDULER_INTERVALS_MS.removeOldJobs = 10000
680 SCHEDULER_INTERVALS_MS.updateVideos = 5000 680 SCHEDULER_INTERVALS_MS.updateVideos = 5000
681 REPEAT_JOBS['videos-views'] = { every: 5000 } 681 REPEAT_JOBS['videos-views'] = { every: 5000 }
diff --git a/server/lib/cache/actor-follow-score-cache.ts b/server/lib/cache/actor-follow-score-cache.ts
new file mode 100644
index 000000000..d070bde09
--- /dev/null
+++ b/server/lib/cache/actor-follow-score-cache.ts
@@ -0,0 +1,46 @@
1import { ACTOR_FOLLOW_SCORE } from '../../initializers'
2import { logger } from '../../helpers/logger'
3
4// Cache follows scores, instead of writing them too often in database
5// Keep data in memory, we don't really need Redis here as we don't really care to loose some scores
6class ActorFollowScoreCache {
7
8 private static instance: ActorFollowScoreCache
9 private pendingFollowsScore: { [ url: string ]: number } = {}
10
11 private constructor () {}
12
13 static get Instance () {
14 return this.instance || (this.instance = new this())
15 }
16
17 updateActorFollowsScore (goodInboxes: string[], badInboxes: string[]) {
18 if (goodInboxes.length === 0 && badInboxes.length === 0) return
19
20 logger.info('Updating %d good actor follows and %d bad actor follows scores in cache.', goodInboxes.length, badInboxes.length)
21
22 for (const goodInbox of goodInboxes) {
23 if (this.pendingFollowsScore[goodInbox] === undefined) this.pendingFollowsScore[goodInbox] = 0
24
25 this.pendingFollowsScore[goodInbox] += ACTOR_FOLLOW_SCORE.BONUS
26 }
27
28 for (const badInbox of badInboxes) {
29 if (this.pendingFollowsScore[badInbox] === undefined) this.pendingFollowsScore[badInbox] = 0
30
31 this.pendingFollowsScore[badInbox] += ACTOR_FOLLOW_SCORE.PENALTY
32 }
33 }
34
35 getPendingFollowsScoreCopy () {
36 return this.pendingFollowsScore
37 }
38
39 clearPendingFollowsScore () {
40 this.pendingFollowsScore = {}
41 }
42}
43
44export {
45 ActorFollowScoreCache
46}
diff --git a/server/lib/cache/index.ts b/server/lib/cache/index.ts
index 54eb983fa..e921d04a7 100644
--- a/server/lib/cache/index.ts
+++ b/server/lib/cache/index.ts
@@ -1,2 +1,3 @@
1export * from './actor-follow-score-cache'
1export * from './videos-preview-cache' 2export * from './videos-preview-cache'
2export * from './videos-caption-cache' 3export * from './videos-caption-cache'
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
index abbd89b3b..9493945ff 100644
--- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts
@@ -5,6 +5,7 @@ import { doRequest } from '../../../helpers/requests'
5import { ActorFollowModel } from '../../../models/activitypub/actor-follow' 5import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 6import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers' 7import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers'
8import { ActorFollowScoreCache } from '../../cache'
8 9
9export type ActivitypubHttpBroadcastPayload = { 10export type ActivitypubHttpBroadcastPayload = {
10 uris: string[] 11 uris: string[]
@@ -38,7 +39,7 @@ async function processActivityPubHttpBroadcast (job: Bull.Job) {
38 .catch(() => badUrls.push(uri)) 39 .catch(() => badUrls.push(uri))
39 }, { concurrency: BROADCAST_CONCURRENCY }) 40 }, { concurrency: BROADCAST_CONCURRENCY })
40 41
41 return ActorFollowModel.updateActorFollowsScore(goodUrls, badUrls, undefined) 42 return ActorFollowScoreCache.Instance.updateActorFollowsScore(goodUrls, badUrls)
42} 43}
43 44
44// --------------------------------------------------------------------------- 45// ---------------------------------------------------------------------------
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
index d36479032..3973dcdc8 100644
--- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts
+++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts
@@ -1,9 +1,9 @@
1import * as Bull from 'bull' 1import * as Bull from 'bull'
2import { logger } from '../../../helpers/logger' 2import { logger } from '../../../helpers/logger'
3import { doRequest } from '../../../helpers/requests' 3import { doRequest } from '../../../helpers/requests'
4import { ActorFollowModel } from '../../../models/activitypub/actor-follow'
5import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' 4import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils'
6import { JOB_REQUEST_TIMEOUT } from '../../../initializers' 5import { JOB_REQUEST_TIMEOUT } from '../../../initializers'
6import { ActorFollowScoreCache } from '../../cache'
7 7
8export type ActivitypubHttpUnicastPayload = { 8export type ActivitypubHttpUnicastPayload = {
9 uri: string 9 uri: string
@@ -31,9 +31,9 @@ async function processActivityPubHttpUnicast (job: Bull.Job) {
31 31
32 try { 32 try {
33 await doRequest(options) 33 await doRequest(options)
34 ActorFollowModel.updateActorFollowsScore([ uri ], [], undefined) 34 ActorFollowScoreCache.Instance.updateActorFollowsScore([ uri ], [])
35 } catch (err) { 35 } catch (err) {
36 ActorFollowModel.updateActorFollowsScore([], [ uri ], undefined) 36 ActorFollowScoreCache.Instance.updateActorFollowsScore([], [ uri ])
37 37
38 throw err 38 throw err
39 } 39 }
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index e34be7dcd..ba9cbe0d9 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -165,10 +165,10 @@ class JobQueue {
165 return total 165 return total
166 } 166 }
167 167
168 removeOldJobs () { 168 async removeOldJobs () {
169 for (const key of Object.keys(this.queues)) { 169 for (const key of Object.keys(this.queues)) {
170 const queue = this.queues[key] 170 const queue = this.queues[key]
171 queue.clean(JOB_COMPLETED_LIFETIME, 'completed') 171 await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
172 } 172 }
173 } 173 }
174 174
diff --git a/server/lib/schedulers/abstract-scheduler.ts b/server/lib/schedulers/abstract-scheduler.ts
index b9d0a4d17..86ea7aa38 100644
--- a/server/lib/schedulers/abstract-scheduler.ts
+++ b/server/lib/schedulers/abstract-scheduler.ts
@@ -1,8 +1,11 @@
1import { logger } from '../../helpers/logger'
2
1export abstract class AbstractScheduler { 3export abstract class AbstractScheduler {
2 4
3 protected abstract schedulerIntervalMs: number 5 protected abstract schedulerIntervalMs: number
4 6
5 private interval: NodeJS.Timer 7 private interval: NodeJS.Timer
8 private isRunning = false
6 9
7 enable () { 10 enable () {
8 if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.') 11 if (!this.schedulerIntervalMs) throw new Error('Interval is not correctly set.')
@@ -14,5 +17,18 @@ export abstract class AbstractScheduler {
14 clearInterval(this.interval) 17 clearInterval(this.interval)
15 } 18 }
16 19
17 abstract execute () 20 async execute () {
21 if (this.isRunning === true) return
22 this.isRunning = true
23
24 try {
25 await this.internalExecute()
26 } catch (err) {
27 logger.error('Cannot execute %s scheduler.', this.constructor.name, { err })
28 } finally {
29 this.isRunning = false
30 }
31 }
32
33 protected abstract internalExecute (): Promise<any>
18} 34}
diff --git a/server/lib/schedulers/bad-actor-follow-scheduler.ts b/server/lib/schedulers/actor-follow-scheduler.ts
index 617149aaf..3967be7f8 100644
--- a/server/lib/schedulers/bad-actor-follow-scheduler.ts
+++ b/server/lib/schedulers/actor-follow-scheduler.ts
@@ -3,18 +3,35 @@ import { logger } from '../../helpers/logger'
3import { ActorFollowModel } from '../../models/activitypub/actor-follow' 3import { ActorFollowModel } from '../../models/activitypub/actor-follow'
4import { AbstractScheduler } from './abstract-scheduler' 4import { AbstractScheduler } from './abstract-scheduler'
5import { SCHEDULER_INTERVALS_MS } from '../../initializers' 5import { SCHEDULER_INTERVALS_MS } from '../../initializers'
6import { ActorFollowScoreCache } from '../cache'
6 7
7export class BadActorFollowScheduler extends AbstractScheduler { 8export class ActorFollowScheduler extends AbstractScheduler {
8 9
9 private static instance: AbstractScheduler 10 private static instance: AbstractScheduler
10 11
11 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.badActorFollow 12 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.actorFollowScores
12 13
13 private constructor () { 14 private constructor () {
14 super() 15 super()
15 } 16 }
16 17
17 async execute () { 18 protected async internalExecute () {
19 await this.processPendingScores()
20
21 await this.removeBadActorFollows()
22 }
23
24 private async processPendingScores () {
25 const pendingScores = ActorFollowScoreCache.Instance.getPendingFollowsScoreCopy()
26
27 ActorFollowScoreCache.Instance.clearPendingFollowsScore()
28
29 for (const inbox of Object.keys(pendingScores)) {
30 await ActorFollowModel.updateFollowScore(inbox, pendingScores[inbox])
31 }
32 }
33
34 private async removeBadActorFollows () {
18 if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).') 35 if (!isTestInstance()) logger.info('Removing bad actor follows (scheduler).')
19 36
20 try { 37 try {
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts
index a29a6b800..4a4341ba9 100644
--- a/server/lib/schedulers/remove-old-jobs-scheduler.ts
+++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts
@@ -14,10 +14,10 @@ export class RemoveOldJobsScheduler extends AbstractScheduler {
14 super() 14 super()
15 } 15 }
16 16
17 async execute () { 17 protected internalExecute () {
18 if (!isTestInstance()) logger.info('Removing old jobs (scheduler).') 18 if (!isTestInstance()) logger.info('Removing old jobs in scheduler.')
19 19
20 JobQueue.Instance.removeOldJobs() 20 return JobQueue.Instance.removeOldJobs()
21 } 21 }
22 22
23 static get Instance () { 23 static get Instance () {
diff --git a/server/lib/schedulers/update-videos-scheduler.ts b/server/lib/schedulers/update-videos-scheduler.ts
index fd2edfd17..21f071f9e 100644
--- a/server/lib/schedulers/update-videos-scheduler.ts
+++ b/server/lib/schedulers/update-videos-scheduler.ts
@@ -12,23 +12,12 @@ export class UpdateVideosScheduler extends AbstractScheduler {
12 12
13 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos 13 protected schedulerIntervalMs = SCHEDULER_INTERVALS_MS.updateVideos
14 14
15 private isRunning = false
16
17 private constructor () { 15 private constructor () {
18 super() 16 super()
19 } 17 }
20 18
21 async execute () { 19 protected async internalExecute () {
22 if (this.isRunning === true) return 20 return retryTransactionWrapper(this.updateVideos.bind(this))
23 this.isRunning = true
24
25 try {
26 await retryTransactionWrapper(this.updateVideos.bind(this))
27 } catch (err) {
28 logger.error('Cannot execute update videos scheduler.', { err })
29 } finally {
30 this.isRunning = false
31 }
32 } 21 }
33 22
34 private async updateVideos () { 23 private async updateVideos () {
diff --git a/server/lib/schedulers/videos-redundancy-scheduler.ts b/server/lib/schedulers/videos-redundancy-scheduler.ts
index 15e094d39..f643ee226 100644
--- a/server/lib/schedulers/videos-redundancy-scheduler.ts
+++ b/server/lib/schedulers/videos-redundancy-scheduler.ts
@@ -16,7 +16,6 @@ import { getOrCreateVideoAndAccountAndChannel } from '../activitypub'
16export class VideosRedundancyScheduler extends AbstractScheduler { 16export class VideosRedundancyScheduler extends AbstractScheduler {
17 17
18 private static instance: AbstractScheduler 18 private static instance: AbstractScheduler
19 private executing = false
20 19
21 protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL 20 protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
22 21
@@ -24,11 +23,7 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
24 super() 23 super()
25 } 24 }
26 25
27 async execute () { 26 protected async internalExecute () {
28 if (this.executing) return
29
30 this.executing = true
31
32 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) { 27 for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
33 logger.info('Running redundancy scheduler for strategy %s.', obj.strategy) 28 logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)
34 29
@@ -57,8 +52,6 @@ export class VideosRedundancyScheduler extends AbstractScheduler {
57 await this.extendsLocalExpiration() 52 await this.extendsLocalExpiration()
58 53
59 await this.purgeRemoteExpired() 54 await this.purgeRemoteExpired()
60
61 this.executing = false
62 } 55 }
63 56
64 static get Instance () { 57 static get Instance () {
diff --git a/server/lib/schedulers/youtube-dl-update-scheduler.ts b/server/lib/schedulers/youtube-dl-update-scheduler.ts
index 461cd045e..aa027116d 100644
--- a/server/lib/schedulers/youtube-dl-update-scheduler.ts
+++ b/server/lib/schedulers/youtube-dl-update-scheduler.ts
@@ -12,7 +12,7 @@ export class YoutubeDlUpdateScheduler extends AbstractScheduler {
12 super() 12 super()
13 } 13 }
14 14
15 execute () { 15 protected internalExecute () {
16 return updateYoutubeDLBinary() 16 return updateYoutubeDLBinary()
17 } 17 }
18 18
diff --git a/server/models/activitypub/actor-follow.ts b/server/models/activitypub/actor-follow.ts
index 0a6935083..994f791de 100644
--- a/server/models/activitypub/actor-follow.ts
+++ b/server/models/activitypub/actor-follow.ts
@@ -127,22 +127,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
127 if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved) 127 if (numberOfActorFollowsRemoved) logger.info('Removed bad %d actor follows.', numberOfActorFollowsRemoved)
128 } 128 }
129 129
130 static updateActorFollowsScore (goodInboxes: string[], badInboxes: string[], t: Sequelize.Transaction | undefined) {
131 if (goodInboxes.length === 0 && badInboxes.length === 0) return
132
133 logger.info('Updating %d good actor follows and %d bad actor follows scores.', goodInboxes.length, badInboxes.length)
134
135 if (goodInboxes.length !== 0) {
136 ActorFollowModel.incrementScores(goodInboxes, ACTOR_FOLLOW_SCORE.BONUS, t)
137 .catch(err => logger.error('Cannot increment scores of good actor follows.', { err }))
138 }
139
140 if (badInboxes.length !== 0) {
141 ActorFollowModel.incrementScores(badInboxes, ACTOR_FOLLOW_SCORE.PENALTY, t)
142 .catch(err => logger.error('Cannot decrement scores of bad actor follows.', { err }))
143 }
144 }
145
146 static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) { 130 static loadByActorAndTarget (actorId: number, targetActorId: number, t?: Sequelize.Transaction) {
147 const query = { 131 const query = {
148 where: { 132 where: {
@@ -464,6 +448,22 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
464 } 448 }
465 } 449 }
466 450
451 static updateFollowScore (inboxUrl: string, value: number, t?: Sequelize.Transaction) {
452 const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
453 'WHERE id IN (' +
454 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
455 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
456 `WHERE "actor"."inboxUrl" = '${inboxUrl}' OR "actor"."sharedInboxUrl" = '${inboxUrl}'` +
457 ')'
458
459 const options = {
460 type: Sequelize.QueryTypes.BULKUPDATE,
461 transaction: t
462 }
463
464 return ActorFollowModel.sequelize.query(query, options)
465 }
466
467 private static async createListAcceptedFollowForApiQuery ( 467 private static async createListAcceptedFollowForApiQuery (
468 type: 'followers' | 'following', 468 type: 'followers' | 'following',
469 actorIds: number[], 469 actorIds: number[],
@@ -518,24 +518,6 @@ export class ActorFollowModel extends Model<ActorFollowModel> {
518 } 518 }
519 } 519 }
520 520
521 private static incrementScores (inboxUrls: string[], value: number, t: Sequelize.Transaction | undefined) {
522 const inboxUrlsString = inboxUrls.map(url => `'${url}'`).join(',')
523
524 const query = `UPDATE "actorFollow" SET "score" = LEAST("score" + ${value}, ${ACTOR_FOLLOW_SCORE.MAX}) ` +
525 'WHERE id IN (' +
526 'SELECT "actorFollow"."id" FROM "actorFollow" ' +
527 'INNER JOIN "actor" ON "actor"."id" = "actorFollow"."actorId" ' +
528 'WHERE "actor"."inboxUrl" IN (' + inboxUrlsString + ') OR "actor"."sharedInboxUrl" IN (' + inboxUrlsString + ')' +
529 ')'
530
531 const options = t ? {
532 type: Sequelize.QueryTypes.BULKUPDATE,
533 transaction: t
534 } : undefined
535
536 return ActorFollowModel.sequelize.query(query, options)
537 }
538
539 private static listBadActorFollows () { 521 private static listBadActorFollows () {
540 const query = { 522 const query = {
541 where: { 523 where: {