diff options
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 20 |
1 files changed, 15 insertions, 5 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 14e181835..8d97434ac 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -10,6 +10,7 @@ import { | |||
10 | RefreshPayload, | 10 | RefreshPayload, |
11 | VideoFileImportPayload, | 11 | VideoFileImportPayload, |
12 | VideoImportPayload, | 12 | VideoImportPayload, |
13 | VideoLiveEndingPayload, | ||
13 | VideoRedundancyPayload, | 14 | VideoRedundancyPayload, |
14 | VideoTranscodingPayload | 15 | VideoTranscodingPayload |
15 | } from '../../../shared/models' | 16 | } from '../../../shared/models' |
@@ -27,6 +28,7 @@ import { processVideosViews } from './handlers/video-views' | |||
27 | import { refreshAPObject } from './handlers/activitypub-refresher' | 28 | import { refreshAPObject } from './handlers/activitypub-refresher' |
28 | import { processVideoFileImport } from './handlers/video-file-import' | 29 | import { processVideoFileImport } from './handlers/video-file-import' |
29 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' | 30 | import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy' |
31 | import { processVideoLiveEnding } from './handlers/video-live-ending' | ||
30 | 32 | ||
31 | type CreateJobArgument = | 33 | type CreateJobArgument = |
32 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 34 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -39,8 +41,13 @@ type CreateJobArgument = | |||
39 | { type: 'video-import', payload: VideoImportPayload } | | 41 | { type: 'video-import', payload: VideoImportPayload } | |
40 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 42 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
41 | { type: 'videos-views', payload: {} } | | 43 | { type: 'videos-views', payload: {} } | |
44 | { type: 'video-live-ending', payload: VideoLiveEndingPayload } | | ||
42 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | 45 | { type: 'video-redundancy', payload: VideoRedundancyPayload } |
43 | 46 | ||
47 | type CreateJobOptions = { | ||
48 | delay?: number | ||
49 | } | ||
50 | |||
44 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | 51 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
45 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 52 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
46 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 53 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
@@ -52,6 +59,7 @@ const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { | |||
52 | 'video-import': processVideoImport, | 59 | 'video-import': processVideoImport, |
53 | 'videos-views': processVideosViews, | 60 | 'videos-views': processVideosViews, |
54 | 'activitypub-refresher': refreshAPObject, | 61 | 'activitypub-refresher': refreshAPObject, |
62 | 'video-live-ending': processVideoLiveEnding, | ||
55 | 'video-redundancy': processVideoRedundancy | 63 | 'video-redundancy': processVideoRedundancy |
56 | } | 64 | } |
57 | 65 | ||
@@ -66,7 +74,8 @@ const jobTypes: JobType[] = [ | |||
66 | 'video-import', | 74 | 'video-import', |
67 | 'videos-views', | 75 | 'videos-views', |
68 | 'activitypub-refresher', | 76 | 'activitypub-refresher', |
69 | 'video-redundancy' | 77 | 'video-redundancy', |
78 | 'video-live-ending' | ||
70 | ] | 79 | ] |
71 | 80 | ||
72 | class JobQueue { | 81 | class JobQueue { |
@@ -122,12 +131,12 @@ class JobQueue { | |||
122 | } | 131 | } |
123 | } | 132 | } |
124 | 133 | ||
125 | createJob (obj: CreateJobArgument): void { | 134 | createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void { |
126 | this.createJobWithPromise(obj) | 135 | this.createJobWithPromise(obj, options) |
127 | .catch(err => logger.error('Cannot create job.', { err, obj })) | 136 | .catch(err => logger.error('Cannot create job.', { err, obj })) |
128 | } | 137 | } |
129 | 138 | ||
130 | createJobWithPromise (obj: CreateJobArgument) { | 139 | createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) { |
131 | const queue = this.queues[obj.type] | 140 | const queue = this.queues[obj.type] |
132 | if (queue === undefined) { | 141 | if (queue === undefined) { |
133 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 142 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
@@ -137,7 +146,8 @@ class JobQueue { | |||
137 | const jobArgs: Bull.JobOptions = { | 146 | const jobArgs: Bull.JobOptions = { |
138 | backoff: { delay: 60 * 1000, type: 'exponential' }, | 147 | backoff: { delay: 60 * 1000, type: 'exponential' }, |
139 | attempts: JOB_ATTEMPTS[obj.type], | 148 | attempts: JOB_ATTEMPTS[obj.type], |
140 | timeout: JOB_TTL[obj.type] | 149 | timeout: JOB_TTL[obj.type], |
150 | delay: options.delay | ||
141 | } | 151 | } |
142 | 152 | ||
143 | return queue.add(obj.payload, jobArgs) | 153 | return queue.add(obj.payload, jobArgs) |