aboutsummaryrefslogtreecommitdiffhomepage
path: root/server
diff options
context:
space:
mode:
Diffstat (limited to 'server')
-rw-r--r--server/lib/job-queue/job-queue.ts21
-rw-r--r--server/lib/redis.ts82
-rw-r--r--server/middlewares/cache/shared/api-cache.ts6
3 files changed, 49 insertions, 60 deletions
diff --git a/server/lib/job-queue/job-queue.ts b/server/lib/job-queue/job-queue.ts
index 655be6568..6bc59732f 100644
--- a/server/lib/job-queue/job-queue.ts
+++ b/server/lib/job-queue/job-queue.ts
@@ -63,6 +63,7 @@ import { processVideoLiveEnding } from './handlers/video-live-ending'
63import { processVideoStudioEdition } from './handlers/video-studio-edition' 63import { processVideoStudioEdition } from './handlers/video-studio-edition'
64import { processVideoTranscoding } from './handlers/video-transcoding' 64import { processVideoTranscoding } from './handlers/video-transcoding'
65import { processVideosViewsStats } from './handlers/video-views-stats' 65import { processVideosViewsStats } from './handlers/video-views-stats'
66import { Redis } from '../redis'
66 67
67export type CreateJobArgument = 68export type CreateJobArgument =
68 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } | 69 { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
@@ -183,7 +184,7 @@ class JobQueue {
183 } 184 }
184 185
185 this.flowProducer = new FlowProducer({ 186 this.flowProducer = new FlowProducer({
186 connection: this.getRedisConnection(), 187 connection: Redis.getRedisClientOptions('FlowProducer'),
187 prefix: this.jobRedisPrefix 188 prefix: this.jobRedisPrefix
188 }) 189 })
189 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) }) 190 this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
@@ -196,7 +197,7 @@ class JobQueue {
196 autorun: false, 197 autorun: false,
197 concurrency: this.getJobConcurrency(handlerName), 198 concurrency: this.getJobConcurrency(handlerName),
198 prefix: this.jobRedisPrefix, 199 prefix: this.jobRedisPrefix,
199 connection: this.getRedisConnection() 200 connection: Redis.getRedisClientOptions('Worker')
200 } 201 }
201 202
202 const handler = function (job: Job) { 203 const handler = function (job: Job) {
@@ -236,7 +237,7 @@ class JobQueue {
236 237
237 private buildQueue (handlerName: JobType) { 238 private buildQueue (handlerName: JobType) {
238 const queueOptions: QueueOptions = { 239 const queueOptions: QueueOptions = {
239 connection: this.getRedisConnection(), 240 connection: Redis.getRedisClientOptions('Queue'),
240 prefix: this.jobRedisPrefix 241 prefix: this.jobRedisPrefix
241 } 242 }
242 243
@@ -249,7 +250,7 @@ class JobQueue {
249 private buildQueueScheduler (handlerName: JobType) { 250 private buildQueueScheduler (handlerName: JobType) {
250 const queueSchedulerOptions: QueueSchedulerOptions = { 251 const queueSchedulerOptions: QueueSchedulerOptions = {
251 autorun: false, 252 autorun: false,
252 connection: this.getRedisConnection(), 253 connection: Redis.getRedisClientOptions('QueueScheduler'),
253 prefix: this.jobRedisPrefix, 254 prefix: this.jobRedisPrefix,
254 maxStalledCount: 10 255 maxStalledCount: 10
255 } 256 }
@@ -263,7 +264,7 @@ class JobQueue {
263 private buildQueueEvent (handlerName: JobType) { 264 private buildQueueEvent (handlerName: JobType) {
264 const queueEventsOptions: QueueEventsOptions = { 265 const queueEventsOptions: QueueEventsOptions = {
265 autorun: false, 266 autorun: false,
266 connection: this.getRedisConnection(), 267 connection: Redis.getRedisClientOptions('QueueEvent'),
267 prefix: this.jobRedisPrefix 268 prefix: this.jobRedisPrefix
268 } 269 }
269 270
@@ -273,16 +274,6 @@ class JobQueue {
273 this.queueEvents[handlerName] = queueEvents 274 this.queueEvents[handlerName] = queueEvents
274 } 275 }
275 276
276 private getRedisConnection () {
277 return {
278 password: CONFIG.REDIS.AUTH,
279 db: CONFIG.REDIS.DB,
280 host: CONFIG.REDIS.HOSTNAME,
281 port: CONFIG.REDIS.PORT,
282 path: CONFIG.REDIS.SOCKET
283 }
284 }
285
286 // --------------------------------------------------------------------------- 277 // ---------------------------------------------------------------------------
287 278
288 async terminate () { 279 async terminate () {
diff --git a/server/lib/redis.ts b/server/lib/redis.ts
index b7523492a..4d7947d40 100644
--- a/server/lib/redis.ts
+++ b/server/lib/redis.ts
@@ -1,4 +1,4 @@
1import { createClient, RedisClientOptions, RedisModules } from 'redis' 1import IoRedis, { RedisOptions } from 'ioredis'
2import { exists } from '@server/helpers/custom-validators/misc' 2import { exists } from '@server/helpers/custom-validators/misc'
3import { sha256 } from '@shared/extra-utils' 3import { sha256 } from '@shared/extra-utils'
4import { logger } from '../helpers/logger' 4import { logger } from '../helpers/logger'
@@ -22,7 +22,7 @@ class Redis {
22 private static instance: Redis 22 private static instance: Redis
23 private initialized = false 23 private initialized = false
24 private connected = false 24 private connected = false
25 private client: ReturnType<typeof createClient> 25 private client: IoRedis
26 private prefix: string 26 private prefix: string
27 27
28 private constructor () { 28 private constructor () {
@@ -33,46 +33,42 @@ class Redis {
33 if (this.initialized === true) return 33 if (this.initialized === true) return
34 this.initialized = true 34 this.initialized = true
35 35
36 this.client = createClient(Redis.getRedisClientOptions())
37 this.client.on('error', err => logger.error('Redis Client Error', { err }))
38
39 logger.info('Connecting to redis...') 36 logger.info('Connecting to redis...')
40 37
41 this.client.connect() 38 this.client = new IoRedis(Redis.getRedisClientOptions('', { enableAutoPipelining: true }))
42 .then(() => { 39 this.client.on('error', err => logger.error('Redis failed to connect', { err }))
43 logger.info('Connected to redis.') 40 this.client.on('connect', () => {
44 41 logger.info('Connected to redis.')
45 this.connected = true 42
46 }).catch(err => { 43 this.connected = true
47 logger.error('Cannot connect to redis', { err }) 44 })
48 process.exit(-1) 45 this.client.on('reconnecting', (ms) => {
49 }) 46 logger.error(`Reconnecting to redis in ${ms}.`)
47 })
48 this.client.on('close', () => {
49 logger.error('Connection to redis has closed.')
50 this.connected = false
51 })
52
53 this.client.on('end', () => {
54 logger.error('Connection to redis has closed and no more reconnects will be done.')
55 })
50 56
51 this.prefix = 'redis-' + WEBSERVER.HOST + '-' 57 this.prefix = 'redis-' + WEBSERVER.HOST + '-'
52 } 58 }
53 59
54 static getRedisClientOptions () { 60 static getRedisClientOptions (connectionName?: string, options: RedisOptions = {}): RedisOptions {
55 let config: RedisClientOptions<RedisModules, {}> = { 61 return {
56 socket: { 62 connectionName: [ 'PeerTube', connectionName ].join(''),
57 connectTimeout: 20000 // Could be slow since node use sync call to compile PeerTube 63 connectTimeout: 20000, // Could be slow since node use sync call to compile PeerTube
58 } 64 password: CONFIG.REDIS.AUTH,
59 } 65 db: CONFIG.REDIS.DB,
60 66 host: CONFIG.REDIS.HOSTNAME,
61 if (CONFIG.REDIS.AUTH) { 67 port: CONFIG.REDIS.PORT,
62 config = { ...config, password: CONFIG.REDIS.AUTH } 68 path: CONFIG.REDIS.SOCKET,
63 } 69 showFriendlyErrorStack: true,
64 70 ...options
65 if (CONFIG.REDIS.DB) {
66 config = { ...config, database: CONFIG.REDIS.DB }
67 } 71 }
68
69 if (CONFIG.REDIS.HOSTNAME && CONFIG.REDIS.PORT) {
70 config.socket = { ...config.socket, host: CONFIG.REDIS.HOSTNAME, port: CONFIG.REDIS.PORT }
71 } else {
72 config.socket = { ...config.socket, path: CONFIG.REDIS.SOCKET }
73 }
74
75 return config
76 } 72 }
77 73
78 getClient () { 74 getClient () {
@@ -388,15 +384,15 @@ class Redis {
388 } 384 }
389 385
390 private getSet (key: string) { 386 private getSet (key: string) {
391 return this.client.sMembers(this.prefix + key) 387 return this.client.smembers(this.prefix + key)
392 } 388 }
393 389
394 private addToSet (key: string, value: string) { 390 private addToSet (key: string, value: string) {
395 return this.client.sAdd(this.prefix + key, value) 391 return this.client.sadd(this.prefix + key, value)
396 } 392 }
397 393
398 private deleteFromSet (key: string, value: string) { 394 private deleteFromSet (key: string, value: string) {
399 return this.client.sRem(this.prefix + key, value) 395 return this.client.srem(this.prefix + key, value)
400 } 396 }
401 397
402 private deleteKey (key: string) { 398 private deleteKey (key: string) {
@@ -415,11 +411,13 @@ class Redis {
415 } 411 }
416 412
417 private async setValue (key: string, value: string, expirationMilliseconds?: number) { 413 private async setValue (key: string, value: string, expirationMilliseconds?: number) {
418 const options = expirationMilliseconds 414 let result
419 ? { PX: expirationMilliseconds }
420 : {}
421 415
422 const result = await this.client.set(this.prefix + key, value, options) 416 if (expirationMilliseconds !== undefined) {
417 result = await this.client.set(this.prefix + key, value, 'PX', expirationMilliseconds)
418 } else {
419 result = await this.client.set(this.prefix + key, value)
420 }
423 421
424 if (result !== 'OK') throw new Error('Redis set result is not OK.') 422 if (result !== 'OK') throw new Error('Redis set result is not OK.')
425 } 423 }
diff --git a/server/middlewares/cache/shared/api-cache.ts b/server/middlewares/cache/shared/api-cache.ts
index abc919339..9e15bf2d6 100644
--- a/server/middlewares/cache/shared/api-cache.ts
+++ b/server/middlewares/cache/shared/api-cache.ts
@@ -49,7 +49,7 @@ export class ApiCache {
49 if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration) 49 if (!Redis.Instance.isConnected()) return this.makeResponseCacheable(res, next, key, duration)
50 50
51 try { 51 try {
52 const obj = await redis.hGetAll(key) 52 const obj = await redis.hgetall(key)
53 if (obj?.response) { 53 if (obj?.response) {
54 return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration) 54 return this.sendCachedResponse(req, res, JSON.parse(obj.response), duration)
55 } 55 }
@@ -100,8 +100,8 @@ export class ApiCache {
100 100
101 if (Redis.Instance.isConnected()) { 101 if (Redis.Instance.isConnected()) {
102 await Promise.all([ 102 await Promise.all([
103 redis.hSet(key, 'response', JSON.stringify(value)), 103 redis.hset(key, 'response', JSON.stringify(value)),
104 redis.hSet(key, 'duration', duration + ''), 104 redis.hset(key, 'duration', duration + ''),
105 redis.expire(key, duration / 1000) 105 redis.expire(key, duration / 1000)
106 ]) 106 ])
107 } 107 }