diff options
Diffstat (limited to 'server/lib/job-queue')
9 files changed, 67 insertions, 39 deletions
diff --git a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts index 0ff7b44a0..7d9dd61e9 100644 --- a/server/lib/job-queue/handlers/activitypub-http-broadcast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-broadcast.ts | |||
@@ -5,11 +5,13 @@ import { doRequest } from '../../../helpers/requests' | |||
5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 5 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 6 | import { BROADCAST_CONCURRENCY, JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
7 | import { ActorFollowScoreCache } from '../../files-cache' | 7 | import { ActorFollowScoreCache } from '../../files-cache' |
8 | import { ContextType } from '@server/helpers/activitypub' | ||
8 | 9 | ||
9 | export type ActivitypubHttpBroadcastPayload = { | 10 | export type ActivitypubHttpBroadcastPayload = { |
10 | uris: string[] | 11 | uris: string[] |
11 | signatureActorId?: number | 12 | signatureActorId?: number |
12 | body: any | 13 | body: any |
14 | contextType?: ContextType | ||
13 | } | 15 | } |
14 | 16 | ||
15 | async function processActivityPubHttpBroadcast (job: Bull.Job) { | 17 | async function processActivityPubHttpBroadcast (job: Bull.Job) { |
diff --git a/server/lib/job-queue/handlers/activitypub-http-unicast.ts b/server/lib/job-queue/handlers/activitypub-http-unicast.ts index c70ce3be9..6b71e2891 100644 --- a/server/lib/job-queue/handlers/activitypub-http-unicast.ts +++ b/server/lib/job-queue/handlers/activitypub-http-unicast.ts | |||
@@ -4,11 +4,13 @@ import { doRequest } from '../../../helpers/requests' | |||
4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' | 4 | import { buildGlobalHeaders, buildSignedRequestOptions, computeBody } from './utils/activitypub-http-utils' |
5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' | 5 | import { JOB_REQUEST_TIMEOUT } from '../../../initializers/constants' |
6 | import { ActorFollowScoreCache } from '../../files-cache' | 6 | import { ActorFollowScoreCache } from '../../files-cache' |
7 | import { ContextType } from '@server/helpers/activitypub' | ||
7 | 8 | ||
8 | export type ActivitypubHttpUnicastPayload = { | 9 | export type ActivitypubHttpUnicastPayload = { |
9 | uri: string | 10 | uri: string |
10 | signatureActorId?: number | 11 | signatureActorId?: number |
11 | body: any | 12 | body: any |
13 | contextType?: ContextType | ||
12 | } | 14 | } |
13 | 15 | ||
14 | async function processActivityPubHttpUnicast (job: Bull.Job) { | 16 | async function processActivityPubHttpUnicast (job: Bull.Job) { |
diff --git a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts index d3bde6e6a..54b35840d 100644 --- a/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts +++ b/server/lib/job-queue/handlers/utils/activitypub-http-utils.ts | |||
@@ -1,11 +1,11 @@ | |||
1 | import { buildSignedActivity } from '../../../../helpers/activitypub' | 1 | import { buildSignedActivity, ContextType } from '../../../../helpers/activitypub' |
2 | import { getServerActor } from '../../../../helpers/utils' | 2 | import { getServerActor } from '../../../../helpers/utils' |
3 | import { ActorModel } from '../../../../models/activitypub/actor' | 3 | import { ActorModel } from '../../../../models/activitypub/actor' |
4 | import { sha256 } from '../../../../helpers/core-utils' | 4 | import { sha256 } from '../../../../helpers/core-utils' |
5 | import { HTTP_SIGNATURE } from '../../../../initializers/constants' | 5 | import { HTTP_SIGNATURE } from '../../../../initializers/constants' |
6 | import { MActor } from '../../../../typings/models' | 6 | import { MActor } from '../../../../typings/models' |
7 | 7 | ||
8 | type Payload = { body: any, signatureActorId?: number } | 8 | type Payload = { body: any, contextType?: ContextType, signatureActorId?: number } |
9 | 9 | ||
10 | async function computeBody (payload: Payload) { | 10 | async function computeBody (payload: Payload) { |
11 | let body = payload.body | 11 | let body = payload.body |
@@ -13,7 +13,7 @@ async function computeBody (payload: Payload) { | |||
13 | if (payload.signatureActorId) { | 13 | if (payload.signatureActorId) { |
14 | const actorSignature = await ActorModel.load(payload.signatureActorId) | 14 | const actorSignature = await ActorModel.load(payload.signatureActorId) |
15 | if (!actorSignature) throw new Error('Unknown signature actor id.') | 15 | if (!actorSignature) throw new Error('Unknown signature actor id.') |
16 | body = await buildSignedActivity(actorSignature, payload.body) | 16 | body = await buildSignedActivity(actorSignature, payload.body, payload.contextType) |
17 | } | 17 | } |
18 | 18 | ||
19 | return body | 19 | return body |
@@ -42,7 +42,7 @@ async function buildSignedRequestOptions (payload: Payload) { | |||
42 | 42 | ||
43 | function buildGlobalHeaders (body: any) { | 43 | function buildGlobalHeaders (body: any) { |
44 | return { | 44 | return { |
45 | 'Digest': buildDigest(body) | 45 | Digest: buildDigest(body) |
46 | } | 46 | } |
47 | } | 47 | } |
48 | 48 | ||
diff --git a/server/lib/job-queue/handlers/video-file-import.ts b/server/lib/job-queue/handlers/video-file-import.ts index 99c991e72..be9e7d181 100644 --- a/server/lib/job-queue/handlers/video-file-import.ts +++ b/server/lib/job-queue/handlers/video-file-import.ts | |||
@@ -11,7 +11,7 @@ import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent' | |||
11 | import { getVideoFilePath } from '@server/lib/video-paths' | 11 | import { getVideoFilePath } from '@server/lib/video-paths' |
12 | 12 | ||
13 | export type VideoFileImportPayload = { | 13 | export type VideoFileImportPayload = { |
14 | videoUUID: string, | 14 | videoUUID: string |
15 | filePath: string | 15 | filePath: string |
16 | } | 16 | } |
17 | 17 | ||
diff --git a/server/lib/job-queue/handlers/video-import.ts b/server/lib/job-queue/handlers/video-import.ts index 1fca17584..09f225cec 100644 --- a/server/lib/job-queue/handlers/video-import.ts +++ b/server/lib/job-queue/handlers/video-import.ts | |||
@@ -221,7 +221,7 @@ async function processFile (downloader: () => Promise<string>, videoImport: MVid | |||
221 | isNewVideo: true | 221 | isNewVideo: true |
222 | } | 222 | } |
223 | 223 | ||
224 | await JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 224 | await JobQueue.Instance.createJobWithPromise({ type: 'video-transcoding', payload: dataInput }) |
225 | } | 225 | } |
226 | 226 | ||
227 | } catch (err) { | 227 | } catch (err) { |
diff --git a/server/lib/job-queue/handlers/video-redundancy.ts b/server/lib/job-queue/handlers/video-redundancy.ts new file mode 100644 index 000000000..319d7090e --- /dev/null +++ b/server/lib/job-queue/handlers/video-redundancy.ts | |||
@@ -0,0 +1,20 @@ | |||
1 | import * as Bull from 'bull' | ||
2 | import { logger } from '../../../helpers/logger' | ||
3 | import { VideosRedundancyScheduler } from '@server/lib/schedulers/videos-redundancy-scheduler' | ||
4 | |||
5 | export type VideoRedundancyPayload = { | ||
6 | videoId: number | ||
7 | } | ||
8 | |||
9 | async function processVideoRedundancy (job: Bull.Job) { | ||
10 | const payload = job.data as VideoRedundancyPayload | ||
11 | logger.info('Processing video redundancy in job %d.', job.id) | ||
12 | |||
13 | return VideosRedundancyScheduler.Instance.createManualRedundancy(payload.videoId) | ||
14 | } | ||
15 | |||
16 | // --------------------------------------------------------------------------- | ||
17 | |||
18 | export { | ||
19 | processVideoRedundancy | ||
20 | } | ||
diff --git a/server/lib/job-queue/handlers/video-transcoding.ts b/server/lib/job-queue/handlers/video-transcoding.ts index 39b9fac98..c020057c9 100644 --- a/server/lib/job-queue/handlers/video-transcoding.ts +++ b/server/lib/job-queue/handlers/video-transcoding.ts | |||
@@ -6,7 +6,6 @@ import { JobQueue } from '../job-queue' | |||
6 | import { federateVideoIfNeeded } from '../../activitypub' | 6 | import { federateVideoIfNeeded } from '../../activitypub' |
7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' | 7 | import { retryTransactionWrapper } from '../../../helpers/database-utils' |
8 | import { sequelizeTypescript } from '../../../initializers' | 8 | import { sequelizeTypescript } from '../../../initializers' |
9 | import * as Bluebird from 'bluebird' | ||
10 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' | 9 | import { computeResolutionsToTranscode } from '../../../helpers/ffmpeg-utils' |
11 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' | 10 | import { generateHlsPlaylist, mergeAudioVideofile, optimizeOriginalVideofile, transcodeNewResolution } from '../../video-transcoding' |
12 | import { Notifier } from '../../notifier' | 11 | import { Notifier } from '../../notifier' |
@@ -40,8 +39,11 @@ interface OptimizeTranscodingPayload extends BaseTranscodingPayload { | |||
40 | type: 'optimize' | 39 | type: 'optimize' |
41 | } | 40 | } |
42 | 41 | ||
43 | export type VideoTranscodingPayload = HLSTranscodingPayload | NewResolutionTranscodingPayload | 42 | export type VideoTranscodingPayload = |
44 | | OptimizeTranscodingPayload | MergeAudioTranscodingPayload | 43 | HLSTranscodingPayload |
44 | | NewResolutionTranscodingPayload | ||
45 | | OptimizeTranscodingPayload | ||
46 | | MergeAudioTranscodingPayload | ||
45 | 47 | ||
46 | async function processVideoTranscoding (job: Bull.Job) { | 48 | async function processVideoTranscoding (job: Bull.Job) { |
47 | const payload = job.data as VideoTranscodingPayload | 49 | const payload = job.data as VideoTranscodingPayload |
@@ -105,7 +107,7 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
105 | 107 | ||
106 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { | 108 | const { videoDatabase, videoPublished } = await sequelizeTypescript.transaction(async t => { |
107 | // Maybe the video changed in database, refresh it | 109 | // Maybe the video changed in database, refresh it |
108 | let videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) | 110 | const videoDatabase = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoArg.uuid, t) |
109 | // Video does not exist anymore | 111 | // Video does not exist anymore |
110 | if (!videoDatabase) return undefined | 112 | if (!videoDatabase) return undefined |
111 | 113 | ||
@@ -122,8 +124,6 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
122 | await createHlsJobIfEnabled(hlsPayload) | 124 | await createHlsJobIfEnabled(hlsPayload) |
123 | 125 | ||
124 | if (resolutionsEnabled.length !== 0) { | 126 | if (resolutionsEnabled.length !== 0) { |
125 | const tasks: (Bluebird<Bull.Job<any>> | Promise<Bull.Job<any>>)[] = [] | ||
126 | |||
127 | for (const resolution of resolutionsEnabled) { | 127 | for (const resolution of resolutionsEnabled) { |
128 | let dataInput: VideoTranscodingPayload | 128 | let dataInput: VideoTranscodingPayload |
129 | 129 | ||
@@ -143,12 +143,9 @@ async function onVideoFileOptimizerSuccess (videoArg: MVideoWithFile, payload: O | |||
143 | } | 143 | } |
144 | } | 144 | } |
145 | 145 | ||
146 | const p = JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) | 146 | JobQueue.Instance.createJob({ type: 'video-transcoding', payload: dataInput }) |
147 | tasks.push(p) | ||
148 | } | 147 | } |
149 | 148 | ||
150 | await Promise.all(tasks) | ||
151 | |||
152 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) | 149 | logger.info('Transcoding jobs created for uuid %s.', videoDatabase.uuid, { resolutionsEnabled }) |
153 | } else { | 150 | } else { |
154 | // No transcoding to do, it's now published | 151 | // No transcoding to do, it's now published |
diff --git a/server/lib/job-queue/handlers/video-views.ts b/server/lib/job-queue/handlers/video-views.ts index 73fa5ed04..2258cd029 100644 --- a/server/lib/job-queue/handlers/video-views.ts +++ b/server/lib/job-queue/handlers/video-views.ts | |||
@@ -23,6 +23,8 @@ async function processVideosViews () { | |||
23 | for (const videoId of videoIds) { | 23 | for (const videoId of videoIds) { |
24 | try { | 24 | try { |
25 | const views = await Redis.Instance.getVideoViews(videoId, hour) | 25 | const views = await Redis.Instance.getVideoViews(videoId, hour) |
26 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
27 | |||
26 | if (views) { | 28 | if (views) { |
27 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) | 29 | logger.debug('Adding %d views to video %d in hour %d.', views, videoId, hour) |
28 | 30 | ||
@@ -52,8 +54,6 @@ async function processVideosViews () { | |||
52 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) | 54 | logger.error('Cannot create video views for video %d in hour %d.', videoId, hour, { err }) |
53 | } | 55 | } |
54 | } | 56 | } |
55 | |||
56 | await Redis.Instance.deleteVideoViews(videoId, hour) | ||
57 | } catch (err) { | 57 | } catch (err) { |
58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) | 58 | logger.error('Cannot update video views of video %d in hour %d.', videoId, hour, { err }) |
59 | } | 59 | } |
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index ec601e9ea..14acace7d 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -13,6 +13,7 @@ import { processVideoImport, VideoImportPayload } from './handlers/video-import' | |||
13 | import { processVideosViews } from './handlers/video-views' | 13 | import { processVideosViews } from './handlers/video-views' |
14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' | 14 | import { refreshAPObject, RefreshPayload } from './handlers/activitypub-refresher' |
15 | import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' | 15 | import { processVideoFileImport, VideoFileImportPayload } from './handlers/video-file-import' |
16 | import { processVideoRedundancy, VideoRedundancyPayload } from '@server/lib/job-queue/handlers/video-redundancy' | ||
16 | 17 | ||
17 | type CreateJobArgument = | 18 | type CreateJobArgument = |
18 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 19 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -24,20 +25,21 @@ type CreateJobArgument = | |||
24 | { type: 'email', payload: EmailPayload } | | 25 | { type: 'email', payload: EmailPayload } | |
25 | { type: 'video-import', payload: VideoImportPayload } | | 26 | { type: 'video-import', payload: VideoImportPayload } | |
26 | { type: 'activitypub-refresher', payload: RefreshPayload } | | 27 | { type: 'activitypub-refresher', payload: RefreshPayload } | |
27 | { type: 'videos-views', payload: {} } | 28 | { type: 'videos-views', payload: {} } | |
29 | { type: 'video-redundancy', payload: VideoRedundancyPayload } | ||
28 | 30 | ||
29 | const handlers: { [ id in (JobType | 'video-file') ]: (job: Bull.Job) => Promise<any>} = { | 31 | const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = { |
30 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, | 32 | 'activitypub-http-broadcast': processActivityPubHttpBroadcast, |
31 | 'activitypub-http-unicast': processActivityPubHttpUnicast, | 33 | 'activitypub-http-unicast': processActivityPubHttpUnicast, |
32 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, | 34 | 'activitypub-http-fetcher': processActivityPubHttpFetcher, |
33 | 'activitypub-follow': processActivityPubFollow, | 35 | 'activitypub-follow': processActivityPubFollow, |
34 | 'video-file-import': processVideoFileImport, | 36 | 'video-file-import': processVideoFileImport, |
35 | 'video-transcoding': processVideoTranscoding, | 37 | 'video-transcoding': processVideoTranscoding, |
36 | 'video-file': processVideoTranscoding, // TODO: remove it (changed in 1.3) | ||
37 | 'email': processEmail, | 38 | 'email': processEmail, |
38 | 'video-import': processVideoImport, | 39 | 'video-import': processVideoImport, |
39 | 'videos-views': processVideosViews, | 40 | 'videos-views': processVideosViews, |
40 | 'activitypub-refresher': refreshAPObject | 41 | 'activitypub-refresher': refreshAPObject, |
42 | 'video-redundancy': processVideoRedundancy | ||
41 | } | 43 | } |
42 | 44 | ||
43 | const jobTypes: JobType[] = [ | 45 | const jobTypes: JobType[] = [ |
@@ -50,20 +52,22 @@ const jobTypes: JobType[] = [ | |||
50 | 'video-file-import', | 52 | 'video-file-import', |
51 | 'video-import', | 53 | 'video-import', |
52 | 'videos-views', | 54 | 'videos-views', |
53 | 'activitypub-refresher' | 55 | 'activitypub-refresher', |
56 | 'video-redundancy' | ||
54 | ] | 57 | ] |
55 | 58 | ||
56 | class JobQueue { | 59 | class JobQueue { |
57 | 60 | ||
58 | private static instance: JobQueue | 61 | private static instance: JobQueue |
59 | 62 | ||
60 | private queues: { [ id in JobType ]?: Bull.Queue } = {} | 63 | private queues: { [id in JobType]?: Bull.Queue } = {} |
61 | private initialized = false | 64 | private initialized = false |
62 | private jobRedisPrefix: string | 65 | private jobRedisPrefix: string |
63 | 66 | ||
64 | private constructor () {} | 67 | private constructor () { |
68 | } | ||
65 | 69 | ||
66 | async init () { | 70 | init () { |
67 | // Already initialized | 71 | // Already initialized |
68 | if (this.initialized === true) return | 72 | if (this.initialized === true) return |
69 | this.initialized = true | 73 | this.initialized = true |
@@ -105,11 +109,16 @@ class JobQueue { | |||
105 | } | 109 | } |
106 | } | 110 | } |
107 | 111 | ||
108 | createJob (obj: CreateJobArgument) { | 112 | createJob (obj: CreateJobArgument): void { |
113 | this.createJobWithPromise(obj) | ||
114 | .catch(err => logger.error('Cannot create job.', { err, obj })) | ||
115 | } | ||
116 | |||
117 | createJobWithPromise (obj: CreateJobArgument) { | ||
109 | const queue = this.queues[obj.type] | 118 | const queue = this.queues[obj.type] |
110 | if (queue === undefined) { | 119 | if (queue === undefined) { |
111 | logger.error('Unknown queue %s: cannot create job.', obj.type) | 120 | logger.error('Unknown queue %s: cannot create job.', obj.type) |
112 | throw Error('Unknown queue, cannot create job') | 121 | return |
113 | } | 122 | } |
114 | 123 | ||
115 | const jobArgs: Bull.JobOptions = { | 124 | const jobArgs: Bull.JobOptions = { |
@@ -122,10 +131,10 @@ class JobQueue { | |||
122 | } | 131 | } |
123 | 132 | ||
124 | async listForApi (options: { | 133 | async listForApi (options: { |
125 | state: JobState, | 134 | state: JobState |
126 | start: number, | 135 | start: number |
127 | count: number, | 136 | count: number |
128 | asc?: boolean, | 137 | asc?: boolean |
129 | jobType: JobType | 138 | jobType: JobType |
130 | }): Promise<Bull.Job[]> { | 139 | }): Promise<Bull.Job[]> { |
131 | const { state, start, count, asc, jobType } = options | 140 | const { state, start, count, asc, jobType } = options |
@@ -133,16 +142,14 @@ class JobQueue { | |||
133 | 142 | ||
134 | const filteredJobTypes = this.filterJobTypes(jobType) | 143 | const filteredJobTypes = this.filterJobTypes(jobType) |
135 | 144 | ||
136 | // TODO: optimize | ||
137 | for (const jobType of filteredJobTypes) { | 145 | for (const jobType of filteredJobTypes) { |
138 | const queue = this.queues[ jobType ] | 146 | const queue = this.queues[jobType] |
139 | if (queue === undefined) { | 147 | if (queue === undefined) { |
140 | logger.error('Unknown queue %s to list jobs.', jobType) | 148 | logger.error('Unknown queue %s to list jobs.', jobType) |
141 | continue | 149 | continue |
142 | } | 150 | } |
143 | 151 | ||
144 | // FIXME: Bull queue typings does not have getJobs method | 152 | const jobs = await queue.getJobs([ state ], 0, start + count, asc) |
145 | const jobs = await (queue as any).getJobs(state, 0, start + count, asc) | ||
146 | results = results.concat(jobs) | 153 | results = results.concat(jobs) |
147 | } | 154 | } |
148 | 155 | ||
@@ -164,7 +171,7 @@ class JobQueue { | |||
164 | const filteredJobTypes = this.filterJobTypes(jobType) | 171 | const filteredJobTypes = this.filterJobTypes(jobType) |
165 | 172 | ||
166 | for (const type of filteredJobTypes) { | 173 | for (const type of filteredJobTypes) { |
167 | const queue = this.queues[ type ] | 174 | const queue = this.queues[type] |
168 | if (queue === undefined) { | 175 | if (queue === undefined) { |
169 | logger.error('Unknown queue %s to count jobs.', type) | 176 | logger.error('Unknown queue %s to count jobs.', type) |
170 | continue | 177 | continue |
@@ -172,7 +179,7 @@ class JobQueue { | |||
172 | 179 | ||
173 | const counts = await queue.getJobCounts() | 180 | const counts = await queue.getJobCounts() |
174 | 181 | ||
175 | total += counts[ state ] | 182 | total += counts[state] |
176 | } | 183 | } |
177 | 184 | ||
178 | return total | 185 | return total |
@@ -188,7 +195,7 @@ class JobQueue { | |||
188 | private addRepeatableJobs () { | 195 | private addRepeatableJobs () { |
189 | this.queues['videos-views'].add({}, { | 196 | this.queues['videos-views'].add({}, { |
190 | repeat: REPEAT_JOBS['videos-views'] | 197 | repeat: REPEAT_JOBS['videos-views'] |
191 | }) | 198 | }).catch(err => logger.error('Cannot add repeatable job.', { err })) |
192 | } | 199 | } |
193 | 200 | ||
194 | private filterJobTypes (jobType?: JobType) { | 201 | private filterJobTypes (jobType?: JobType) { |