From c3820a4e7029d31454916973da02e77827dfbab4 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 22 Jun 2026 16:47:52 -0400 Subject: [PATCH] Add missing queuing --- server/lib/rebuildClientAssociations.ts | 75 ++++++++++------- server/private/routers/ws/ws.ts | 104 ++++++++++++++++-------- 2 files changed, 118 insertions(+), 61 deletions(-) diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 98c6c58f4..7f271bbe5 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -873,6 +873,20 @@ async function handleSubnetProxyTargetUpdates( ): Promise { const proxyJobs: Promise[] = []; const olmJobs: Promise[] = []; + const targetsToAddBatch: { + newtId: string; + targets: NonNullable< + Awaited> + >; + version: string | null; + }[] = []; + const targetsToRemoveBatch: { + newtId: string; + targets: NonNullable< + Awaited> + >; + version: string | null; + }[] = []; for (const siteData of sitesList) { const siteId = siteData.siteId; @@ -904,15 +918,11 @@ async function handleSubnetProxyTargetUpdates( ); if (targetsToAdd) { - proxyJobs.push( - addSubnetProxyTargetsBatch([ - { - newtId: newt.newtId, - targets: targetsToAdd, - version: newt.version - } - ]) - ); + targetsToAddBatch.push({ + newtId: newt.newtId, + targets: targetsToAdd, + version: newt.version + }); } olmJobs.push( @@ -945,15 +955,11 @@ async function handleSubnetProxyTargetUpdates( ); if (targetsToRemove) { - proxyJobs.push( - removeSubnetProxyTargetsBatch([ - { - newtId: newt.newtId, - targets: targetsToRemove, - version: newt.version - } - ]) - ); + targetsToRemoveBatch.push({ + newtId: newt.newtId, + targets: targetsToRemove, + version: newt.version + }); } const peerDataRemovals: { @@ -1025,7 +1031,15 @@ async function handleSubnetProxyTargetUpdates( } } - await Promise.all(proxyJobs); + if (targetsToAddBatch.length > 0) { + proxyJobs.push(addSubnetProxyTargetsBatch(targetsToAddBatch)); + } + + if (targetsToRemoveBatch.length > 0) { + proxyJobs.push(removeSubnetProxyTargetsBatch(targetsToRemoveBatch)); + } + + await Promise.all([...proxyJobs, ...olmJobs]); } export async function rebuildClientAssociationsFromClient( @@ -2048,19 +2062,20 @@ export async function cleanupSiteAssociations( // 7. Fire all removal messages in parallel. const jobs: Promise[] = []; + const olmPeerDeletes: { + clientId: number; + siteId: number; + publicKey: string; + }[] = []; for (const client of allClients) { // Tell each olm to drop the site's WireGuard peer. if (site.publicKey) { - jobs.push( - olmDeletePeersBatch([ - { - clientId: client.clientId, - siteId, - publicKey: site.publicKey - } - ]) - ); + olmPeerDeletes.push({ + clientId: client.clientId, + siteId, + publicKey: site.publicKey + }); } // Recompute and push updated relay destinations (now excluding this site). @@ -2069,6 +2084,10 @@ export async function cleanupSiteAssociations( } } + if (olmPeerDeletes.length > 0) { + jobs.push(olmDeletePeersBatch(olmPeerDeletes)); + } + await Promise.all(jobs).catch((error) => { logger.error( `cleanupSiteAssociations: error sending cleanup messages for siteId=${siteId}:`, diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 8d222fd72..5e38c709e 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -659,38 +659,52 @@ const broadcastToAllExceptLocal = async ( excludeClientId?: string, options: SendMessageOptions = {} ): Promise => { - for (const [mapKey, clients] of connectedClients.entries()) { - const [type, id] = mapKey.split(":"); - const clientId = mapKey; // mapKey is the clientId - if (!(excludeClientId && clientId === excludeClientId)) { - // Handle config version per client - let configVersion = await getClientConfigVersion(clientId); - if (options.incrementConfigVersion) { - configVersion = await incrementClientConfigVersion(clientId); - } + const sendPlans = await Promise.all( + Array.from(connectedClients.entries()).map( + async ([mapKey, clients]) => { + const clientId = mapKey; // mapKey is the clientId + if (excludeClientId && clientId === excludeClientId) { + return null; + } - // Add config version to message - const messageWithVersion = { - ...message, - configVersion - }; + let configVersion = await getClientConfigVersion(clientId); + if (options.incrementConfigVersion) { + configVersion = + await incrementClientConfigVersion(clientId); + } - if (options.compress) { - const compressed = zlib.gzipSync( - Buffer.from(JSON.stringify(messageWithVersion), "utf8") - ); - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(compressed); + return { + clients, + messageWithVersion: { + ...message, + configVersion } - }); - } else { - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(messageWithVersion)); - } - }); + }; } + ) + ); + + for (const plan of sendPlans) { + if (!plan) { + continue; + } + + if (options.compress) { + const compressed = zlib.gzipSync( + Buffer.from(JSON.stringify(plan.messageWithVersion), "utf8") + ); + plan.clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(compressed); + } + }); + } else { + const messageString = JSON.stringify(plan.messageWithVersion); + plan.clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(messageString); + } + }); } } }; @@ -711,7 +725,12 @@ const sendToClient = async ( ); // Try to send locally first - const localSent = await sendToClientLocal(clientId, message, options); + const localSent = await sendToClientLocal( + clientId, + message, + options, + configVersion + ); // Only send via Redis if the client is not connected locally and Redis is enabled if (!localSent && redisManager.isRedisEnabled()) { @@ -745,15 +764,34 @@ const sendToClientsBatch = async ( } const remoteEntries: { targetClientId: string; message: WSMessage }[] = []; + const clientsWithIncrement = new Set( + entries + .filter((entry) => !!entry.options?.incrementConfigVersion) + .map((entry) => entry.clientId) + ); + const nonIncrementOnlyClientIds = Array.from( + new Set( + entries + .map((entry) => entry.clientId) + .filter((clientId) => !clientsWithIncrement.has(clientId)) + ) + ); + const stableConfigVersionByClient = new Map( + await Promise.all( + nonIncrementOnlyClientIds.map( + async (clientId) => + [clientId, await getClientConfigVersion(clientId)] as const + ) + ) + ); for (const entry of entries) { const options = entry.options || {}; const { clientId, message } = entry; - let configVersion = await getClientConfigVersion(clientId); - if (options.incrementConfigVersion) { - configVersion = await incrementClientConfigVersion(clientId); - } + const configVersion = options.incrementConfigVersion + ? await incrementClientConfigVersion(clientId) + : stableConfigVersionByClient.get(clientId); logger.debug( `sendToClientsBatch: Message type ${message.type} queued for clientId ${clientId} (new configVersion: ${configVersion})`