diff options
author | kontrollanten <6680299+kontrollanten@users.noreply.github.com> | 2022-11-14 18:26:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-14 16:26:20 +0100 |
commit | 564b9b55976873d87e669ace916f037b72fe2865 (patch) | |
tree | 8c3c097cf3a34766e60fa1bdcf1de7d8558eff8c /server | |
parent | ff91b644fb1b063d0a8eff7492beb1a9bf7e4ce1 (diff) | |
download | PeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.gz PeerTube-564b9b55976873d87e669ace916f037b72fe2865.tar.zst PeerTube-564b9b55976873d87e669ace916f037b72fe2865.zip |
refactor(server): redis > ioredis (#5371)
* refactor(server): redis > ioredis
* refactor(JobQueue): reuse redis connection builder
* fix(redisio)
* fix(redis): setValue
* feat(redis): showFriendlyErrorStack
* feat(redis): auto pipelining
https://github.com/luin/ioredis/blob/308017a6b9429c16b074e03e70f5524499476fa9/README.md#autopipelining
* dont use autopipelining for bullmq
* ioredis events
Diffstat (limited to 'server')
-rw-r--r-- | server/lib/job-queue/job-queue.ts | 21 | ||||
-rw-r--r-- | server/lib/redis.ts | 82 | ||||
-rw-r--r-- | server/middlewares/cache/shared/api-cache.ts | 6 |
3 files changed, 49 insertions, 60 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts index 655be6568..6bc59732f 100644 --- a/server/lib/job-queue/job-queue.ts +++ b/server/lib/job-queue/job-queue.ts | |||
@@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending' | |||
63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' | 63 | import { processVideoStudioEdition } from './handlers/video-studio-edition' |
64 | import { processVideoTranscoding } from './handlers/video-transcoding' | 64 | import { processVideoTranscoding } from './handlers/video-transcoding' |
65 | import { processVideosViewsStats } from './handlers/video-views-stats' | 65 | import { processVideosViewsStats } from './handlers/video-views-stats' |
66 | import { Redis } from '../redis' | ||
66 | 67 | ||
67 | export type CreateJobArgument = | 68 | export type CreateJobArgument = |
68 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | | 69 | { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | |
@@ -183,7 +184,7 @@ class JobQueue { | |||
183 | } | 184 | } |
184 | 185 | ||
185 | this.flowProducer = new FlowProducer({ | 186 | this.flowProducer = new FlowProducer({ |
186 | connection: this.getRedisConnection(), | 187 | connection: Redis.getRedisClientOptions('FlowProducer'), |
187 | prefix: this.jobRedisPrefix | 188 | prefix: this.jobRedisPrefix |
188 | }) | 189 | }) |
189 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) | 190 | this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) |
@@ -196,7 +197,7 @@ class JobQueue { | |||
196 | autorun: false, | 197 | autorun: false, |
197 | concurrency: this.getJobConcurrency(handlerName), | 198 | concurrency: this.getJobConcurrency(handlerName), |
198 | prefix: this.jobRedisPrefix, | 199 | prefix: this.jobRedisPrefix, |
199 | connection: this.getRedisConnection() | 200 | connection: Redis.getRedisClientOptions('Worker') |
200 | } | 201 | } |
201 | 202 | ||
202 | const handler = function (job: Job) { | 203 | const handler = function (job: Job) { |
@@ -236,7 +237,7 @@ class JobQueue { | |||
236 | 237 | ||
237 | private buildQueue (handlerName: JobType) { | 238 | private buildQueue (handlerName: JobType) { |
238 | const queueOptions: QueueOptions = { | 239 | const queueOptions: QueueOptions = { |
239 | connection: this.getRedisConnection(), | 240 | connection: Redis.getRedisClientOptions('Queue'), |
240 | prefix: this.jobRedisPrefix | 241 | prefix: this.jobRedisPrefix |
241 | } | 242 | } |
242 | 243 | ||
@@ -249,7 +250,7 @@ class JobQueue { | |||
249 | private buildQueueScheduler (handlerName: JobType) { | 250 | private buildQueueScheduler (handlerName: JobType) { |
250 | const queueSchedulerOptions: QueueSchedulerOptions = { | 251 | const queueSchedulerOptions: QueueSchedulerOptions = { |
251 | autorun: false, | 252 | autorun: false, |
252 | connection: this.getRedisConnection(), | 253 | connection: Redis.getRedisClientOptions('QueueScheduler'), |
253 | prefix: this.jobRedisPrefix, | 254 | prefix: this.jobRedisPrefix, |
254 | maxStalledCount: 10 | 255 | maxStalledCount: 10 |
255 | } | 256 | } |
@@ -263,7 +264,7 @@ class JobQueue { | |||
263 | private buildQueueEvent (handlerName: JobType) { | 264 | private buildQueueEvent (handlerName: JobType) { |
264 | const queueEventsOptions: QueueEventsOptions = { | 265 | const queueEventsOptions: QueueEventsOptions = { |
265 | autorun: false, | 266 | autorun: false, |
266 | connection: this.getRedisConnection(), | 267 | connection: Redis.getRedisClientOptions('QueueEvent'), |
267 | prefix: this.jobRedisPrefix | 268 | prefix: this.jobRedisPrefix |
268 | } | 269 | } |
269 | 270 | ||
@@ -273,16 +274,6 @@ class JobQueue { | |||
273 | this.queueEvents[handlerName] = queueEvents | 274 | this.queueEvents[handlerName] = queueEvents |
274 | } | 275 | } |
275 | 276 | ||
276 | private getRedisConnection () { | ||
277 | return { | ||
278 | password: CONFIG.REDIS.AUTH, | ||
279 | db: CONFIG.REDIS.DB, | ||
280 | host: CONFIG.REDIS.HOSTNAME, | ||
281 | port: CONFIG.REDIS.PORT, | ||
282 | path: CONFIG.REDIS.SOCKET | ||
283 | } | ||
284 | } | ||
285 | |||
286 | // --------------------------------------------------------------------------- | 277 | // --------------------------------------------------------------------------- |
287 | 278 | ||
288 | async terminate () { | 279 | async terminate () { |
diff --git a/server/lib/redis.ts b/server/lib/redis.ts index b7523492a..4d7947d40 100644 --- a/server/lib/redis.ts +++ b/server/lib/redis.ts | |||
@@ -1,4 +1,4 @@ | |||
1 | import { createClient, RedisClientOptions, RedisModules } from 'redis' | 1 | import IoRedis, { RedisOptions } from 'ioredis' |
2 | import { exists } from '@server/helpers/custom-validators/misc' | 2 | import { exists } from '@server/helpers/custom-validators/misc' |
3 | import { sha256 } from '@shared/extra-utils' | 3 | import { sha256 } from '@shared/extra-utils' |
4 | import { logger } from '../helpers/logger' | 4 | import { logger } from '../helpers/logger' |
@@ -22,7 +22,7 @@ class Redis { | |||
22 | private static instance: Redis | 22 | private static instance: Redis |
23 | private initialized = false | 23 | private initialized = false |
24 | private connected = false | 24 | private connected = false |
25 | private client: ReturnType<typeof createClient> | 25 | private client: IoRedis |
26 | private prefix: string | 26 | private prefix: string |
27 | 27 | ||
28 | private constructor () { | 28 | private constructor () { |
@@ -33,46 +33,42 @@ class Redis { | |||
33 | if (this.initialized === true) return | 33 | if (this.initialized === true) return |
34 | this.initialized = true | 34 | this.initialized = true |
35 | 35 | ||
36 | this.client = createClient(Redis.getRedisClientOptions()) | ||
37 | this.client.on('error', err => logger.error('Redis Client Error', { err })) | ||
38 | |||
39 | logger.info('Connecting to redis...') | 36 | logger.info('Connecting to redis...') |
40 | 37 | ||
41 | this.client.connect() | 38 | this.client = new IoRedis(Redis.getRedisClientOptions('', { enableAutoPipelining: true })) |
42 | .then(() => { | 39 | this.client.on('error', err => logger.error('Redis failed to connect', { err })) |
43 | logger.info('Connected to redis.') | 40 | this.client.on('connect', () => { |
44 | 41 | logger.info('Connected to redis.') | |
45 | this.connected = true | 42 | |
46 | }).catch(err => { | 43 | this.connected = true |
47 | logger.error('Cannot connect to redis', { err }) | 44 | }) |
48 | process.exit(-1) | 45 | this.client.on('reconnecting', (ms) => { |
49 | }) | 46 | logger.error(`Reconnecting to redis in ${ms}.`) |
47 | }) | ||
48 | this.client.on('close', () => { | ||
49 | logger.error('Connection to redis has closed.') | ||
50 | this.connected = false | ||
51 | }) | ||
52 | |||
53 | this.client.on('end', () => { | ||
54 | logger.error('Connection to redis has closed and no more reconnects will be done.') | ||
55 | }) | ||
50 | 56 | ||
51 | this.prefix = 'redis-' + WEBSERVER.HOST + '-' | 57 | this.prefix = 'redis-' + WEBSERVER.HOST + '-' |
52 | } | 58 | } |
53 | 59 | ||
54 | static getRedisClientOptions () { | 60 | static getRedisClientOptions (connectionName?: string, options: RedisOptions = {}): RedisOptions { |
55 | let config: RedisClientOptions<RedisModules, {}> = { | 61 | return { |
56 | socket: { | 62 | connectionName: [ 'PeerTube', connectionName ].join(''), |
57 | connectTimeout: 20000 // Could be slow since node use sync call to compile PeerTube | 63 | connectTimeout: 20000, // Could be slow since node use sync call to compile PeerTube |
58 | } | 64 | password: CONFIG.REDIS.AUTH, |
59 | } | 65 | db: CONFIG.REDIS.DB, |
60 | 66 | host: CONFIG.REDIS.HOSTNAME, | |
61 | if (CONFIG.REDIS.AUTH) { | 67 | port: CONFIG.REDIS.PORT, |
62 | config = { ...config, password: CONFIG.REDIS.AUTH } | 68 | path: CONFIG.REDIS.SOCKET, |
63 | } | 69 | showFriendlyErrorStack: true, |
64 | 70 | ...options | |
65 | if (CONFIG.REDIS.DB) { | ||
66 | config = { ...config, database: CONFIG.REDIS.DB } | ||
67 | } | 71 | } |
68 | |||
69 | if (CONFIG.REDIS.HOSTNAME && CONFIG.REDIS.PORT) { | ||
70 | config.socket = { ...config.socket, host: CONFIG.REDIS.HOSTNAME, port: CONFIG.REDIS.PORT } | ||
71 | } else { | ||
72 | config.socket = { ...config.socket, path: CONFIG.REDIS.SOCKET } | ||
73 | } | ||
74 | |||
75 | return config | ||
76 | } | 72 | } |
77 | 73 | ||
78 | getClient () { | 74 | getClient () { |
@@ -388,15 +384,15 @@ class Redis { | |||
388 | } | 384 | } |
389 | 385 | ||
390 | private getSet (key: string) { | 386 | private getSet (key: string) { |
391 | return this.client.sMembers(this.prefix + key) | 387 | return this.client.smembers(this.prefix + key) |
392 | } | 388 | } |
393 | 389 | ||
394 | private addToSet (key: string, value: string) { | 390 | private addToSet (key: string, value: string) { |
395 | return this.client.sAdd(this.prefix + key, value) | 391 | return this.client.sadd(this.prefix + key, value) |
396 | } | 392 | } |
397 | 393 | ||
398 | private deleteFromSet (key: string, value: string) { | 394 | private deleteFromSet (key: string, value: string) { |
399 | return this.client.sRem(this.prefix + key, value) | 395 | return this.client.srem(this.prefix + key, value) |
400 | } | 396 | } |
401 | 397 | ||
402 | private deleteKey (key: string) { | 398 | private deleteKey (key: string) { |
@@ -415,11 +411,13 @@ class Redis { | |||
415 | } | 411 | } |
416 | 412 | ||
417 | private async setValue (key: string, value: string, expirationMilliseconds?: number) { | 413 | private async setValue (key: string, value: string, expirationMilliseconds?: number) { |
418 | const options = expirationMilliseconds | 414 | let result |
419 | ? { PX: expirationMilliseconds } | ||
420 | : {} | ||
421 | 415 | ||
422 | const result = await this.client.set(this.prefix + key, value, options) | 416 | if (expirationMilliseconds !== undefined) { |
417 | result = await this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds) | ||
418 | } else { | ||
419 | result = await this.client.set(this.prefix + key, value) | ||
420 | } | ||
423 | 421 | ||
424 | if (result !== 'OK') throw new Error('Redis set result is not OK.') | 422 | if (result !== 'OK') throw new Error('Redis set result is not OK.') |
425 | } | 423 | } |
diff --git a/server/middlewares/cache/shared/api-cache.ts b/server/middlewares/cache/shared/api-cache.ts index abc919339..9e15bf2d6 100644 --- a/server/middlewares/cache/shared/api-cache.ts +++ b/server/middlewares/cache/shared/api-cache.ts | |||
@@ -49,7 +49,7 @@ export class ApiCache { | |||
49 | if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration) | 49 | if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration) |
50 | 50 | ||
51 | try { | 51 | try { |
52 | const obj = await redis.hGetAll(key) | 52 | const obj = await redis.hgetall(key) |
53 | if (obj?.response) { | 53 | if (obj?.response) { |
54 | return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration) | 54 | return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration) |
55 | } | 55 | } |
@@ -100,8 +100,8 @@ export class ApiCache { | |||
100 | 100 | ||
101 | if (Redis.Instance.isConnected()) { | 101 | if (Redis.Instance.isConnected()) { |
102 | await Promise.all([ | 102 | await Promise.all([ |
103 | redis.hSet(key, 'response', JSON.stringify(value)), | 103 | redis.hset(key, 'response', JSON.stringify(value)), |
104 | redis.hSet(key, 'duration', duration + ''), | 104 | redis.hset(key, 'duration', duration + ''), |
105 | redis.expire(key, duration / 1000) | 105 | redis.expire(key, duration / 1000) |
106 | ]) | 106 | ]) |
107 | } | 107 | } |