aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2022-11-23 17:13:01 +0100
committerChocobozzz <me@florianbigard.com>2022-11-23 17:13:01 +0100
commitc3b21b68b55fa5cb0ee7a23e4316b5466a0448f0 (patch)
tree8b7d0be8a57dc9a57a6f9d3af4a69578f06c0c4c /server/lib
parent5800f354ccd4845d0b4bc212a0c53a0ff40edf82 (diff)
downloadPeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.tar.gz
PeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.tar.zst
PeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.zip
Delete completed/failed jobs directly from bullmq
Diffstat (limited to 'server/lib')
-rw-r--r--server/lib/job-queue/job-queue.ts46
-rw-r--r--server/lib/schedulers/remove-old-jobs-scheduler.ts2
2 files changed, 41 insertions, 7 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 6bc59732f..91366c481 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -41,8 +41,16 @@ import {
41 VideoTranscodingPayload 41 VideoTranscodingPayload
42} from '../../../shared/models' 42} from '../../../shared/models'
43import { logger } from '../../helpers/logger' 43import { logger } from '../../helpers/logger'
44import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' 44import {
45 JOB_ATTEMPTS,
46 JOB_CONCURRENCY,
47 JOB_REMOVAL_OPTIONS,
48 JOB_TTL,
49 REPEAT_JOBS,
50 WEBSERVER
51} from '../../initializers/constants'
45import { Hooks } from '../plugins/hooks' 52import { Hooks } from '../plugins/hooks'
53import { Redis } from '../redis'
46import { processActivityPubCleaner } from './handlers/activitypub-cleaner' 54import { processActivityPubCleaner } from './handlers/activitypub-cleaner'
47import { processActivityPubFollow } from './handlers/activitypub-follow' 55import { processActivityPubFollow } from './handlers/activitypub-follow'
48import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' 56import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast'
@@ -63,7 +71,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
63import { processVideoStudioEdition } from './handlers/video-studio-edition' 71import { processVideoStudioEdition } from './handlers/video-studio-edition'
64import { processVideoTranscoding } from './handlers/video-transcoding' 72import { processVideoTranscoding } from './handlers/video-transcoding'
65import { processVideosViewsStats } from './handlers/video-views-stats' 73import { processVideosViewsStats } from './handlers/video-views-stats'
66import { Redis } from '../redis' 74import { parseDurationToMs } from '@server/helpers/core-utils'
67 75
68export type CreateJobArgument = 76export type CreateJobArgument =
69 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 77 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -373,7 +381,7 @@ class JobQueue {
373 }) 381 })
374 } 382 }
375 383
376 private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { 384 private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
377 return { 385 return {
378 name: 'job', 386 name: 'job',
379 data: job.payload, 387 data: job.payload,
@@ -387,7 +395,9 @@ class JobQueue {
387 backoff: { delay: 60 * 1000, type: 'exponential' }, 395 backoff: { delay: 60 * 1000, type: 'exponential' },
388 attempts: JOB_ATTEMPTS[type], 396 attempts: JOB_ATTEMPTS[type],
389 priority: options.priority, 397 priority: options.priority,
390 delay: options.delay 398 delay: options.delay,
399
400 ...this.buildJobRemovalOptions(type)
391 } 401 }
392 } 402 }
393 403
@@ -482,18 +492,23 @@ class JobQueue {
482 async removeOldJobs () { 492 async removeOldJobs () {
483 for (const key of Object.keys(this.queues)) { 493 for (const key of Object.keys(this.queues)) {
484 const queue: Queue = this.queues[key] 494 const queue: Queue = this.queues[key]
485 await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') 495 await queue.clean(parseDurationToMs('7 days'), 100, 'completed')
496 await queue.clean(parseDurationToMs('7 days'), 100, 'failed')
486 } 497 }
487 } 498 }
488 499
489 private addRepeatableJobs () { 500 private addRepeatableJobs () {
490 this.queues['videos-views-stats'].add('job', {}, { 501 this.queues['videos-views-stats'].add('job', {}, {
491 repeat: REPEAT_JOBS['videos-views-stats'] 502 repeat: REPEAT_JOBS['videos-views-stats'],
503
504 ...this.buildJobRemovalOptions('videos-views-stats')
492 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 505 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
493 506
494 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { 507 if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
495 this.queues['activitypub-cleaner'].add('job', {}, { 508 this.queues['activitypub-cleaner'].add('job', {}, {
496 repeat: REPEAT_JOBS['activitypub-cleaner'] 509 repeat: REPEAT_JOBS['activitypub-cleaner'],
510
511 ...this.buildJobRemovalOptions('activitypub-cleaner')
497 }).catch(err => logger.error('Cannot add repeatable job.', { err })) 512 }).catch(err => logger.error('Cannot add repeatable job.', { err }))
498 } 513 }
499 } 514 }
@@ -505,6 +520,23 @@ class JobQueue {
505 return JOB_CONCURRENCY[jobType] 520 return JOB_CONCURRENCY[jobType]
506 } 521 }
507 522
523 private buildJobRemovalOptions (queueName: string) {
524 return {
525 removeOnComplete: {
526 // Wants seconds
527 age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
528
529 count: JOB_REMOVAL_OPTIONS.COUNT
530 },
531 removeOnFail: {
532 // Wants seconds
533 age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
534
535 count: JOB_REMOVAL_OPTIONS.COUNT / 1000
536 }
537 }
538 }
539
508 static get Instance () { 540 static get Instance () {
509 return this.instance || (this.instance = new this()) 541 return this.instance || (this.instance = new this())
510 } 542 }
diff --git a/server/lib/schedulers/remove-old-jobs-scheduler.ts b/server/lib/schedulers/remove-old-jobs-scheduler.ts
index 879846999..6b05ea144 100644
--- a/server/lib/schedulers/remove-old-jobs-scheduler.ts
+++ b/server/lib/schedulers/remove-old-jobs-scheduler.ts
@@ -4,6 +4,8 @@ import { SCHEDULER_INTERVALS_MS } from '../../initializers/constants'
4import { JobQueue } from '../job-queue' 4import { JobQueue } from '../job-queue'
5import { AbstractScheduler } from './abstract-scheduler' 5import { AbstractScheduler } from './abstract-scheduler'
6 6
7// FIXME: delete this scheduler in a few versions (introduced in 5.0)
8// We introduced job removal directly using bullmq option but we still need to delete old jobs
7export class RemoveOldJobsScheduler extends AbstractScheduler { 9export class RemoveOldJobsScheduler extends AbstractScheduler {
8 10
9 private static instance: AbstractScheduler 11 private static instance: AbstractScheduler