diff --git a/server/private/lib/rebuildQueue.ts b/server/private/lib/rebuildQueue.ts index e5ee7e7cb..ad150f278 100644 --- a/server/private/lib/rebuildQueue.ts +++ b/server/private/lib/rebuildQueue.ts @@ -29,6 +29,7 @@ export interface RebuildJobHandlers { // Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order). const QUEUE_KEY = "rebuild-client-associations:queue"; +const QUEUED_SET_KEY = "rebuild-client-associations:queued"; // Distributed lock that serialises queue consumption to a single server instance // at a time. TTL is generous enough to cover a full batch of expensive rebuilds. @@ -54,11 +55,23 @@ class RedisRebuildQueue { } try { + const dedupeKey = `${job.type}:${job.id}`; + const added = await redis.sadd(QUEUED_SET_KEY, dedupeKey); + if (added === 0) { + logger.debug( + `Rebuild queue: skipped duplicate queued job ${job.type}:${job.id}` + ); + return; + } + await redis.rpush(QUEUE_KEY, JSON.stringify(job)); logger.debug( `Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)` ); } catch (err) { + await redis + ?.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`) + .catch(() => undefined); logger.error( `Rebuild queue: failed to enqueue ${job.type}:${job.id}:`, err @@ -121,6 +134,12 @@ class RedisRebuildQueue { continue; } + // Remove from dedupe set once dequeued so the same job + // can be re-queued while this one is in progress. + await redis + .srem(QUEUED_SET_KEY, `${job.type}:${job.id}`) + .catch(() => undefined); + logger.debug( `Rebuild queue: processing ${job.type}:${job.id}` ); diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index a592927cc..c58437b78 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -187,6 +187,8 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true }); // Generate unique node ID for this instance const NODE_ID = uuidv4(); const REDIS_CHANNEL = "websocket_messages"; +const REDIS_DIRECT_BATCH_SIZE = 250; +const REDIS_DIRECT_FLUSH_MS = 10; // Client tracking map (local to this node) const connectedClients: Map = new Map(); @@ -197,6 +199,15 @@ const clientConfigVersions: Map = new Map(); // Recovery tracking let isRedisRecoveryInProgress = false; +interface RedisDirectBatchEntry { + targetClientId: string; + message: WSMessage; + resolve: () => void; +} + +let pendingRedisDirectMessages: RedisDirectBatchEntry[] = []; +let redisDirectFlushTimer: NodeJS.Timeout | null = null; + // Helper to get map key const getClientMapKey = (clientId: string) => clientId; @@ -207,6 +218,82 @@ const getNodeConnectionsKey = (nodeId: string, clientId: string) => const getConfigVersionKey = (clientId: string) => `ws:configVersion:${clientId}`; +const clearRedisDirectFlushTimer = (): void => { + if (redisDirectFlushTimer) { + clearTimeout(redisDirectFlushTimer); + redisDirectFlushTimer = null; + } +}; + +const publishDirectBatch = async ( + entries: RedisDirectBatchEntry[] +): Promise => { + const redisMessage: RedisMessage = { + type: "direct-batch", + messages: entries.map((entry) => ({ + targetClientId: entry.targetClientId, + message: entry.message + })), + message: { + type: "batch", + data: {} + }, + fromNodeId: NODE_ID + }; + + await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage)); +}; + +const flushPendingRedisDirectMessages = async (): Promise => { + clearRedisDirectFlushTimer(); + + if (pendingRedisDirectMessages.length === 0) { + return; + } + + const entries = pendingRedisDirectMessages; + pendingRedisDirectMessages = []; + + if (!redisManager.isRedisEnabled()) { + entries.forEach((entry) => entry.resolve()); + return; + } + + for (let i = 0; i < entries.length; i += REDIS_DIRECT_BATCH_SIZE) { + const batch = entries.slice(i, i + REDIS_DIRECT_BATCH_SIZE); + try { + await publishDirectBatch(batch); + } catch (error) { + logger.error( + "Failed to send batched direct messages via Redis, messages may be lost:", + error + ); + } finally { + batch.forEach((entry) => entry.resolve()); + } + } +}; + +const enqueueRedisDirectMessage = async ( + targetClientId: string, + message: WSMessage +): Promise => { + await new Promise((resolve) => { + pendingRedisDirectMessages.push({ targetClientId, message, resolve }); + + if (pendingRedisDirectMessages.length >= REDIS_DIRECT_BATCH_SIZE) { + void flushPendingRedisDirectMessages(); + return; + } + + if (!redisDirectFlushTimer) { + redisDirectFlushTimer = setTimeout(() => { + void flushPendingRedisDirectMessages(); + }, REDIS_DIRECT_FLUSH_MS); + } + }); +}; + // Initialize Redis subscription for cross-node messaging const initializeRedisSubscription = async (): Promise => { if (!redisManager.isRedisEnabled()) return; @@ -227,7 +314,16 @@ const initializeRedisSubscription = async (): Promise => { // Send to specific client on this node await sendToClientLocal( redisMessage.targetClientId, - redisMessage.message + redisMessage.message, + {}, + redisMessage.message.configVersion + ); + } else if ( + redisMessage.type === "direct-batch" && + redisMessage.messages + ) { + await sendRedisDirectBatchToLocalClients( + redisMessage.messages ); } else if (redisMessage.type === "broadcast") { // Broadcast to all clients on this node except excluded @@ -503,7 +599,8 @@ const incrementClientConfigVersion = async ( const sendToClientLocal = async ( clientId: string, message: WSMessage, - options: SendMessageOptions = {} + options: SendMessageOptions = {}, + preResolvedConfigVersion?: number ): Promise => { const mapKey = getClientMapKey(clientId); const clients = connectedClients.get(mapKey); @@ -512,7 +609,8 @@ const sendToClientLocal = async ( } // Handle config version - const configVersion = await getClientConfigVersion(clientId); + const configVersion = + preResolvedConfigVersion ?? (await getClientConfigVersion(clientId)); // Add config version to message const messageWithVersion = { @@ -545,6 +643,20 @@ const sendToClientLocal = async ( return true; }; +const sendRedisDirectBatchToLocalClients = async ( + entries: { targetClientId: string; message: WSMessage }[] +): Promise => { + const jobs = entries.map((entry) => + sendToClientLocal( + entry.targetClientId, + entry.message, + {}, + entry.message.configVersion + ) + ); + await Promise.all(jobs); +}; + const broadcastToAllExceptLocal = async ( message: WSMessage, excludeClientId?: string, @@ -607,23 +719,13 @@ const sendToClient = async ( // Only send via Redis if the client is not connected locally and Redis is enabled if (!localSent && redisManager.isRedisEnabled()) { try { - const redisMessage: RedisMessage = { - type: "direct", - targetClientId: clientId, - message: { - ...message, - configVersion - }, - fromNodeId: NODE_ID - }; - - await redisManager.publish( - REDIS_CHANNEL, - JSON.stringify(redisMessage) - ); + await enqueueRedisDirectMessage(clientId, { + ...message, + configVersion + }); } catch (error) { logger.error( - "Failed to send message via Redis, message may be lost:", + "Failed to queue batched direct message for Redis delivery, message may be lost:", error ); // Continue execution - local delivery already attempted @@ -1109,6 +1211,8 @@ const disconnectClient = async (clientId: string): Promise => { // Cleanup function for graceful shutdown const cleanup = async (): Promise => { try { + await flushPendingRedisDirectMessages(); + // Close all WebSocket connections connectedClients.forEach((clients) => { clients.forEach((client) => { diff --git a/server/routers/ws/types.ts b/server/routers/ws/types.ts index e539954ce..9e6504ce0 100644 --- a/server/routers/ws/types.ts +++ b/server/routers/ws/types.ts @@ -78,10 +78,14 @@ export interface SendMessageOptions { // Redis message type for cross-node communication export interface RedisMessage { - type: "direct" | "broadcast"; + type: "direct" | "direct-batch" | "broadcast"; targetClientId?: string; excludeClientId?: string; message: WSMessage; + messages?: { + targetClientId: string; + message: WSMessage; + }[]; fromNodeId: string; options?: SendMessageOptions; }