aboutsummaryrefslogtreecommitdiffhomepage
path: root/server/lib/job-queue
diff options
context:
space:
mode:
authorkontrollanten <6680299+kontrollanten@users.noreply.github.com>2022-11-14 18:26:20 +0300
committerGitHub <noreply@github.com>2022-11-14 16:26:20 +0100
commit564b9b55976873d87e669ace916f037b72fe2865 (patch)
tree8c3c097cf3a34766e60fa1bdcf1de7d8558eff8c /server/lib/job-queue
parentff91b644fb1b063d0a8eff7492beb1a9bf7e4ce1 (diff)
downloadPeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.gz
PeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.zst
PeerTube-564b9b55976873d87e669ace916f037b72fe2865.zip
refactor(server): redis > ioredis (#5371)
* refactor(server): redis > ioredis * refactor(JobQueue): reuse redis connection builder * fix(redisio) * fix(redis): setValue * feat(redis): showFriendlyErrorStack * feat(redis): auto pipelining https://github.com/luin/ioredis/blob/308017a6b9429c16b074e03e70f5524499476fa9/README.md#autopipelining * dont use autopipelining for bullmq * ioredis events
Diffstat (limited to 'server/lib/job-queue')
-rw-r--r--server/lib/job-queue/job-queue.ts21
1 files changed, 6 insertions, 15 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 655be6568..6bc59732f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
63import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
64import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
65import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
66import { Redis } from '../redis'
66 67
67export type CreateJobArgument = 68export type CreateJobArgument =
68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 69 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -183,7 +184,7 @@ class JobQueue {
183 } 184 }
184 185
185 this.flowProducer = new FlowProducer({ 186 this.flowProducer = new FlowProducer({
186 connection: this.getRedisConnection(), 187 connection: Redis.getRedisClientOptions('FlowProducer'),
187 prefix: this.jobRedisPrefix 188 prefix: this.jobRedisPrefix
188 }) 189 })
189 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) 190 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
@@ -196,7 +197,7 @@ class JobQueue {
196 autorun: false, 197 autorun: false,
197 concurrency: this.getJobConcurrency(handlerName), 198 concurrency: this.getJobConcurrency(handlerName),
198 prefix: this.jobRedisPrefix, 199 prefix: this.jobRedisPrefix,
199 connection: this.getRedisConnection() 200 connection: Redis.getRedisClientOptions('Worker')
200 } 201 }
201 202
202 const handler = function (job: Job) { 203 const handler = function (job: Job) {
@@ -236,7 +237,7 @@ class JobQueue {
236 237
237 private buildQueue (handlerName: JobType) { 238 private buildQueue (handlerName: JobType) {
238 const queueOptions: QueueOptions = { 239 const queueOptions: QueueOptions = {
239 connection: this.getRedisConnection(), 240 connection: Redis.getRedisClientOptions('Queue'),
240 prefix: this.jobRedisPrefix 241 prefix: this.jobRedisPrefix
241 } 242 }
242 243
@@ -249,7 +250,7 @@ class JobQueue {
249 private buildQueueScheduler (handlerName: JobType) { 250 private buildQueueScheduler (handlerName: JobType) {
250 const queueSchedulerOptions: QueueSchedulerOptions = { 251 const queueSchedulerOptions: QueueSchedulerOptions = {
251 autorun: false, 252 autorun: false,
252 connection: this.getRedisConnection(), 253 connection: Redis.getRedisClientOptions('QueueScheduler'),
253 prefix: this.jobRedisPrefix, 254 prefix: this.jobRedisPrefix,
254 maxStalledCount: 10 255 maxStalledCount: 10
255 } 256 }
@@ -263,7 +264,7 @@ class JobQueue {
263 private buildQueueEvent (handlerName: JobType) { 264 private buildQueueEvent (handlerName: JobType) {
264 const queueEventsOptions: QueueEventsOptions = { 265 const queueEventsOptions: QueueEventsOptions = {
265 autorun: false, 266 autorun: false,
266 connection: this.getRedisConnection(), 267 connection: Redis.getRedisClientOptions('QueueEvent'),
267 prefix: this.jobRedisPrefix 268 prefix: this.jobRedisPrefix
268 } 269 }
269 270
@@ -273,16 +274,6 @@ class JobQueue {
273 this.queueEvents[handlerName] = queueEvents 274 this.queueEvents[handlerName] = queueEvents
274 } 275 }
275 276
276 private getRedisConnection () {
277 return {
278 password: CONFIG.REDIS.AUTH,
279 db: CONFIG.REDIS.DB,
280 host: CONFIG.REDIS.HOSTNAME,
281 port: CONFIG.REDIS.PORT,
282 path: CONFIG.REDIS.SOCKET
283 }
284 }
285
286 // --------------------------------------------------------------------------- 277 // ---------------------------------------------------------------------------
287 278
288 async terminate () { 279 async terminate () {