diff options
author | kontrollanten <6680299+kontrollanten@users.noreply.github.com> | 2022-11-14 18:26:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-14 16:26:20 +0100 |
commit | 564b9b55976873d87e669ace916f037b72fe2865 (patch) | |
tree | 8c3c097cf3a34766e60fa1bdcf1de7d8558eff8c /server/lib/job-queue/job-queue.ts | |
parent | ff91b644fb1b063d0a8eff7492beb1a9bf7e4ce1 (diff) | |
download | PeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.gz PeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.zst PeerTube-564b9b55976873d87e669ace916f037b72fe2865.zip |
refactor(server): redis > ioredis (#5371)
* refactor(server): redis > ioredis
* refactor(JobQueue): reuse redis connection builder
* fix(redisio)
* fix(redis): setValue
* feat(redis): showFriendlyErrorStack
* feat(redis): auto pipelining
https://github.com/luin/ioredis/blob/308017a6b9429c16b074e03e70f5524499476fa9/README.md#autopipelining
* dont use autopipelining for bullmq
* ioredis events
Diffstat (limited to 'server/lib/job-queue/job-queue.ts')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 21 |
1 files changed, 6 insertions, 15 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 655be6568..6bc59732f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
64 | import { processVideoTranscoding } from './handlers/video-transcoding' | 64 | import { processVideoTranscoding } from './handlers/video-transcoding' |
65 | import { processVideosViewsStats } from './handlers/video-views-stats' | 65 | import { processVideosViewsStats } from './handlers/video-views-stats' |
66 | import { Redis } from '../redis' | ||
66 | 67 | ||
67 | export type CreateJobArgument = | 68 | export type CreateJobArgument = |
68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 69 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -183,7 +184,7 @@ class JobQueue { | |||
183 | } | 184 | } |
184 | 185 | ||
185 | this.flowProducer = new FlowProducer({ | 186 | this.flowProducer = new FlowProducer({ |
186 | connection: this.getRedisConnection(), | 187 | connection: Redis.getRedisClientOptions('FlowProducer'), |
187 | prefix: this.jobRedisPrefix | 188 | prefix: this.jobRedisPrefix |
188 | }) | 189 | }) |
189 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) | 190 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) |
@@ -196,7 +197,7 @@ class JobQueue { | |||
196 | autorun: false, | 197 | autorun: false, |
197 | concurrency: this.getJobConcurrency(handlerName), | 198 | concurrency: this.getJobConcurrency(handlerName), |
198 | prefix: this.jobRedisPrefix, | 199 | prefix: this.jobRedisPrefix, |
199 | connection: this.getRedisConnection() | 200 | connection: Redis.getRedisClientOptions('Worker') |
200 | } | 201 | } |
201 | 202 | ||
202 | const handler = function (job: Job) { | 203 | const handler = function (job: Job) { |
@@ -236,7 +237,7 @@ class JobQueue { | |||
236 | 237 | ||
237 | private buildQueue (handlerName: JobType) { | 238 | private buildQueue (handlerName: JobType) { |
238 | const queueOptions: QueueOptions = { | 239 | const queueOptions: QueueOptions = { |
239 | connection: this.getRedisConnection(), | 240 | connection: Redis.getRedisClientOptions('Queue'), |
240 | prefix: this.jobRedisPrefix | 241 | prefix: this.jobRedisPrefix |
241 | } | 242 | } |
242 | 243 | ||
@@ -249,7 +250,7 @@ class JobQueue { | |||
249 | private buildQueueScheduler (handlerName: JobType) { | 250 | private buildQueueScheduler (handlerName: JobType) { |
250 | const queueSchedulerOptions: QueueSchedulerOptions = { | 251 | const queueSchedulerOptions: QueueSchedulerOptions = { |
251 | autorun: false, | 252 | autorun: false, |
252 | connection: this.getRedisConnection(), | 253 | connection: Redis.getRedisClientOptions('QueueScheduler'), |
253 | prefix: this.jobRedisPrefix, | 254 | prefix: this.jobRedisPrefix, |
254 | maxStalledCount: 10 | 255 | maxStalledCount: 10 |
255 | } | 256 | } |
@@ -263,7 +264,7 @@ class JobQueue { | |||
263 | private buildQueueEvent (handlerName: JobType) { | 264 | private buildQueueEvent (handlerName: JobType) { |
264 | const queueEventsOptions: QueueEventsOptions = { | 265 | const queueEventsOptions: QueueEventsOptions = { |
265 | autorun: false, | 266 | autorun: false, |
266 | connection: this.getRedisConnection(), | 267 | connection: Redis.getRedisClientOptions('QueueEvent'), |
267 | prefix: this.jobRedisPrefix | 268 | prefix: this.jobRedisPrefix |
268 | } | 269 | } |
269 | 270 | ||
@@ -273,16 +274,6 @@ class JobQueue { | |||
273 | this.queueEvents[handlerName] = queueEvents | 274 | this.queueEvents[handlerName] = queueEvents |
274 | } | 275 | } |
275 | 276 | ||
276 | private getRedisConnection () { | ||
277 | return { | ||
278 | password: CONFIG.REDIS.AUTH, | ||
279 | db: CONFIG.REDIS.DB, | ||
280 | host: CONFIG.REDIS.HOSTNAME, | ||
281 | port: CONFIG.REDIS.PORT, | ||
282 | path: CONFIG.REDIS.SOCKET | ||
283 | } | ||
284 | } | ||
285 | |||
286 | // --------------------------------------------------------------------------- | 277 | // --------------------------------------------------------------------------- |
287 | 278 | ||
288 | async terminate () { | 279 | async terminate () { |