From 1c18b2bffbdddac0a90eeb7f190ad73e833f9de8 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 28 May 2025 20:59:06 -0400 Subject: [PATCH 1/2] Create redis connection --- server/db/redis.ts | 316 +++++++++++++++++++++++++++++++++++++++++++ server/routers/ws.ts | 217 ++++++++++++++++++++++++++--- 2 files changed, 511 insertions(+), 22 deletions(-) create mode 100644 server/db/redis.ts diff --git a/server/db/redis.ts b/server/db/redis.ts new file mode 100644 index 00000000..bae80099 --- /dev/null +++ b/server/db/redis.ts @@ -0,0 +1,316 @@ +import Redis from 'ioredis'; +import logger from '@server/logger'; + +interface RedisConfig { + host: string; + port: number; + password?: string; + db?: number; + retryDelayOnFailover?: number; + maxRetriesPerRequest?: number; +} + +class RedisManager { + private static instance: RedisManager; + private client: Redis | null = null; + private subscriber: Redis | null = null; + private publisher: Redis | null = null; + private isEnabled: boolean = false; + private subscribers: Map void>> = new Map(); + + private constructor() { + this.isEnabled = !!process.env.REDIS; + if (this.isEnabled) { + this.initializeClients(); + } + } + + public static getInstance(): RedisManager { + if (!RedisManager.instance) { + RedisManager.instance = new RedisManager(); + } + return RedisManager.instance; + } + + private getRedisConfig(): RedisConfig { + return { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + password: process.env.REDIS_PASSWORD, + db: parseInt(process.env.REDIS_DB || '0'), + retryDelayOnFailover: 100, + maxRetriesPerRequest: 3, + }; + } + + private initializeClients(): void { + const config = this.getRedisConfig(); + + try { + // Main client for general operations + this.client = new Redis(config); + + // Dedicated publisher client + this.publisher = new Redis(config); + + // Dedicated subscriber client + this.subscriber = new Redis(config); + + // Set up error handlers + this.client.on('error', (err) => { + logger.error('Redis client error:', err); + }); + + this.publisher.on('error', (err) => { + logger.error('Redis publisher error:', err); + }); + + this.subscriber.on('error', (err) => { + logger.error('Redis subscriber error:', err); + }); + + // Set up connection handlers + this.client.on('connect', () => { + logger.info('Redis client connected'); + }); + + this.publisher.on('connect', () => { + logger.info('Redis publisher connected'); + }); + + this.subscriber.on('connect', () => { + logger.info('Redis subscriber connected'); + }); + + // Set up message handler for subscriber + this.subscriber.on('message', (channel: string, message: string) => { + const channelSubscribers = this.subscribers.get(channel); + if (channelSubscribers) { + channelSubscribers.forEach(callback => { + try { + callback(channel, message); + } catch (error) { + logger.error(`Error in subscriber callback for channel ${channel}:`, error); + } + }); + } + }); + + logger.info('Redis clients initialized successfully'); + } catch (error) { + logger.error('Failed to initialize Redis clients:', error); + this.isEnabled = false; + } + } + + public isRedisEnabled(): boolean { + return this.isEnabled && this.client !== null; + } + + public getClient(): Redis | null { + return this.client; + } + + public async set(key: string, value: string, ttl?: number): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + if (ttl) { + await this.client.setex(key, ttl, value); + } else { + await this.client.set(key, value); + } + return true; + } catch (error) { + logger.error('Redis SET error:', error); + return false; + } + } + + public async get(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; + + try { + return await this.client.get(key); + } catch (error) { + logger.error('Redis GET error:', error); + return null; + } + } + + public async del(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.del(key); + return true; + } catch (error) { + logger.error('Redis DEL error:', error); + return false; + } + } + + public async sadd(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.sadd(key, member); + return true; + } catch (error) { + logger.error('Redis SADD error:', error); + return false; + } + } + + public async srem(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.srem(key, member); + return true; + } catch (error) { + logger.error('Redis SREM error:', error); + return false; + } + } + + public async smembers(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return []; + + try { + return await this.client.smembers(key); + } catch (error) { + logger.error('Redis SMEMBERS error:', error); + return []; + } + } + + public async hset(key: string, field: string, value: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hset(key, field, value); + return true; + } catch (error) { + logger.error('Redis HSET error:', error); + return false; + } + } + + public async hget(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; + + try { + return await this.client.hget(key, field); + } catch (error) { + logger.error('Redis HGET error:', error); + return null; + } + } + + public async hdel(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hdel(key, field); + return true; + } catch (error) { + logger.error('Redis HDEL error:', error); + return false; + } + } + + public async hgetall(key: string): Promise> { + if (!this.isRedisEnabled() || !this.client) return {}; + + try { + return await this.client.hgetall(key); + } catch (error) { + logger.error('Redis HGETALL error:', error); + return {}; + } + } + + public async publish(channel: string, message: string): Promise { + if (!this.isRedisEnabled() || !this.publisher) return false; + + try { + await this.publisher.publish(channel, message); + return true; + } catch (error) { + logger.error('Redis PUBLISH error:', error); + return false; + } + } + + public async subscribe(channel: string, callback: (channel: string, message: string) => void): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + // Add callback to subscribers map + if (!this.subscribers.has(channel)) { + this.subscribers.set(channel, new Set()); + // Only subscribe to the channel if it's the first subscriber + await this.subscriber.subscribe(channel); + } + + this.subscribers.get(channel)!.add(callback); + return true; + } catch (error) { + logger.error('Redis SUBSCRIBE error:', error); + return false; + } + } + + public async unsubscribe(channel: string, callback?: (channel: string, message: string) => void): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + const channelSubscribers = this.subscribers.get(channel); + if (!channelSubscribers) return true; + + if (callback) { + // Remove specific callback + channelSubscribers.delete(callback); + if (channelSubscribers.size === 0) { + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + } else { + // Remove all callbacks for this channel + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + + return true; + } catch (error) { + logger.error('Redis UNSUBSCRIBE error:', error); + return false; + } + } + + public async disconnect(): Promise { + try { + if (this.client) { + await this.client.quit(); + this.client = null; + } + if (this.publisher) { + await this.publisher.quit(); + this.publisher = null; + } + if (this.subscriber) { + await this.subscriber.quit(); + this.subscriber = null; + } + this.subscribers.clear(); + logger.info('Redis clients disconnected'); + } catch (error) { + logger.error('Error disconnecting Redis clients:', error); + } + } +} + +// Export singleton instance +export const redisManager = RedisManager.getInstance(); +export default redisManager; \ No newline at end of file diff --git a/server/routers/ws.ts b/server/routers/ws.ts index c4ee8874..c953a60c 100644 --- a/server/routers/ws.ts +++ b/server/routers/ws.ts @@ -9,6 +9,8 @@ import db from "@server/db"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { messageHandlers } from "./messageHandlers"; import logger from "@server/logger"; +import redisManager from "@server/db/redis"; +import { v4 as uuidv4 } from "uuid"; // Custom interfaces interface WebSocketRequest extends IncomingMessage { @@ -17,6 +19,7 @@ interface WebSocketRequest extends IncomingMessage { interface AuthenticatedWebSocket extends WebSocket { newt?: Newt; + connectionId?: string; } interface TokenPayload { @@ -40,45 +43,113 @@ interface HandlerContext { message: WSMessage; senderWs: WebSocket; newt: Newt | undefined; - sendToClient: (newtId: string, message: WSMessage) => boolean; - broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => void; + sendToClient: (newtId: string, message: WSMessage) => Promise; + broadcastToAllExcept: (message: WSMessage, excludeNewtId?: string) => Promise; connectedClients: Map; } +interface RedisMessage { + type: 'direct' | 'broadcast'; + targetNewtId?: string; + excludeNewtId?: string; + message: WSMessage; + fromNodeId: string; +} + export type MessageHandler = (context: HandlerContext) => Promise; const router: Router = Router(); const wss: WebSocketServer = new WebSocketServer({ noServer: true }); -// Client tracking map +// Generate unique node ID for this instance +const NODE_ID = uuidv4(); +const REDIS_CHANNEL = 'websocket_messages'; + +// Client tracking map (local to this node) let connectedClients: Map = new Map(); +// Redis keys +const getConnectionsKey = (newtId: string) => `ws:connections:${newtId}`; +const getNodeConnectionsKey = (nodeId: string, newtId: string) => `ws:node:${nodeId}:${newtId}`; + +// Initialize Redis subscription for cross-node messaging +const initializeRedisSubscription = async (): Promise => { + if (!redisManager.isRedisEnabled()) return; + + await redisManager.subscribe(REDIS_CHANNEL, async (channel: string, message: string) => { + try { + const redisMessage: RedisMessage = JSON.parse(message); + + // Ignore messages from this node + if (redisMessage.fromNodeId === NODE_ID) return; + + if (redisMessage.type === 'direct' && redisMessage.targetNewtId) { + // Send to specific client on this node + await sendToClientLocal(redisMessage.targetNewtId, redisMessage.message); + } else if (redisMessage.type === 'broadcast') { + // Broadcast to all clients on this node except excluded + await broadcastToAllExceptLocal(redisMessage.message, redisMessage.excludeNewtId); + } + } catch (error) { + logger.error('Error processing Redis message:', error); + } + }); +}; + // Helper functions for client management -const addClient = (newtId: string, ws: AuthenticatedWebSocket): void => { +const addClient = async (newtId: string, ws: AuthenticatedWebSocket): Promise => { + // Generate unique connection ID + const connectionId = uuidv4(); + ws.connectionId = connectionId; + + // Add to local tracking const existingClients = connectedClients.get(newtId) || []; existingClients.push(ws); connectedClients.set(newtId, existingClients); - logger.info(`Client added to tracking - Newt ID: ${newtId}, Total connections: ${existingClients.length}`); + + // Add to Redis tracking if enabled + if (redisManager.isRedisEnabled()) { + // Add this node to the set of nodes handling this newt + await redisManager.sadd(getConnectionsKey(newtId), NODE_ID); + + // Track specific connection on this node + await redisManager.hset(getNodeConnectionsKey(NODE_ID, newtId), connectionId, Date.now().toString()); + } + + logger.info(`Client added to tracking - Newt ID: ${newtId}, Connection ID: ${connectionId}, Total connections: ${existingClients.length}`); }; -const removeClient = (newtId: string, ws: AuthenticatedWebSocket): void => { +const removeClient = async (newtId: string, ws: AuthenticatedWebSocket): Promise => { + // Remove from local tracking const existingClients = connectedClients.get(newtId) || []; const updatedClients = existingClients.filter(client => client !== ws); if (updatedClients.length === 0) { connectedClients.delete(newtId); + + // Remove from Redis tracking if enabled + if (redisManager.isRedisEnabled()) { + await redisManager.srem(getConnectionsKey(newtId), NODE_ID); + await redisManager.del(getNodeConnectionsKey(NODE_ID, newtId)); + } + logger.info(`All connections removed for Newt ID: ${newtId}`); } else { connectedClients.set(newtId, updatedClients); + + // Update Redis tracking if enabled + if (redisManager.isRedisEnabled() && ws.connectionId) { + await redisManager.hdel(getNodeConnectionsKey(NODE_ID, newtId), ws.connectionId); + } + logger.info(`Connection removed - Newt ID: ${newtId}, Remaining connections: ${updatedClients.length}`); } }; -// Helper functions for sending messages -const sendToClient = (newtId: string, message: WSMessage): boolean => { +// Local message sending (within this node) +const sendToClientLocal = async (newtId: string, message: WSMessage): Promise => { const clients = connectedClients.get(newtId); if (!clients || clients.length === 0) { - logger.info(`No active connections found for Newt ID: ${newtId}`); return false; } @@ -91,7 +162,7 @@ const sendToClient = (newtId: string, message: WSMessage): boolean => { return true; }; -const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void => { +const broadcastToAllExceptLocal = async (message: WSMessage, excludeNewtId?: string): Promise => { connectedClients.forEach((clients, newtId) => { if (newtId !== excludeNewtId) { clients.forEach(client => { @@ -103,6 +174,65 @@ const broadcastToAllExcept = (message: WSMessage, excludeNewtId?: string): void }); }; +// Cross-node message sending (via Redis) +const sendToClient = async (newtId: string, message: WSMessage): Promise => { + // Try to send locally first + const localSent = await sendToClientLocal(newtId, message); + + // If Redis is enabled, also send via Redis pub/sub to other nodes + if (redisManager.isRedisEnabled()) { + const redisMessage: RedisMessage = { + type: 'direct', + targetNewtId: newtId, + message, + fromNodeId: NODE_ID + }; + + await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage)); + } + + return localSent; +}; + +const broadcastToAllExcept = async (message: WSMessage, excludeNewtId?: string): Promise => { + // Broadcast locally + await broadcastToAllExceptLocal(message, excludeNewtId); + + // If Redis is enabled, also broadcast via Redis pub/sub to other nodes + if (redisManager.isRedisEnabled()) { + const redisMessage: RedisMessage = { + type: 'broadcast', + excludeNewtId, + message, + fromNodeId: NODE_ID + }; + + await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage)); + } +}; + +// Check if a newt has active connections across all nodes +const hasActiveConnections = async (newtId: string): Promise => { + if (!redisManager.isRedisEnabled()) { + // Fallback to local check + const clients = connectedClients.get(newtId); + return !!(clients && clients.length > 0); + } + + const activeNodes = await redisManager.smembers(getConnectionsKey(newtId)); + return activeNodes.length > 0; +}; + +// Get all active nodes for a newt +const getActiveNodes = async (newtId: string): Promise => { + if (!redisManager.isRedisEnabled()) { + const clients = connectedClients.get(newtId); + return (clients && clients.length > 0) ? [NODE_ID] : []; + } + + return await redisManager.smembers(getConnectionsKey(newtId)); +}; + // Token verification middleware (unchanged) const verifyToken = async (token: string): Promise => { try { @@ -128,7 +258,7 @@ const verifyToken = async (token: string): Promise => { } }; -const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => { +const setupConnection = async (ws: AuthenticatedWebSocket, newt: Newt): Promise => { logger.info("Establishing websocket connection"); if (!newt) { @@ -139,12 +269,11 @@ const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => { ws.newt = newt; // Add client to tracking - addClient(newt.newtId, ws); + await addClient(newt.newtId, ws); ws.on("message", async (data) => { try { const message: WSMessage = JSON.parse(data.toString()); - // logger.info(`Message received from Newt ID ${newtId}:`, message); // Validate message format if (!message.type || typeof message.type !== "string") { @@ -171,10 +300,10 @@ const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => { if (response) { if (response.broadcast) { // Broadcast to all clients except sender if specified - broadcastToAllExcept(response.message, response.excludeSender ? newt.newtId : undefined); + await broadcastToAllExcept(response.message, response.excludeSender ? newt.newtId : undefined); } else if (response.targetNewtId) { // Send to specific client if targetNewtId is provided - sendToClient(response.targetNewtId, response.message); + await sendToClient(response.targetNewtId, response.message); } else { // Send back to sender ws.send(JSON.stringify(response.message)); @@ -193,8 +322,8 @@ const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => { } }); - ws.on("close", () => { - removeClient(newt.newtId, ws); + ws.on("close", async () => { + await removeClient(newt.newtId, ws); logger.info(`Client disconnected - Newt ID: ${newt.newtId}`); }); @@ -202,7 +331,7 @@ const setupConnection = (ws: AuthenticatedWebSocket, newt: Newt): void => { logger.error(`WebSocket error for Newt ID ${newt.newtId}:`, error); }); - logger.info(`WebSocket connection established - Newt ID: ${newt.newtId}`); + logger.info(`WebSocket connection established - Newt ID: ${newt.newtId}, Node ID: ${NODE_ID}`); }; // Router endpoint (unchanged) @@ -233,8 +362,8 @@ const handleWSUpgrade = (server: HttpServer): void => { return; } - wss.handleUpgrade(request, socket, head, (ws: AuthenticatedWebSocket) => { - setupConnection(ws, tokenPayload.newt); + wss.handleUpgrade(request, socket, head, async (ws: AuthenticatedWebSocket) => { + await setupConnection(ws, tokenPayload.newt); }); } catch (error) { logger.error("WebSocket upgrade error:", error); @@ -244,10 +373,54 @@ const handleWSUpgrade = (server: HttpServer): void => { }); }; +// Initialize Redis subscription when the module is loaded +if (redisManager.isRedisEnabled()) { + initializeRedisSubscription().catch(error => { + logger.error('Failed to initialize Redis subscription:', error); + }); + logger.info(`WebSocket handler initialized with Redis support - Node ID: ${NODE_ID}`); +} else { + logger.info('WebSocket handler initialized in local mode (Redis disabled)'); +} + +// Cleanup function for graceful shutdown +const cleanup = async (): Promise => { + try { + // Close all WebSocket connections + connectedClients.forEach((clients) => { + clients.forEach(client => { + if (client.readyState === WebSocket.OPEN) { + client.terminate(); + } + }); + }); + + // Clean up Redis tracking for this node + if (redisManager.isRedisEnabled()) { + const keys = await redisManager.getClient()?.keys(`ws:node:${NODE_ID}:*`) || []; + if (keys.length > 0) { + await Promise.all(keys.map(key => redisManager.del(key))); + } + } + + logger.info('WebSocket cleanup completed'); + } catch (error) { + logger.error('Error during WebSocket cleanup:', error); + } +}; + +// Handle process termination +process.on('SIGTERM', cleanup); +process.on('SIGINT', cleanup); + export { router, handleWSUpgrade, sendToClient, broadcastToAllExcept, - connectedClients -}; + connectedClients, + hasActiveConnections, + getActiveNodes, + NODE_ID, + cleanup +}; \ No newline at end of file From 139c9d2ce371791c68f3170bda7508ad7fec9d4d Mon Sep 17 00:00:00 2001 From: miloschwartz Date: Fri, 13 Jun 2025 16:42:15 -0400 Subject: [PATCH 2/2] add redis conn to config --- package-lock.json | 88 ++++++ package.json | 1 + server/db/redis.ts | 593 ++++++++++++++++++----------------- server/lib/readConfigFile.ts | 26 ++ 4 files changed, 420 insertions(+), 288 deletions(-) diff --git a/package-lock.json b/package-lock.json index f97c5ad4..07674a00 100644 --- a/package-lock.json +++ b/package-lock.json @@ -58,6 +58,7 @@ "http-errors": "2.0.0", "i": "^0.3.7", "input-otp": "1.4.2", + "ioredis": "^5.6.1", "jmespath": "^0.16.0", "js-yaml": "4.1.0", "jsonwebtoken": "^9.0.2", @@ -1971,6 +1972,12 @@ "url": "https://opencollective.com/libvips" } }, + "node_modules/@ioredis/commands": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.2.0.tgz", + "integrity": "sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==", + "license": "MIT" + }, "node_modules/@isaacs/balanced-match": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/@isaacs/balanced-match/-/balanced-match-4.0.1.tgz", @@ -6254,6 +6261,15 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/cmdk": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/cmdk/-/cmdk-1.1.1.tgz", @@ -6691,6 +6707,15 @@ "node": ">=0.4.0" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -8853,6 +8878,30 @@ "tslib": "^2.8.0" } }, + "node_modules/ioredis": { + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.6.1.tgz", + "integrity": "sha512-UxC0Yv1Y4WRJiGQxQkP0hfdL0/5/6YvdfOOClRgJ0qppSarkhneSa6UvkMkms0AkdGimSH3Ikqm+6mkMmX7vGA==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "^1.1.1", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -9810,12 +9859,24 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, "node_modules/lodash.includes": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.includes/-/lodash.includes-4.3.0.tgz", "integrity": "sha512-W3Bx6mdkRTGtlJISOvVD/lbqjTlPPUDTMnlXZFnVwi9NKJ6tiAk6LVdlhZMm17VZisqhKcgzpO5Wz91PCt5b0w==", "license": "MIT" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.isboolean": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz", @@ -14402,6 +14463,27 @@ "node": ">=0.8.8" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.10.tgz", @@ -15173,6 +15255,12 @@ "node": "*" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", diff --git a/package.json b/package.json index 040a5453..02d26197 100644 --- a/package.json +++ b/package.json @@ -75,6 +75,7 @@ "http-errors": "2.0.0", "i": "^0.3.7", "input-otp": "1.4.2", + "ioredis": "^5.6.1", "jmespath": "^0.16.0", "js-yaml": "4.1.0", "jsonwebtoken": "^9.0.2", diff --git a/server/db/redis.ts b/server/db/redis.ts index bae80099..80f1e690 100644 --- a/server/db/redis.ts +++ b/server/db/redis.ts @@ -1,316 +1,333 @@ -import Redis from 'ioredis'; -import logger from '@server/logger'; - -interface RedisConfig { - host: string; - port: number; - password?: string; - db?: number; - retryDelayOnFailover?: number; - maxRetriesPerRequest?: number; -} +import Redis, { RedisOptions } from "ioredis"; +import logger from "@server/logger"; +import config from "@server/lib/config"; class RedisManager { - private static instance: RedisManager; - private client: Redis | null = null; - private subscriber: Redis | null = null; - private publisher: Redis | null = null; - private isEnabled: boolean = false; - private subscribers: Map void>> = new Map(); + private static instance: RedisManager; + private client: Redis | null = null; + private subscriber: Redis | null = null; + private publisher: Redis | null = null; + private isEnabled: boolean = false; + private subscribers: Map< + string, + Set<(channel: string, message: string) => void> + > = new Map(); - private constructor() { - this.isEnabled = !!process.env.REDIS; - if (this.isEnabled) { - this.initializeClients(); + private constructor() { + this.isEnabled = config.getRawConfig().redis?.enabled || false; + if (this.isEnabled) { + this.initializeClients(); + } } - } - public static getInstance(): RedisManager { - if (!RedisManager.instance) { - RedisManager.instance = new RedisManager(); + public static getInstance(): RedisManager { + if (!RedisManager.instance) { + RedisManager.instance = new RedisManager(); + } + return RedisManager.instance; } - return RedisManager.instance; - } - private getRedisConfig(): RedisConfig { - return { - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - password: process.env.REDIS_PASSWORD, - db: parseInt(process.env.REDIS_DB || '0'), - retryDelayOnFailover: 100, - maxRetriesPerRequest: 3, - }; - } + private getRedisConfig(): RedisOptions { + const redisConfig = config.getRawConfig().redis!; + const opts: RedisOptions = { + host: redisConfig.host!, + port: redisConfig.port!, + password: redisConfig.password, + db: redisConfig.db, + tls: { + rejectUnauthorized: false + }, + }; + return opts; + } - private initializeClients(): void { - const config = this.getRedisConfig(); + private initializeClients(): void { + const config = this.getRedisConfig(); - try { - // Main client for general operations - this.client = new Redis(config); - - // Dedicated publisher client - this.publisher = new Redis(config); - - // Dedicated subscriber client - this.subscriber = new Redis(config); + try { + // Main client for general operations + this.client = new Redis(config); - // Set up error handlers - this.client.on('error', (err) => { - logger.error('Redis client error:', err); - }); + // Dedicated publisher client + this.publisher = new Redis(config); - this.publisher.on('error', (err) => { - logger.error('Redis publisher error:', err); - }); + // Dedicated subscriber client + this.subscriber = new Redis(config); - this.subscriber.on('error', (err) => { - logger.error('Redis subscriber error:', err); - }); + // Set up error handlers + this.client.on("error", (err) => { + logger.error("Redis client error:", err); + }); - // Set up connection handlers - this.client.on('connect', () => { - logger.info('Redis client connected'); - }); + this.publisher.on("error", (err) => { + logger.error("Redis publisher error:", err); + }); - this.publisher.on('connect', () => { - logger.info('Redis publisher connected'); - }); + this.subscriber.on("error", (err) => { + logger.error("Redis subscriber error:", err); + }); - this.subscriber.on('connect', () => { - logger.info('Redis subscriber connected'); - }); + // Set up connection handlers + this.client.on("connect", () => { + logger.info("Redis client connected"); + }); - // Set up message handler for subscriber - this.subscriber.on('message', (channel: string, message: string) => { - const channelSubscribers = this.subscribers.get(channel); - if (channelSubscribers) { - channelSubscribers.forEach(callback => { - try { - callback(channel, message); - } catch (error) { - logger.error(`Error in subscriber callback for channel ${channel}:`, error); + this.publisher.on("connect", () => { + logger.info("Redis publisher connected"); + }); + + this.subscriber.on("connect", () => { + logger.info("Redis subscriber connected"); + }); + + // Set up message handler for subscriber + this.subscriber.on( + "message", + (channel: string, message: string) => { + const channelSubscribers = this.subscribers.get(channel); + if (channelSubscribers) { + channelSubscribers.forEach((callback) => { + try { + callback(channel, message); + } catch (error) { + logger.error( + `Error in subscriber callback for channel ${channel}:`, + error + ); + } + }); + } + } + ); + + logger.info("Redis clients initialized successfully"); + } catch (error) { + logger.error("Failed to initialize Redis clients:", error); + this.isEnabled = false; + } + } + + public isRedisEnabled(): boolean { + return this.isEnabled && this.client !== null; + } + + public getClient(): Redis | null { + return this.client; + } + + public async set( + key: string, + value: string, + ttl?: number + ): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + if (ttl) { + await this.client.setex(key, ttl, value); + } else { + await this.client.set(key, value); } - }); + return true; + } catch (error) { + logger.error("Redis SET error:", error); + return false; } - }); - - logger.info('Redis clients initialized successfully'); - } catch (error) { - logger.error('Failed to initialize Redis clients:', error); - this.isEnabled = false; } - } - public isRedisEnabled(): boolean { - return this.isEnabled && this.client !== null; - } + public async get(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; - public getClient(): Redis | null { - return this.client; - } - - public async set(key: string, value: string, ttl?: number): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - if (ttl) { - await this.client.setex(key, ttl, value); - } else { - await this.client.set(key, value); - } - return true; - } catch (error) { - logger.error('Redis SET error:', error); - return false; - } - } - - public async get(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.get(key); - } catch (error) { - logger.error('Redis GET error:', error); - return null; - } - } - - public async del(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.del(key); - return true; - } catch (error) { - logger.error('Redis DEL error:', error); - return false; - } - } - - public async sadd(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.sadd(key, member); - return true; - } catch (error) { - logger.error('Redis SADD error:', error); - return false; - } - } - - public async srem(key: string, member: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.srem(key, member); - return true; - } catch (error) { - logger.error('Redis SREM error:', error); - return false; - } - } - - public async smembers(key: string): Promise { - if (!this.isRedisEnabled() || !this.client) return []; - - try { - return await this.client.smembers(key); - } catch (error) { - logger.error('Redis SMEMBERS error:', error); - return []; - } - } - - public async hset(key: string, field: string, value: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hset(key, field, value); - return true; - } catch (error) { - logger.error('Redis HSET error:', error); - return false; - } - } - - public async hget(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return null; - - try { - return await this.client.hget(key, field); - } catch (error) { - logger.error('Redis HGET error:', error); - return null; - } - } - - public async hdel(key: string, field: string): Promise { - if (!this.isRedisEnabled() || !this.client) return false; - - try { - await this.client.hdel(key, field); - return true; - } catch (error) { - logger.error('Redis HDEL error:', error); - return false; - } - } - - public async hgetall(key: string): Promise> { - if (!this.isRedisEnabled() || !this.client) return {}; - - try { - return await this.client.hgetall(key); - } catch (error) { - logger.error('Redis HGETALL error:', error); - return {}; - } - } - - public async publish(channel: string, message: string): Promise { - if (!this.isRedisEnabled() || !this.publisher) return false; - - try { - await this.publisher.publish(channel, message); - return true; - } catch (error) { - logger.error('Redis PUBLISH error:', error); - return false; - } - } - - public async subscribe(channel: string, callback: (channel: string, message: string) => void): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - // Add callback to subscribers map - if (!this.subscribers.has(channel)) { - this.subscribers.set(channel, new Set()); - // Only subscribe to the channel if it's the first subscriber - await this.subscriber.subscribe(channel); - } - - this.subscribers.get(channel)!.add(callback); - return true; - } catch (error) { - logger.error('Redis SUBSCRIBE error:', error); - return false; - } - } - - public async unsubscribe(channel: string, callback?: (channel: string, message: string) => void): Promise { - if (!this.isRedisEnabled() || !this.subscriber) return false; - - try { - const channelSubscribers = this.subscribers.get(channel); - if (!channelSubscribers) return true; - - if (callback) { - // Remove specific callback - channelSubscribers.delete(callback); - if (channelSubscribers.size === 0) { - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); + try { + return await this.client.get(key); + } catch (error) { + logger.error("Redis GET error:", error); + return null; } - } else { - // Remove all callbacks for this channel - this.subscribers.delete(channel); - await this.subscriber.unsubscribe(channel); - } - - return true; - } catch (error) { - logger.error('Redis UNSUBSCRIBE error:', error); - return false; } - } - public async disconnect(): Promise { - try { - if (this.client) { - await this.client.quit(); - this.client = null; - } - if (this.publisher) { - await this.publisher.quit(); - this.publisher = null; - } - if (this.subscriber) { - await this.subscriber.quit(); - this.subscriber = null; - } - this.subscribers.clear(); - logger.info('Redis clients disconnected'); - } catch (error) { - logger.error('Error disconnecting Redis clients:', error); + public async del(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.del(key); + return true; + } catch (error) { + logger.error("Redis DEL error:", error); + return false; + } + } + + public async sadd(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.sadd(key, member); + return true; + } catch (error) { + logger.error("Redis SADD error:", error); + return false; + } + } + + public async srem(key: string, member: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.srem(key, member); + return true; + } catch (error) { + logger.error("Redis SREM error:", error); + return false; + } + } + + public async smembers(key: string): Promise { + if (!this.isRedisEnabled() || !this.client) return []; + + try { + return await this.client.smembers(key); + } catch (error) { + logger.error("Redis SMEMBERS error:", error); + return []; + } + } + + public async hset( + key: string, + field: string, + value: string + ): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hset(key, field, value); + return true; + } catch (error) { + logger.error("Redis HSET error:", error); + return false; + } + } + + public async hget(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return null; + + try { + return await this.client.hget(key, field); + } catch (error) { + logger.error("Redis HGET error:", error); + return null; + } + } + + public async hdel(key: string, field: string): Promise { + if (!this.isRedisEnabled() || !this.client) return false; + + try { + await this.client.hdel(key, field); + return true; + } catch (error) { + logger.error("Redis HDEL error:", error); + return false; + } + } + + public async hgetall(key: string): Promise> { + if (!this.isRedisEnabled() || !this.client) return {}; + + try { + return await this.client.hgetall(key); + } catch (error) { + logger.error("Redis HGETALL error:", error); + return {}; + } + } + + public async publish(channel: string, message: string): Promise { + if (!this.isRedisEnabled() || !this.publisher) return false; + + try { + await this.publisher.publish(channel, message); + return true; + } catch (error) { + logger.error("Redis PUBLISH error:", error); + return false; + } + } + + public async subscribe( + channel: string, + callback: (channel: string, message: string) => void + ): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + // Add callback to subscribers map + if (!this.subscribers.has(channel)) { + this.subscribers.set(channel, new Set()); + // Only subscribe to the channel if it's the first subscriber + await this.subscriber.subscribe(channel); + } + + this.subscribers.get(channel)!.add(callback); + return true; + } catch (error) { + logger.error("Redis SUBSCRIBE error:", error); + return false; + } + } + + public async unsubscribe( + channel: string, + callback?: (channel: string, message: string) => void + ): Promise { + if (!this.isRedisEnabled() || !this.subscriber) return false; + + try { + const channelSubscribers = this.subscribers.get(channel); + if (!channelSubscribers) return true; + + if (callback) { + // Remove specific callback + channelSubscribers.delete(callback); + if (channelSubscribers.size === 0) { + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + } else { + // Remove all callbacks for this channel + this.subscribers.delete(channel); + await this.subscriber.unsubscribe(channel); + } + + return true; + } catch (error) { + logger.error("Redis UNSUBSCRIBE error:", error); + return false; + } + } + + public async disconnect(): Promise { + try { + if (this.client) { + await this.client.quit(); + this.client = null; + } + if (this.publisher) { + await this.publisher.quit(); + this.publisher = null; + } + if (this.subscriber) { + await this.subscriber.quit(); + this.subscriber = null; + } + this.subscribers.clear(); + logger.info("Redis clients disconnected"); + } catch (error) { + logger.error("Error disconnecting Redis clients:", error); + } } - } } -// Export singleton instance export const redisManager = RedisManager.getInstance(); -export default redisManager; \ No newline at end of file +export default redisManager; diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index 13efce5d..aca2e262 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -131,6 +131,32 @@ export const configSchema = z.object({ .optional() }) .optional(), + redis: z + .object({ + enabled: z.boolean(), + host: z.string().optional(), + port: portSchema.optional(), + password: z.string().optional(), + db: z.number().int().nonnegative().optional().default(0), + tls: z + .object({ + rejectUnauthorized: z.boolean().optional().default(true) + }) + .optional() + }) + .refine( + (redis) => { + if (!redis.enabled) { + return true; + } + return redis.host !== undefined && redis.port !== undefined; + }, + { + message: + "If Redis is enabled, connection details must be provided" + } + ) + .optional(), traefik: z .object({ http_entrypoint: z.string().optional().default("web"),