From 80b66cf9b9860433d8005c504a571ec7110ce775 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 24 Jun 2026 13:50:33 -0400 Subject: [PATCH] Add locks to rebuilds --- server/lib/blueprints/applyBlueprint.ts | 46 ++++---- server/lib/lock.ts | 2 +- server/lib/rebuildClientAssociations.ts | 108 ++++++++++++++++++ server/lib/rebuildQueue.ts | 4 + server/private/lib/rebuildQueue.ts | 11 ++ .../newt/handleNewtGetConfigMessage.ts | 3 + .../routers/olm/handleOlmRegisterMessage.ts | 3 + .../siteResource/updateSiteResource.ts | 43 ++++--- 8 files changed, 179 insertions(+), 41 deletions(-) diff --git a/server/lib/blueprints/applyBlueprint.ts b/server/lib/blueprints/applyBlueprint.ts index fbd6f3fb0..493831131 100644 --- a/server/lib/blueprints/applyBlueprint.ts +++ b/server/lib/blueprints/applyBlueprint.ts @@ -29,8 +29,11 @@ import { updateResourcePolicies } from "./resourcePolicies"; import { BlueprintSource } from "@server/routers/blueprints/types"; import { stringify as stringifyYaml } from "yaml"; import { generateName } from "@server/db/names"; -import { handleMessagingForUpdatedSiteResource } from "@server/routers/siteResource"; -import { rebuildClientAssociationsFromSiteResource } from "../rebuildClientAssociations"; +import { + handleMessagingForUpdatedSiteResource, + rebuildClientAssociationsFromSiteResource, + waitForSiteResourceRebuildIdle +} from "../rebuildClientAssociations"; type ApplyBlueprintArgs = { orgId: string; @@ -138,26 +141,25 @@ export async function applyBlueprint({ for (const result of privateResourcesResults) { rebuildClientAssociationsFromSiteResource( result.newSiteResource - ).catch((e) => { - logger.error( - `Failed to rebuild client associations for site resource ${result.newSiteResource.siteResourceId}. Error: ${e}` - ); - }); - - handleMessagingForUpdatedSiteResource( - result.oldSiteResource, - result.newSiteResource, - result.oldSites.map((site) => ({ - // only need to run this on the old sites because the new sites are added above - siteId: site.siteId, - orgId: result.newSiteResource.orgId - })) - ).catch((err) => { - logger.error( - `Error handling messaging for updated site resource ${result.newSiteResource.siteResourceId}:`, - err - ); - }); + ) + .then(() => + waitForSiteResourceRebuildIdle( + result.newSiteResource.siteResourceId + ) + ) + .then(() => + handleMessagingForUpdatedSiteResource( + result.oldSiteResource, + result.newSiteResource, + result.oldSites.map((s) => s.siteId), + result.newSites.map((s) => s.siteId) + ) + ) + .catch((e) => { + logger.error( + `Failed to rebuild and handle messaging for site resource ${result.newSiteResource.siteResourceId}. Error: ${e}` + ); + }); } logger.debug( diff --git a/server/lib/lock.ts b/server/lib/lock.ts index 7eea89084..15d1f39e1 100644 --- a/server/lib/lock.ts +++ b/server/lib/lock.ts @@ -35,7 +35,7 @@ export class LockManager { ttl: number; owner?: string; }> { - return { exists: true, ownedByMe: true, ttl: 0 }; + return { exists: false, ownedByMe: false, ttl: 0 }; } /** diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 9e362fb51..daf639bf3 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -49,6 +49,112 @@ import { rebuildQueue } from "#dynamic/lib/rebuildQueue"; // peer/proxy updates, so give them a generous window. const REBUILD_ASSOCIATIONS_LOCK_TTL_MS = 120000; +const REBUILD_IDLE_POLL_INTERVAL_MS = 300; +const REBUILD_IDLE_DEFAULT_TIMEOUT_MS = 130_000; // slightly longer than lock TTL +const REBUILD_IDLE_HANDLER_TIMEOUT_MS = 5_000; + +/** + * Returns true if a rebuild for the given site resource is currently active + * (holding the distributed lock) or is pending in the rebuild queue. + */ +export async function hasActiveSiteResourceRebuild( + siteResourceId: number +): Promise { + const lockKey = `rebuild-client-associations:site-resource:${siteResourceId}`; + const lockInfo = await lockManager.getLockInfo(lockKey); + if (lockInfo.exists) return true; + return rebuildQueue.isQueued({ type: "site-resource", id: siteResourceId }); +} + +/** + * Resolves once there is no active or queued rebuild for the given site resource. + * Logs a warning and resolves early if the timeout is reached. + */ +export async function waitForSiteResourceRebuildIdle( + siteResourceId: number, + timeoutMs = REBUILD_IDLE_DEFAULT_TIMEOUT_MS +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + if (!(await hasActiveSiteResourceRebuild(siteResourceId))) return; + await new Promise((r) => + setTimeout(r, REBUILD_IDLE_POLL_INTERVAL_MS) + ); + } + logger.warn( + `waitForSiteResourceRebuildIdle: timed out after ${timeoutMs}ms waiting for siteResourceId=${siteResourceId}` + ); +} + +/** + * Resolves once there are no active or queued rebuilds for any site resource + * associated with the given site. + */ +export async function waitForSiteRebuildIdle( + siteId: number, + timeoutMs = REBUILD_IDLE_HANDLER_TIMEOUT_MS +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const resourceRows = await db + .select({ siteResourceId: siteResources.siteResourceId }) + .from(siteResources) + .innerJoin( + siteNetworks, + eq(siteNetworks.networkId, siteResources.networkId) + ) + .where(eq(siteNetworks.siteId, siteId)); + let allIdle = true; + for (const { siteResourceId } of resourceRows) { + if (await hasActiveSiteResourceRebuild(siteResourceId)) { + allIdle = false; + break; + } + } + if (allIdle) return; + await new Promise((r) => + setTimeout(r, REBUILD_IDLE_POLL_INTERVAL_MS) + ); + } + logger.warn( + `waitForSiteRebuildIdle: timed out after ${timeoutMs}ms waiting for siteId=${siteId}` + ); +} + +/** + * Resolves once there are no active or queued rebuilds for any site resource + * associated with the given client. + */ +export async function waitForClientRebuildIdle( + clientId: number, + timeoutMs = REBUILD_IDLE_HANDLER_TIMEOUT_MS +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const resourceRows = await db + .select({ + siteResourceId: + clientSiteResourcesAssociationsCache.siteResourceId + }) + .from(clientSiteResourcesAssociationsCache) + .where(eq(clientSiteResourcesAssociationsCache.clientId, clientId)); + let allIdle = true; + for (const { siteResourceId } of resourceRows) { + if (await hasActiveSiteResourceRebuild(siteResourceId)) { + allIdle = false; + break; + } + } + if (allIdle) return; + await new Promise((r) => + setTimeout(r, REBUILD_IDLE_POLL_INTERVAL_MS) + ); + } + logger.warn( + `waitForClientRebuildIdle: timed out after ${timeoutMs}ms waiting for clientId=${clientId}` + ); +} + export async function getClientSiteResourceAccess( siteResource: SiteResource, trx: Transaction | typeof db = db @@ -1060,6 +1166,8 @@ export async function handleMessagingForUpdatedSiteResource( ); // get all of the clients from the cache + const { mergedAllClients, mergedAllClientIds } = + await getClientSiteResourceAccess(updatedSiteResource, trx); const targets = await generateSubnetProxyTargetV2( updatedSiteResource, diff --git a/server/lib/rebuildQueue.ts b/server/lib/rebuildQueue.ts index 475858108..84dce9641 100644 --- a/server/lib/rebuildQueue.ts +++ b/server/lib/rebuildQueue.ts @@ -13,11 +13,15 @@ export interface RebuildJobHandlers { export interface RebuildQueueManager { enqueue(job: RebuildJob): Promise; startProcessing(handlers: RebuildJobHandlers): void; + isQueued(job: RebuildJob): Promise; } class NoopRebuildQueue implements RebuildQueueManager { async enqueue(_job: RebuildJob): Promise {} startProcessing(_handlers: RebuildJobHandlers): void {} + async isQueued(_job: RebuildJob): Promise { + return false; + } } export const rebuildQueue: RebuildQueueManager = new NoopRebuildQueue(); diff --git a/server/private/lib/rebuildQueue.ts b/server/private/lib/rebuildQueue.ts index 01082cc50..b5e112545 100644 --- a/server/private/lib/rebuildQueue.ts +++ b/server/private/lib/rebuildQueue.ts @@ -46,6 +46,17 @@ const POLL_INTERVAL_MS = 500; class RedisRebuildQueue { private processingStarted = false; + async isQueued(job: RebuildJob): Promise { + if (!redis || redis.status !== "ready") return false; + const dedupeKey = `${job.type}:${job.id}`; + try { + const member = await redis.sismember(QUEUED_SET_KEY, dedupeKey); + return member === 1; + } catch { + return false; + } + } + async enqueue(job: RebuildJob): Promise { if (!redis || redis.status !== "ready") { logger.warn( diff --git a/server/routers/newt/handleNewtGetConfigMessage.ts b/server/routers/newt/handleNewtGetConfigMessage.ts index ff5d83799..fd5e2b42e 100644 --- a/server/routers/newt/handleNewtGetConfigMessage.ts +++ b/server/routers/newt/handleNewtGetConfigMessage.ts @@ -9,6 +9,7 @@ import { buildClientConfigurationForNewtClient } from "./buildConfiguration"; import { convertTargetsIfNecessary } from "../client/targets"; import { canCompress } from "@server/lib/clientVersionChecks"; import config from "@server/lib/config"; +import { waitForSiteRebuildIdle } from "@server/lib/rebuildClientAssociations"; export const handleNewtGetConfigMessage: MessageHandler = async (context) => { const { message, client, sendToClient } = context; @@ -61,6 +62,8 @@ export const handleNewtGetConfigMessage: MessageHandler = async (context) => { return; } + await waitForSiteRebuildIdle(siteId); + // update the endpoint and the public key const [site] = await db .update(sites) diff --git a/server/routers/olm/handleOlmRegisterMessage.ts b/server/routers/olm/handleOlmRegisterMessage.ts index 6bfc02aee..bef993831 100644 --- a/server/routers/olm/handleOlmRegisterMessage.ts +++ b/server/routers/olm/handleOlmRegisterMessage.ts @@ -21,6 +21,7 @@ import { build } from "@server/build"; import { canCompress } from "@server/lib/clientVersionChecks"; import config from "@server/lib/config"; import cache from "#dynamic/lib/cache"; // not using regional here because we need this in the register message handler before we know where the client is +import { waitForClientRebuildIdle } from "@server/lib/rebuildClientAssociations"; const HOLEPUNCH_STALE_CHAIN_THRESHOLD = 18; const HOLEPUNCH_STALE_CHAIN_TTL_SECONDS = 1800; @@ -385,6 +386,8 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => { } // NOTE: its important that the client here is the old client and the public key is the new key + await waitForClientRebuildIdle(olm.clientId); + const siteConfigurations = await buildSiteConfigurationForOlmClient( client, publicKey, diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index 5a8ed2aa0..434163f6f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -17,7 +17,11 @@ import response from "@server/lib/response"; import { eq, and, ne, inArray } from "drizzle-orm"; import { OpenAPITags, registry } from "@server/openApi"; import { isIpInCidr, portRangeStringSchema } from "@server/lib/ip"; -import { rebuildClientAssociationsFromSiteResource } from "@server/lib/rebuildClientAssociations"; +import { + handleMessagingForUpdatedSiteResource, + rebuildClientAssociationsFromSiteResource, + waitForSiteResourceRebuildIdle +} from "@server/lib/rebuildClientAssociations"; import logger from "@server/logger"; import HttpCode from "@server/types/HttpCode"; import { NextFunction, Request, Response } from "express"; @@ -592,24 +596,27 @@ export async function updateSiteResource( throw new Error("No updated resource found after update"); } - rebuildClientAssociationsFromSiteResource(updatedSiteResource).catch( - (e) => { - logger.error( - `Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}` - ); - } - ); + const finalUpdatedSiteResource = updatedSiteResource; - handleMessagingForUpdatedSiteResource( - existingSiteResource, - updatedSiteResource, - existingSiteIds, - updatedSiteIds - ).catch((e) => { - logger.error( - `Failed to handle messaging for updated site resource ${siteResourceId}. Error: ${e}` - ); - }); + rebuildClientAssociationsFromSiteResource(finalUpdatedSiteResource) + .then(() => + waitForSiteResourceRebuildIdle( + finalUpdatedSiteResource.siteResourceId + ) + ) + .then(() => + handleMessagingForUpdatedSiteResource( + existingSiteResource, + finalUpdatedSiteResource, + existingSiteIds, + updatedSiteIds + ) + ) + .catch((e) => { + logger.error( + `Failed to rebuild and handle messaging for site resource ${siteResourceId}. Error: ${e}` + ); + }); return response(res, { data: updatedSiteResource,