diff --git a/server/private/lib/rebuildQueue.ts b/server/private/lib/rebuildQueue.ts index ad150f278..2cd1dadc0 100644 --- a/server/private/lib/rebuildQueue.ts +++ b/server/private/lib/rebuildQueue.ts @@ -70,8 +70,13 @@ class RedisRebuildQueue { ); } catch (err) { await redis - ?.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`) - .catch(() => undefined); + .srem(QUEUED_SET_KEY, `${job.type}:${job.id}`) + .catch((cleanupErr) => + logger.warn( + `Rebuild queue: failed to cleanup dedupe key for ${job.type}:${job.id} after enqueue failure:`, + cleanupErr + ) + ); logger.error( `Rebuild queue: failed to enqueue ${job.type}:${job.id}:`, err @@ -138,7 +143,12 @@ class RedisRebuildQueue { // can be re-queued while this one is in progress. await redis .srem(QUEUED_SET_KEY, `${job.type}:${job.id}`) - .catch(() => undefined); + .catch((cleanupErr) => + logger.warn( + `Rebuild queue: failed to remove dedupe key for ${job.type}:${job.id} on dequeue:`, + cleanupErr + ) + ); logger.debug( `Rebuild queue: processing ${job.type}:${job.id}` diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index c58437b78..2db8f3140 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -188,7 +188,7 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true }); const NODE_ID = uuidv4(); const REDIS_CHANNEL = "websocket_messages"; const REDIS_DIRECT_BATCH_SIZE = 250; -const REDIS_DIRECT_FLUSH_MS = 10; +const REDIS_DIRECT_FLUSH_INTERVAL_MS = 10; // Client tracking map (local to this node) const connectedClients: Map = new Map(); @@ -234,10 +234,6 @@ const publishDirectBatch = async ( targetClientId: entry.targetClientId, message: entry.message })), - message: { - type: "batch", - data: {} - }, fromNodeId: NODE_ID }; @@ -289,7 +285,7 @@ const enqueueRedisDirectMessage = async ( if (!redisDirectFlushTimer) { redisDirectFlushTimer = setTimeout(() => { void flushPendingRedisDirectMessages(); - }, REDIS_DIRECT_FLUSH_MS); + }, REDIS_DIRECT_FLUSH_INTERVAL_MS); } }); }; diff --git a/server/routers/ws/types.ts b/server/routers/ws/types.ts index 9e6504ce0..d541d3276 100644 --- a/server/routers/ws/types.ts +++ b/server/routers/ws/types.ts @@ -76,16 +76,26 @@ export interface SendMessageOptions { compress?: boolean; } -// Redis message type for cross-node communication -export interface RedisMessage { - type: "direct" | "direct-batch" | "broadcast"; - targetClientId?: string; - excludeClientId?: string; - message: WSMessage; - messages?: { - targetClientId: string; - message: WSMessage; - }[]; - fromNodeId: string; - options?: SendMessageOptions; -} +// Redis message types for cross-node communication +export type RedisMessage = + | { + type: "direct"; + targetClientId: string; + message: WSMessage; + fromNodeId: string; + } + | { + type: "direct-batch"; + messages: { + targetClientId: string; + message: WSMessage; + }[]; + fromNodeId: string; + } + | { + type: "broadcast"; + excludeClientId?: string; + message: WSMessage; + fromNodeId: string; + options?: SendMessageOptions; + };