Merge pull request #3072 from fosrl/dev

Optimize this
This commit is contained in:
Owen Schwartz
2026-05-14 11:34:56 -07:00
committed by GitHub

View File

@@ -11,7 +11,7 @@ import {
ExitNode ExitNode
} from "@server/db"; } from "@server/db";
import { db } from "@server/db"; import { db } from "@server/db";
import { eq, and } from "drizzle-orm"; import { eq, and, inArray } from "drizzle-orm";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors"; import createHttpError from "http-errors";
import logger from "@server/logger"; import logger from "@server/logger";
@@ -202,24 +202,29 @@ export async function updateAndGenerateEndpointDestinations(
) )
); );
// Update clientSites for each site on this exit node
for (const site of sitesOnExitNode) {
// logger.debug(
// `Updating site ${site.siteId} on exit node ${exitNode.exitNodeId}`
// );
// Format the endpoint properly for both IPv4 and IPv6 // Format the endpoint properly for both IPv4 and IPv6
const formattedEndpoint = formatEndpoint(ip, port); const formattedEndpoint = formatEndpoint(ip, port);
// if the public key or endpoint has changed, update it otherwise continue // Determine which rows actually need updating and whether the endpoint
// (as opposed to only the publicKey) changed for any of them.
const siteIdsToUpdate: number[] = [];
let endpointChanged = false;
for (const site of sitesOnExitNode) {
if ( if (
site.endpoint === formattedEndpoint && site.endpoint === formattedEndpoint &&
site.publicKey === publicKey site.publicKey === publicKey
) { ) {
continue; continue;
} }
siteIdsToUpdate.push(site.siteId);
if (site.endpoint !== formattedEndpoint) {
endpointChanged = true;
}
}
const [updatedClientSitesAssociationsCache] = await db if (siteIdsToUpdate.length > 0) {
// Single bulk update for all affected rows for this client on this exit node
await db
.update(clientSitesAssociationsCache) .update(clientSitesAssociationsCache)
.set({ .set({
endpoint: formattedEndpoint, endpoint: formattedEndpoint,
@@ -228,24 +233,22 @@ export async function updateAndGenerateEndpointDestinations(
.where( .where(
and( and(
eq(clientSitesAssociationsCache.clientId, olm.clientId), eq(clientSitesAssociationsCache.clientId, olm.clientId),
eq(clientSitesAssociationsCache.siteId, site.siteId) inArray(
clientSitesAssociationsCache.siteId,
siteIdsToUpdate
) )
) )
.returning(); );
if ( // Only trigger downstream peer updates once per hole punch: the
updatedClientSitesAssociationsCache.endpoint !== // endpoint is the same for every site on this exit node, and
site.endpoint && // this is the endpoint from the join table not the site // handleClientEndpointChange already fans out to all connected
updatedClient.pubKey === publicKey // only trigger if the client's public key matches the current public key which means it has registered so we dont prematurely send the update // sites for this client.
) { if (endpointChanged && updatedClient.pubKey === publicKey) {
logger.info( logger.info(
`ClientSitesAssociationsCache for client ${olm.clientId} and site ${site.siteId} endpoint changed from ${site.endpoint} to ${updatedClientSitesAssociationsCache.endpoint}` `ClientSitesAssociationsCache for client ${olm.clientId} endpoint changed to ${formattedEndpoint} for ${siteIdsToUpdate.length} site(s) on exit node ${exitNode.exitNodeId}`
);
// Handle any additional logic for endpoint change
handleClientEndpointChange(
olm.clientId,
updatedClientSitesAssociationsCache.endpoint!
); );
handleClientEndpointChange(olm.clientId, formattedEndpoint);
} }
} }
@@ -456,11 +459,11 @@ async function handleSiteEndpointChange(siteId: number, newEndpoint: string) {
} }
} }
async function handleClientEndpointChange( async function handleClientEndpointChange( // TODO: I THINK WE DONT NEED TO HIT EVERY SITE HERE BECAUSE WE ONLY NEED TO UPDATE THE SITES CONNECTED TO THIS NODE WHICH WE ALREADY HAVE FROM ABOVE
clientId: number, clientId: number,
newEndpoint: string newEndpoint: string
) { ) {
// Alert all sites connected to this client that the endpoint has changed (only if NOT relayed) // Alert all sites connected to this client that the endpoint has changed (only if NOT relayed and NOT JIT MODE)
try { try {
// Get client details // Get client details
const [client] = await db const [client] = await db
@@ -480,6 +483,7 @@ async function handleClientEndpointChange(
siteId: sites.siteId, siteId: sites.siteId,
newtId: newts.newtId, newtId: newts.newtId,
isRelayed: clientSitesAssociationsCache.isRelayed, isRelayed: clientSitesAssociationsCache.isRelayed,
isJitMode: clientSitesAssociationsCache.isJitMode,
subnet: clients.subnet subnet: clients.subnet
}) })
.from(clientSitesAssociationsCache) .from(clientSitesAssociationsCache)
@@ -495,20 +499,29 @@ async function handleClientEndpointChange(
.where( .where(
and( and(
eq(clientSitesAssociationsCache.clientId, clientId), eq(clientSitesAssociationsCache.clientId, clientId),
eq(clientSitesAssociationsCache.isRelayed, false) eq(clientSitesAssociationsCache.isRelayed, false),
eq(clientSitesAssociationsCache.isJitMode, false)
) )
); );
// Update each non-relayed site with the new client endpoint if (connectedSites.length > 250) {
for (const siteData of connectedSites) {
try {
if (!siteData.subnet) {
logger.warn( logger.warn(
`Client ${clientId} has no subnet, skipping update for site ${siteData.siteId}` `Client ${clientId} has ${connectedSites.length} connected sites so the client will be in jit mode anyway, skipping endpoint updates`
); );
continue; return;
} }
// Update each non-relayed site with the new client endpoint (in parallel)
await Promise.allSettled(
connectedSites.map(async (siteData) => {
if (!siteData.subnet || !client.pubKey) {
logger.warn(
`Client ${clientId} has no subnet or public key, skipping update for site ${siteData.siteId}`
);
return;
}
try {
await updateNewtPeer( await updateNewtPeer(
siteData.siteId, siteData.siteId,
client.pubKey, client.pubKey,
@@ -525,7 +538,8 @@ async function handleClientEndpointChange(
`Failed to update site ${siteData.siteId} with new client endpoint: ${error}` `Failed to update site ${siteData.siteId} with new client endpoint: ${error}`
); );
} }
} })
);
} catch (error) { } catch (error) {
logger.error( logger.error(
`Error handling client endpoint change for client ${clientId}: ${error}` `Error handling client endpoint change for client ${clientId}: ${error}`