diff options
author | Chocobozzz <me@florianbigard.com> | 2022-11-23 17:13:01 +0100 |
---|---|---|
committer | Chocobozzz <me@florianbigard.com> | 2022-11-23 17:13:01 +0100 |
commit | c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0 (patch) | |
tree | 8b7d0be8a57dc9a57a6f9d3af4a69578f06c0c4c /server/lib/job-queue | |
parent | 5800f354ccd4845d0b4bc212a0c53a0ff40edf82 (diff) | |
download | PeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.tar.gz PeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.tar.zst PeerTube-c3b21b68b55fa5cb0ee7a23e4316b5466a0448f0.zip |
Delete completed/failed jobs directly from bullmq
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 46 |
1 files changed, 39 insertions, 7 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 6bc59732f..91366c481 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -41,8 +41,16 @@ import { | |||
41 | VideoTranscodingPayload | 41 | VideoTranscodingPayload |
42 | } from '../../../shared/models' | 42 | } from '../../../shared/models' |
43 | import { logger } from '../../helpers/logger' | 43 | import { logger } from '../../helpers/logger' |
44 | import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants' | 44 | import { |
45 | JOB_ATTEMPTS, | ||
46 | JOB_CONCURRENCY, | ||
47 | JOB_REMOVAL_OPTIONS, | ||
48 | JOB_TTL, | ||
49 | REPEAT_JOBS, | ||
50 | WEBSERVER | ||
51 | } from '../../initializers/constants' | ||
45 | import { Hooks } from '../plugins/hooks' | 52 | import { Hooks } from '../plugins/hooks' |
53 | import { Redis } from '../redis' | ||
46 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' | 54 | import { processActivityPubCleaner } from './handlers/activitypub-cleaner' |
47 | import { processActivityPubFollow } from './handlers/activitypub-follow' | 55 | import { processActivityPubFollow } from './handlers/activitypub-follow' |
48 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' | 56 | import { processActivityPubHttpSequentialBroadcast, processActivityPubParallelHttpBroadcast } from './handlers/activitypub-http-broadcast' |
@@ -63,7 +71,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 71 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
64 | import { processVideoTranscoding } from './handlers/video-transcoding' | 72 | import { processVideoTranscoding } from './handlers/video-transcoding' |
65 | import { processVideosViewsStats } from './handlers/video-views-stats' | 73 | import { processVideosViewsStats } from './handlers/video-views-stats' |
66 | import { Redis } from '../redis' | 74 | import { parseDurationToMs } from '@server/helpers/core-utils' |
67 | 75 | ||
68 | export type CreateJobArgument = | 76 | export type CreateJobArgument = |
69 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 77 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -373,7 +381,7 @@ class JobQueue { | |||
373 | }) | 381 | }) |
374 | } | 382 | } |
375 | 383 | ||
376 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions) { | 384 | private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob { |
377 | return { | 385 | return { |
378 | name: 'job', | 386 | name: 'job', |
379 | data: job.payload, | 387 | data: job.payload, |
@@ -387,7 +395,9 @@ class JobQueue { | |||
387 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 395 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
388 | attempts: JOB_ATTEMPTS[type], | 396 | attempts: JOB_ATTEMPTS[type], |
389 | priority: options.priority, | 397 | priority: options.priority, |
390 | delay: options.delay | 398 | delay: options.delay, |
399 | |||
400 | ...this.buildJobRemovalOptions(type) | ||
391 | } | 401 | } |
392 | } | 402 | } |
393 | 403 | ||
@@ -482,18 +492,23 @@ class JobQueue { | |||
482 | async removeOldJobs () { | 492 | async removeOldJobs () { |
483 | for (const key of Object.keys(this.queues)) { | 493 | for (const key of Object.keys(this.queues)) { |
484 | const queue: Queue = this.queues[key] | 494 | const queue: Queue = this.queues[key] |
485 | await queue.clean(JOB_COMPLETED_LIFETIME, 100, 'completed') | 495 | await queue.clean(parseDurationToMs('7 days'), 100, 'completed') |
496 | await queue.clean(parseDurationToMs('7 days'), 100, 'failed') | ||
486 | } | 497 | } |
487 | } | 498 | } |
488 | 499 | ||
489 | private addRepeatableJobs () { | 500 | private addRepeatableJobs () { |
490 | this.queues['videos-views-stats'].add('job', {}, { | 501 | this.queues['videos-views-stats'].add('job', {}, { |
491 | repeat: REPEAT_JOBS['videos-views-stats'] | 502 | repeat: REPEAT_JOBS['videos-views-stats'], |
503 | |||
504 | ...this.buildJobRemovalOptions('videos-views-stats') | ||
492 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 505 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
493 | 506 | ||
494 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { | 507 | if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) { |
495 | this.queues['activitypub-cleaner'].add('job', {}, { | 508 | this.queues['activitypub-cleaner'].add('job', {}, { |
496 | repeat: REPEAT_JOBS['activitypub-cleaner'] | 509 | repeat: REPEAT_JOBS['activitypub-cleaner'], |
510 | |||
511 | ...this.buildJobRemovalOptions('activitypub-cleaner') | ||
497 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) | 512 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
498 | } | 513 | } |
499 | } | 514 | } |
@@ -505,6 +520,23 @@ class JobQueue { | |||
505 | return JOB_CONCURRENCY[jobType] | 520 | return JOB_CONCURRENCY[jobType] |
506 | } | 521 | } |
507 | 522 | ||
523 | private buildJobRemovalOptions (queueName: string) { | ||
524 | return { | ||
525 | removeOnComplete: { | ||
526 | // Wants seconds | ||
527 | age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000, | ||
528 | |||
529 | count: JOB_REMOVAL_OPTIONS.COUNT | ||
530 | }, | ||
531 | removeOnFail: { | ||
532 | // Wants seconds | ||
533 | age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000, | ||
534 | |||
535 | count: JOB_REMOVAL_OPTIONS.COUNT / 1000 | ||
536 | } | ||
537 | } | ||
538 | } | ||
539 | |||
508 | static get Instance () { | 540 | static get Instance () { |
509 | return this.instance || (this.instance = new this()) | 541 | return this.instance || (this.instance = new this()) |
510 | } | 542 | } |