diff --git a/server/lib/statusHistory.ts b/server/lib/statusHistory.ts index 3a9b1f6ef..3f1d0bfcc 100644 --- a/server/lib/statusHistory.ts +++ b/server/lib/statusHistory.ts @@ -1,7 +1,7 @@ import { z } from "zod"; import { db, logsDb, statusHistory } from "@server/db"; import { and, eq, gte, asc } from "drizzle-orm"; -import cache from "@server/lib/cache"; +import { regionalCache as cache } from "@server/private/lib/cache"; const STATUS_HISTORY_CACHE_TTL = 60; // seconds @@ -63,7 +63,7 @@ export async function invalidateStatusHistoryCache( entityId: number ): Promise { const prefix = `statusHistory:${entityType}:${entityId}:`; - const keys = cache.keys().filter((k) => k.startsWith(prefix)); + const keys = await cache.keysWithPrefix(prefix); if (keys.length > 0) { await cache.del(keys); } diff --git a/server/private/lib/cache.ts b/server/private/lib/cache.ts index 2d49d2e40..9c94109c2 100644 --- a/server/private/lib/cache.ts +++ b/server/private/lib/cache.ts @@ -13,7 +13,7 @@ import NodeCache from "node-cache"; import logger from "@server/logger"; -import { redisManager } from "@server/private/lib/redis"; +import { redisManager, regionalRedisManager } from "@server/private/lib/redis"; // Create local cache with maxKeys limit to prevent memory leaks // With ~10k requests/day and 5min TTL, 10k keys should be more than sufficient @@ -298,3 +298,147 @@ class AdaptiveCache { // Export singleton instance export const cache = new AdaptiveCache(); export default cache; + +/** + * Regional adaptive cache backed by the in-cluster Redis instance. + * Falls back to a local NodeCache when the regional Redis is unavailable. + * Use this for data that is regional in nature (e.g. status history) so + * reads are served from the same cluster the user is hitting. + */ +const regionalLocalCache = new NodeCache({ + stdTTL: 3600, + checkperiod: 120, + maxKeys: 10000 +}); + +class RegionalAdaptiveCache { + private useRedis(): boolean { + return ( + regionalRedisManager.isRedisEnabled() && + regionalRedisManager.getHealthStatus().isHealthy + ); + } + + async set(key: string, value: any, ttl?: number): Promise { + const effectiveTtl = ttl === 0 ? undefined : ttl; + const redisTtl = ttl === 0 ? undefined : (ttl ?? 3600); + + if (this.useRedis()) { + try { + const serialized = JSON.stringify(value); + const success = await regionalRedisManager.set( + key, + serialized, + redisTtl + ); + if (success) { + logger.debug(`[regional] Set key in Redis: ${key}`); + return true; + } + } catch (error) { + logger.error( + `[regional] Redis set error for key ${key}:`, + error + ); + } + } + + const success = regionalLocalCache.set(key, value, effectiveTtl || 0); + if (success) logger.debug(`[regional] Set key in local cache: ${key}`); + return success; + } + + async get(key: string): Promise { + if (this.useRedis()) { + try { + const value = await regionalRedisManager.get(key); + if (value !== null) { + logger.debug(`[regional] Cache hit in Redis: ${key}`); + return JSON.parse(value) as T; + } + logger.debug(`[regional] Cache miss in Redis: ${key}`); + return undefined; + } catch (error) { + logger.error( + `[regional] Redis get error for key ${key}:`, + error + ); + } + } + + const value = regionalLocalCache.get(key); + if (value !== undefined) { + logger.debug(`[regional] Cache hit in local cache: ${key}`); + } else { + logger.debug(`[regional] Cache miss in local cache: ${key}`); + } + return value; + } + + async del(key: string | string[]): Promise { + const keys = Array.isArray(key) ? key : [key]; + let deletedCount = 0; + + if (this.useRedis()) { + try { + for (const k of keys) { + const success = await regionalRedisManager.del(k); + if (success) { + deletedCount++; + logger.debug(`[regional] Deleted key from Redis: ${k}`); + } + } + if (deletedCount === keys.length) return deletedCount; + deletedCount = 0; + } catch (error) { + logger.error(`[regional] Redis del error:`, error); + deletedCount = 0; + } + } + + for (const k of keys) { + const count = regionalLocalCache.del(k); + if (count > 0) { + deletedCount++; + logger.debug(`[regional] Deleted key from local cache: ${k}`); + } + } + return deletedCount; + } + + async has(key: string): Promise { + if (this.useRedis()) { + try { + const value = await regionalRedisManager.get(key); + return value !== null; + } catch (error) { + logger.error( + `[regional] Redis has error for key ${key}:`, + error + ); + } + } + return regionalLocalCache.has(key); + } + + /** + * Returns keys matching the given prefix from whichever backend is active. + * Redis uses a KEYS scan; local cache filters in-memory keys. + */ + async keysWithPrefix(prefix: string): Promise { + if (this.useRedis()) { + try { + return await regionalRedisManager.keys(`${prefix}*`); + } catch (error) { + logger.error(`[regional] Redis keys error:`, error); + } + } + return regionalLocalCache.keys().filter((k) => k.startsWith(prefix)); + } + + getCurrentBackend(): "redis" | "local" { + return this.useRedis() ? "redis" : "local"; + } +} + +export const regionalCache = new RegionalAdaptiveCache(); diff --git a/server/private/lib/readConfigFile.ts b/server/private/lib/readConfigFile.ts index 63ca0b068..2ee82f804 100644 --- a/server/private/lib/readConfigFile.ts +++ b/server/private/lib/readConfigFile.ts @@ -73,6 +73,25 @@ export const privateConfigSchema = z .object({ rejectUnauthorized: z.boolean().optional().default(true) }) + .optional(), + regional_redis: z + .object({ + host: z.string(), + port: portSchema, + password: z + .string() + .optional() + .transform(getEnvOrYaml("REGIONAL_REDIS_PASSWORD")), + db: z.int().nonnegative().optional().default(0), + tls: z + .object({ + rejectUnauthorized: z + .boolean() + .optional() + .default(true) + }) + .optional() + }) .optional() }) .optional(), diff --git a/server/private/lib/redis.ts b/server/private/lib/redis.ts index 57e73474a..7c3d67836 100644 --- a/server/private/lib/redis.ts +++ b/server/private/lib/redis.ts @@ -109,14 +109,14 @@ class RedisManager { password: redisConfig.password, db: redisConfig.db }; - + // Enable TLS if configured (required for AWS ElastiCache in-transit encryption) if (redisConfig.tls) { opts.tls = { rejectUnauthorized: redisConfig.tls.rejectUnauthorized ?? true }; } - + return opts; } @@ -135,14 +135,14 @@ class RedisManager { password: replica.password, db: replica.db || redisConfig.db }; - + // Enable TLS if configured (required for AWS ElastiCache in-transit encryption) if (redisConfig.tls) { opts.tls = { rejectUnauthorized: redisConfig.tls.rejectUnauthorized ?? true }; } - + return opts; } @@ -855,3 +855,163 @@ class RedisManager { export const redisManager = new RedisManager(); export const redis = redisManager.getClient(); export default redisManager; + +/** + * Lightweight Redis manager for the regional (in-cluster) Redis instance. + * Connects only when `redis.regional_redis` is present in the private config + * and `flags.enable_redis` is true. No pub/sub — designed for low-latency + * caching of regionally-scoped data. + */ +class RegionalRedisManager { + private writeClient: Redis | null = null; + private readClient: Redis | null = null; + private isEnabled: boolean = false; + private isHealthy: boolean = false; + private connectionTimeout: number = 5000; + private commandTimeout: number = 5000; + + constructor() { + if (build === "oss") return; + + const cfg = privateConfig.getRawPrivateConfig(); + if (!cfg.flags.enable_redis || !cfg.redis?.regional_redis) return; + + this.isEnabled = true; + this.initializeClients(); + } + + private getConfig(): RedisOptions { + const r = privateConfig.getRawPrivateConfig().redis!.regional_redis!; + const opts: RedisOptions = { + host: r.host, + port: r.port, + password: r.password, + db: r.db + }; + if (r.tls) { + opts.tls = { rejectUnauthorized: r.tls.rejectUnauthorized ?? true }; + } + return opts; + } + + private initializeClients(): void { + const cfg = this.getConfig(); + const baseOpts = { + ...cfg, + enableReadyCheck: false, + maxRetriesPerRequest: 3, + keepAlive: 10000, + connectTimeout: this.connectionTimeout, + commandTimeout: this.commandTimeout + }; + + try { + this.writeClient = new Redis(baseOpts); + // redis-1 (replica) handles reads; fall back to primary if not resolvable + this.readClient = new Redis({ + ...baseOpts, + host: cfg.host!.replace(/^(.*?)(\.\S+)$/, (_, h, rest) => { + // Derive replica hostname from the headless service pattern: + // redis.redis.svc.cluster.local -> redis-1.redis-headless.redis.svc.cluster.local + // If it doesn't look like a k8s service, just use the same host + return h + rest; + }) + }); + + // For simplicity use same host for both; callers can always read from primary + // The real replica routing is handled by the StatefulSet headless service + this.readClient = this.writeClient; + + this.writeClient.on("ready", () => { + logger.info("Regional Redis client ready"); + this.isHealthy = true; + }); + this.writeClient.on("error", (err) => { + logger.error("Regional Redis client error:", err); + this.isHealthy = false; + }); + this.writeClient.on("reconnecting", () => { + logger.info("Regional Redis client reconnecting..."); + this.isHealthy = false; + }); + + logger.info("Regional Redis client initialized"); + } catch (error) { + logger.error("Failed to initialize regional Redis client:", error); + this.isEnabled = false; + } + } + + public isRedisEnabled(): boolean { + return this.isEnabled && this.writeClient !== null && this.isHealthy; + } + + public getHealthStatus() { + return { isEnabled: this.isEnabled, isHealthy: this.isHealthy }; + } + + public async set( + key: string, + value: string, + ttl?: number + ): Promise { + if (!this.isRedisEnabled() || !this.writeClient) return false; + try { + if (ttl) { + await this.writeClient.setex(key, ttl, value); + } else { + await this.writeClient.set(key, value); + } + return true; + } catch (error) { + logger.error("Regional Redis SET error:", error); + return false; + } + } + + public async get(key: string): Promise { + if (!this.isRedisEnabled() || !this.readClient) return null; + try { + return await this.readClient.get(key); + } catch (error) { + logger.error("Regional Redis GET error:", error); + return null; + } + } + + public async del(key: string): Promise { + if (!this.isRedisEnabled() || !this.writeClient) return false; + try { + await this.writeClient.del(key); + return true; + } catch (error) { + logger.error("Regional Redis DEL error:", error); + return false; + } + } + + public async keys(pattern: string): Promise { + if (!this.isRedisEnabled() || !this.readClient) return []; + try { + return await this.readClient.keys(pattern); + } catch (error) { + logger.error("Regional Redis KEYS error:", error); + return []; + } + } + + public async disconnect(): Promise { + try { + if (this.writeClient) { + await this.writeClient.quit(); + this.writeClient = null; + } + this.readClient = null; + logger.info("Regional Redis client disconnected"); + } catch (error) { + logger.error("Error disconnecting regional Redis client:", error); + } + } +} + +export const regionalRedisManager = new RegionalRedisManager();