feat: batch redis ws direct messages and dedupe rebuild queue jobs

This commit is contained in:
copilot-swe-agent[bot]
2026-06-16 22:46:12 +00:00
committed by GitHub
parent 06fb3685e4
commit 45158ff45b
3 changed files with 146 additions and 19 deletions

View File

@@ -29,6 +29,7 @@ export interface RebuildJobHandlers {
// Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order).
const QUEUE_KEY = "rebuild-client-associations:queue";
const QUEUED_SET_KEY = "rebuild-client-associations:queued";
// Distributed lock that serialises queue consumption to a single server instance
// at a time. TTL is generous enough to cover a full batch of expensive rebuilds.
@@ -54,11 +55,23 @@ class RedisRebuildQueue {
}
try {
const dedupeKey = `${job.type}:${job.id}`;
const added = await redis.sadd(QUEUED_SET_KEY, dedupeKey);
if (added === 0) {
logger.debug(
`Rebuild queue: skipped duplicate queued job ${job.type}:${job.id}`
);
return;
}
await redis.rpush(QUEUE_KEY, JSON.stringify(job));
logger.debug(
`Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)`
);
} catch (err) {
await redis
?.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
.catch(() => undefined);
logger.error(
`Rebuild queue: failed to enqueue ${job.type}:${job.id}:`,
err
@@ -121,6 +134,12 @@ class RedisRebuildQueue {
continue;
}
// Remove from dedupe set once dequeued so the same job
// can be re-queued while this one is in progress.
await redis
.srem(QUEUED_SET_KEY, `${job.type}:${job.id}`)
.catch(() => undefined);
logger.debug(
`Rebuild queue: processing ${job.type}:${job.id}`
);

View File

@@ -187,6 +187,8 @@ const wss: WebSocketServer = new WebSocketServer({ noServer: true });
// Generate unique node ID for this instance
const NODE_ID = uuidv4();
const REDIS_CHANNEL = "websocket_messages";
const REDIS_DIRECT_BATCH_SIZE = 250;
const REDIS_DIRECT_FLUSH_MS = 10;
// Client tracking map (local to this node)
const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
@@ -197,6 +199,15 @@ const clientConfigVersions: Map<string, number> = new Map();
// Recovery tracking
let isRedisRecoveryInProgress = false;
interface RedisDirectBatchEntry {
targetClientId: string;
message: WSMessage;
resolve: () => void;
}
let pendingRedisDirectMessages: RedisDirectBatchEntry[] = [];
let redisDirectFlushTimer: NodeJS.Timeout | null = null;
// Helper to get map key
const getClientMapKey = (clientId: string) => clientId;
@@ -207,6 +218,82 @@ const getNodeConnectionsKey = (nodeId: string, clientId: string) =>
const getConfigVersionKey = (clientId: string) =>
`ws:configVersion:${clientId}`;
const clearRedisDirectFlushTimer = (): void => {
if (redisDirectFlushTimer) {
clearTimeout(redisDirectFlushTimer);
redisDirectFlushTimer = null;
}
};
const publishDirectBatch = async (
entries: RedisDirectBatchEntry[]
): Promise<void> => {
const redisMessage: RedisMessage = {
type: "direct-batch",
messages: entries.map((entry) => ({
targetClientId: entry.targetClientId,
message: entry.message
})),
message: {
type: "batch",
data: {}
},
fromNodeId: NODE_ID
};
await redisManager.publish(REDIS_CHANNEL, JSON.stringify(redisMessage));
};
const flushPendingRedisDirectMessages = async (): Promise<void> => {
clearRedisDirectFlushTimer();
if (pendingRedisDirectMessages.length === 0) {
return;
}
const entries = pendingRedisDirectMessages;
pendingRedisDirectMessages = [];
if (!redisManager.isRedisEnabled()) {
entries.forEach((entry) => entry.resolve());
return;
}
for (let i = 0; i < entries.length; i += REDIS_DIRECT_BATCH_SIZE) {
const batch = entries.slice(i, i + REDIS_DIRECT_BATCH_SIZE);
try {
await publishDirectBatch(batch);
} catch (error) {
logger.error(
"Failed to send batched direct messages via Redis, messages may be lost:",
error
);
} finally {
batch.forEach((entry) => entry.resolve());
}
}
};
const enqueueRedisDirectMessage = async (
targetClientId: string,
message: WSMessage
): Promise<void> => {
await new Promise<void>((resolve) => {
pendingRedisDirectMessages.push({ targetClientId, message, resolve });
if (pendingRedisDirectMessages.length >= REDIS_DIRECT_BATCH_SIZE) {
void flushPendingRedisDirectMessages();
return;
}
if (!redisDirectFlushTimer) {
redisDirectFlushTimer = setTimeout(() => {
void flushPendingRedisDirectMessages();
}, REDIS_DIRECT_FLUSH_MS);
}
});
};
// Initialize Redis subscription for cross-node messaging
const initializeRedisSubscription = async (): Promise<void> => {
if (!redisManager.isRedisEnabled()) return;
@@ -227,7 +314,16 @@ const initializeRedisSubscription = async (): Promise<void> => {
// Send to specific client on this node
await sendToClientLocal(
redisMessage.targetClientId,
redisMessage.message
redisMessage.message,
{},
redisMessage.message.configVersion
);
} else if (
redisMessage.type === "direct-batch" &&
redisMessage.messages
) {
await sendRedisDirectBatchToLocalClients(
redisMessage.messages
);
} else if (redisMessage.type === "broadcast") {
// Broadcast to all clients on this node except excluded
@@ -503,7 +599,8 @@ const incrementClientConfigVersion = async (
const sendToClientLocal = async (
clientId: string,
message: WSMessage,
options: SendMessageOptions = {}
options: SendMessageOptions = {},
preResolvedConfigVersion?: number
): Promise<boolean> => {
const mapKey = getClientMapKey(clientId);
const clients = connectedClients.get(mapKey);
@@ -512,7 +609,8 @@ const sendToClientLocal = async (
}
// Handle config version
const configVersion = await getClientConfigVersion(clientId);
const configVersion =
preResolvedConfigVersion ?? (await getClientConfigVersion(clientId));
// Add config version to message
const messageWithVersion = {
@@ -545,6 +643,20 @@ const sendToClientLocal = async (
return true;
};
const sendRedisDirectBatchToLocalClients = async (
entries: { targetClientId: string; message: WSMessage }[]
): Promise<void> => {
const jobs = entries.map((entry) =>
sendToClientLocal(
entry.targetClientId,
entry.message,
{},
entry.message.configVersion
)
);
await Promise.all(jobs);
};
const broadcastToAllExceptLocal = async (
message: WSMessage,
excludeClientId?: string,
@@ -607,23 +719,13 @@ const sendToClient = async (
// Only send via Redis if the client is not connected locally and Redis is enabled
if (!localSent && redisManager.isRedisEnabled()) {
try {
const redisMessage: RedisMessage = {
type: "direct",
targetClientId: clientId,
message: {
...message,
configVersion
},
fromNodeId: NODE_ID
};
await redisManager.publish(
REDIS_CHANNEL,
JSON.stringify(redisMessage)
);
await enqueueRedisDirectMessage(clientId, {
...message,
configVersion
});
} catch (error) {
logger.error(
"Failed to send message via Redis, message may be lost:",
"Failed to queue batched direct message for Redis delivery, message may be lost:",
error
);
// Continue execution - local delivery already attempted
@@ -1109,6 +1211,8 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
// Cleanup function for graceful shutdown
const cleanup = async (): Promise<void> => {
try {
await flushPendingRedisDirectMessages();
// Close all WebSocket connections
connectedClients.forEach((clients) => {
clients.forEach((client) => {