From 564b9b55976873d87e669ace916f037b72fe2865 Mon Sep 17 00:00:00 2001 From: kontrollanten <6680299+kontrollanten@users.noreply.github.com> Date: Mon, 14 Nov 2022 18:26:20 +0300 Subject: refactor(server): redis > ioredis (#5371) * refactor(server): redis > ioredis * refactor(JobQueue): reuse redis connection builder * fix(redisio) * fix(redis): setValue * feat(redis): showFriendlyErrorStack * feat(redis): auto pipelining https://github.com/luin/ioredis/blob/308017a6b9429c16b074e03e70f5524499476fa9/README.md#autopipelining * dont use autopipelining for bullmq * ioredis events --- server/lib/job-queue/job-queue.ts | 21 ++----- server/lib/redis.ts | 82 ++++++++++++++-------------- server/middlewares/cache/shared/api-cache.ts | 6 +- 3 files changed, 49 insertions(+), 60 deletions(-) (limited to 'server') diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 655be6568..6bc59732f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts @@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' import { processVideoStudioEdition } from './handlers/video-studio-edition' import { processVideoTranscoding } from './handlers/video-transcoding' import { processVideosViewsStats } from './handlers/video-views-stats' +import { Redis } from '../redis' export type CreateJobArgument = { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | @@ -183,7 +184,7 @@ class JobQueue { } this.flowProducer = new FlowProducer({ - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('FlowProducer'), prefix: this.jobRedisPrefix }) this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) @@ -196,7 +197,7 @@ class JobQueue { autorun: false, concurrency: this.getJobConcurrency(handlerName), prefix: this.jobRedisPrefix, - connection: this.getRedisConnection() + connection: Redis.getRedisClientOptions('Worker') } const handler = function (job: Job) { @@ -236,7 +237,7 @@ class JobQueue { private buildQueue (handlerName: JobType) { const queueOptions: QueueOptions = { - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('Queue'), prefix: this.jobRedisPrefix } @@ -249,7 +250,7 @@ class JobQueue { private buildQueueScheduler (handlerName: JobType) { const queueSchedulerOptions: QueueSchedulerOptions = { autorun: false, - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('QueueScheduler'), prefix: this.jobRedisPrefix, maxStalledCount: 10 } @@ -263,7 +264,7 @@ class JobQueue { private buildQueueEvent (handlerName: JobType) { const queueEventsOptions: QueueEventsOptions = { autorun: false, - connection: this.getRedisConnection(), + connection: Redis.getRedisClientOptions('QueueEvent'), prefix: this.jobRedisPrefix } @@ -273,16 +274,6 @@ class JobQueue { this.queueEvents[handlerName] = queueEvents } - private getRedisConnection () { - return { - password: CONFIG.REDIS.AUTH, - db: CONFIG.REDIS.DB, - host: CONFIG.REDIS.HOSTNAME, - port: CONFIG.REDIS.PORT, - path: CONFIG.REDIS.SOCKET - } - } - // --------------------------------------------------------------------------- async terminate () { diff --git a/server/lib/redis.ts b/server/lib/redis.ts index b7523492a..4d7947d40 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts @@ -1,4 +1,4 @@ -import { createClient, RedisClientOptions, RedisModules } from 'redis' +import IoRedis, { RedisOptions } from 'ioredis' import { exists } from '@server/helpers/custom-validators/misc' import { sha256 } from '@shared/extra-utils' import { logger } from '../helpers/logger' @@ -22,7 +22,7 @@ class Redis { private static instance: Redis private initialized = false private connected = false - private client: ReturnType + private client: IoRedis private prefix: string private constructor () { @@ -33,46 +33,42 @@ class Redis { if (this.initialized === true) return this.initialized = true - this.client = createClient(Redis.getRedisClientOptions()) - this.client.on('error', err => logger.error('Redis Client Error', { err })) - logger.info('Connecting to redis...') - this.client.connect() - .then(() => { - logger.info('Connected to redis.') - - this.connected = true - }).catch(err => { - logger.error('Cannot connect to redis', { err }) - process.exit(-1) - }) + this.client = new IoRedis(Redis.getRedisClientOptions('', { enableAutoPipelining: true })) + this.client.on('error', err => logger.error('Redis failed to connect', { err })) + this.client.on('connect', () => { + logger.info('Connected to redis.') + + this.connected = true + }) + this.client.on('reconnecting', (ms) => { + logger.error(`Reconnecting to redis in ${ms}.`) + }) + this.client.on('close', () => { + logger.error('Connection to redis has closed.') + this.connected = false + }) + + this.client.on('end', () => { + logger.error('Connection to redis has closed and no more reconnects will be done.') + }) this.prefix = 'redis-' + WEBSERVER.HOST + '-' } - static getRedisClientOptions () { - let config: RedisClientOptions = { - socket: { - connectTimeout: 20000 // Could be slow since node use sync call to compile PeerTube - } - } - - if (CONFIG.REDIS.AUTH) { - config = { ...config, password: CONFIG.REDIS.AUTH } - } - - if (CONFIG.REDIS.DB) { - config = { ...config, database: CONFIG.REDIS.DB } + static getRedisClientOptions (connectionName?: string, options: RedisOptions = {}): RedisOptions { + return { + connectionName: [ 'PeerTube', connectionName ].join(''), + connectTimeout: 20000, // Could be slow since node use sync call to compile PeerTube + password: CONFIG.REDIS.AUTH, + db: CONFIG.REDIS.DB, + host: CONFIG.REDIS.HOSTNAME, + port: CONFIG.REDIS.PORT, + path: CONFIG.REDIS.SOCKET, + showFriendlyErrorStack: true, + ...options } - - if (CONFIG.REDIS.HOSTNAME && CONFIG.REDIS.PORT) { - config.socket = { ...config.socket, host: CONFIG.REDIS.HOSTNAME, port: CONFIG.REDIS.PORT } - } else { - config.socket = { ...config.socket, path: CONFIG.REDIS.SOCKET } - } - - return config } getClient () { @@ -388,15 +384,15 @@ class Redis { } private getSet (key: string) { - return this.client.sMembers(this.prefix + key) + return this.client.smembers(this.prefix + key) } private addToSet (key: string, value: string) { - return this.client.sAdd(this.prefix + key, value) + return this.client.sadd(this.prefix + key, value) } private deleteFromSet (key: string, value: string) { - return this.client.sRem(this.prefix + key, value) + return this.client.srem(this.prefix + key, value) } private deleteKey (key: string) { @@ -415,11 +411,13 @@ class Redis { } private async setValue (key: string, value: string, expirationMilliseconds?: number) { - const options = expirationMilliseconds - ? { PX: expirationMilliseconds } - : {} + let result - const result = await this.client.set(this.prefix + key, value, options) + if (expirationMilliseconds !== undefined) { + result = await this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds) + } else { + result = await this.client.set(this.prefix + key, value) + } if (result !== 'OK') throw new Error('Redis set result is not OK.') } diff --git a/server/middlewares/cache/shared/api-cache.ts b/server/middlewares/cache/shared/api-cache.ts index abc919339..9e15bf2d6 100644 --- a/server/middlewares/cache/shared/api-cache.ts +++ b/server/middlewares/cache/shared/api-cache.ts @@ -49,7 +49,7 @@ export class ApiCache { if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration) try { - const obj = await redis.hGetAll(key) + const obj = await redis.hgetall(key) if (obj?.response) { return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration) } @@ -100,8 +100,8 @@ export class ApiCache { if (Redis.Instance.isConnected()) { await Promise.all([ - redis.hSet(key, 'response', JSON.stringify(value)), - redis.hSet(key, 'duration', duration + ''), + redis.hset(key, 'response', JSON.stringify(value)), + redis.hset(key, 'duration', duration + ''), redis.expire(key, duration / 1000) ]) } -- cgit v1.2.3