From b399d2a291423f535a3138124575e73eeda535a1 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 3 Jul 2026 10:23:32 -0400 Subject: [PATCH] Add some retry and database confict mitigation --- server/lib/rebuildClientAssociations.ts | 257 ++++++++++++---------- server/routers/newt/buildConfiguration.ts | 4 +- server/routers/olm/buildConfiguration.ts | 2 +- 3 files changed, 145 insertions(+), 118 deletions(-) diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index b7647a182..abad6b7aa 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -485,6 +485,7 @@ async function rebuildClientAssociationsFromSiteResourceImpl( await trx .insert(clientSiteResourcesAssociationsCache) .values(clientSiteResourcesToInsert) + .onConflictDoNothing() .returning(); logger.debug( `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteResourceId=${siteResource.siteResourceId} inserted clientSiteResource associations` @@ -532,121 +533,141 @@ async function rebuildClientAssociationsFromSiteResourceImpl( for (const site of sitesToProcess) { const siteId = site.siteId; - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}` - ); + try { + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}` + ); - const existingClientSites = await trx - .select({ - clientId: clientSitesAssociationsCache.clientId - }) - .from(clientSitesAssociationsCache) - .where(eq(clientSitesAssociationsCache.siteId, siteId)); + const existingClientSites = await trx + .select({ + clientId: clientSitesAssociationsCache.clientId + }) + .from(clientSitesAssociationsCache) + .where(eq(clientSitesAssociationsCache.siteId, siteId)); - const existingClientSiteIds = existingClientSites.map( - (row) => row.clientId - ); + const existingClientSiteIds = existingClientSites.map( + (row) => row.clientId + ); - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]` - ); + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]` + ); - // Get full client details for existing clients (needed for sending delete messages) - const existingClients = - existingClientSiteIds.length > 0 - ? await trx - .select({ - clientId: clients.clientId, - pubKey: clients.pubKey, - subnet: clients.subnet - }) - .from(clients) - .where(inArray(clients.clientId, existingClientSiteIds)) + // Get full client details for existing clients (needed for sending delete messages) + const existingClients = + existingClientSiteIds.length > 0 + ? await trx + .select({ + clientId: clients.clientId, + pubKey: clients.pubKey, + subnet: clients.subnet + }) + .from(clients) + .where( + inArray(clients.clientId, existingClientSiteIds) + ) + : []; + + const otherResourceClientIds = + clientsFromOtherResourcesBySite.get(siteId) ?? + new Set(); + + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]` + ); + + // Expected clients from this resource are site-scoped: if this site is + // no longer attached to the resource, the expected set is empty. + const expectedClientIdsForSite = currentSiteIdSet.has(siteId) + ? mergedAllClientIds : []; - const otherResourceClientIds = - clientsFromOtherResourcesBySite.get(siteId) ?? new Set(); - - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]` - ); - - // Expected clients from this resource are site-scoped: if this site is - // no longer attached to the resource, the expected set is empty. - const expectedClientIdsForSite = currentSiteIdSet.has(siteId) - ? mergedAllClientIds - : []; - - const clientSitesToAdd = expectedClientIdsForSite.filter( - (clientId) => - !existingClientSiteIds.includes(clientId) && - !otherResourceClientIds.has(clientId) // dont add if already connected via another site resource - ); - - const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({ - clientId, - siteId - })); - - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]` - ); - - if (clientSitesToInsert.length > 0) { - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)` + const clientSitesToAdd = expectedClientIdsForSite.filter( + (clientId) => + !existingClientSiteIds.includes(clientId) && + !otherResourceClientIds.has(clientId) // dont add if already connected via another site resource ); - await trx - .insert(clientSitesAssociationsCache) - .values(clientSitesToInsert) - .returning(); - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations` - ); - } else { - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert` - ); - } - // Now remove any client-site associations that should no longer exist - const clientSitesToRemove = existingClientSiteIds.filter( - (clientId) => - !expectedClientIdsForSite.includes(clientId) && - !otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource - ); + const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({ + clientId, + siteId + })); - logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]` - ); - - if (clientSitesToRemove.length > 0) { logger.debug( - `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)` + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]` ); - await trx - .delete(clientSitesAssociationsCache) - .where( - and( - eq(clientSitesAssociationsCache.siteId, siteId), - inArray( - clientSitesAssociationsCache.clientId, - clientSitesToRemove - ) - ) + + if (clientSitesToInsert.length > 0) { + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)` ); - } + await trx + .insert(clientSitesAssociationsCache) + .values(clientSitesToInsert) + .onConflictDoNothing() + .returning(); + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations` + ); + } else { + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert` + ); + } - // Now handle the messages to add/remove peers on both the newt and olm sides - await handleMessagesForSiteClients( - site, - siteId, - mergedAllClients, - existingClients, - clientSitesToAdd, - clientSitesToRemove, - trx - ); + // Now remove any client-site associations that should no longer exist + const clientSitesToRemove = existingClientSiteIds.filter( + (clientId) => + !expectedClientIdsForSite.includes(clientId) && + !otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource + ); + + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]` + ); + + if (clientSitesToRemove.length > 0) { + logger.debug( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)` + ); + await trx + .delete(clientSitesAssociationsCache) + .where( + and( + eq(clientSitesAssociationsCache.siteId, siteId), + inArray( + clientSitesAssociationsCache.clientId, + clientSitesToRemove + ) + ) + ); + } + + // Now handle the messages to add/remove peers on both the newt and olm sides + await handleMessagesForSiteClients( + site, + siteId, + mergedAllClients, + existingClients, + clientSitesToAdd, + clientSitesToRemove, + trx + ); + } catch (err) { + // Don't let a failure on one site abort processing of every + // other site queued after it in this run. Since we're not + // re-throwing, the outer wrapper's retry/requeue logic never + // sees this failure, so explicitly queue this resource for a + // follow-up pass to reconcile whatever this site didn't get to. + logger.error( + `rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} failed while processing site for siteResourceId=${siteResource.siteResourceId}, continuing with remaining sites and queuing a follow-up pass:`, + err + ); + await rebuildQueue.enqueue({ + type: "site-resource", + id: siteResource.siteResourceId + }); + } } // Handle subnet proxy target updates for the resource associations @@ -939,7 +960,7 @@ export async function updateClientSiteDestinations( for (const site of sitesData) { if (!site.sites.subnet) { - logger.warn(`Site ${site.sites.siteId} has no subnet, skipping`); + logger.debug(`Site ${site.sites.siteId} has no subnet, skipping`); continue; } @@ -1866,12 +1887,15 @@ async function rebuildClientAssociationsFromClientImpl( // Insert new associations if (resourcesToAdd.length > 0) { - await trx.insert(clientSiteResourcesAssociationsCache).values( - resourcesToAdd.map((siteResourceId) => ({ - clientId: client.clientId, - siteResourceId - })) - ); + await trx + .insert(clientSiteResourcesAssociationsCache) + .values( + resourcesToAdd.map((siteResourceId) => ({ + clientId: client.clientId, + siteResourceId + })) + ) + .onConflictDoNothing(); } // Remove old associations @@ -1909,12 +1933,15 @@ async function rebuildClientAssociationsFromClientImpl( // Insert new site associations if (sitesToAdd.length > 0) { - await trx.insert(clientSitesAssociationsCache).values( - sitesToAdd.map((siteId) => ({ - clientId: client.clientId, - siteId - })) - ); + await trx + .insert(clientSitesAssociationsCache) + .values( + sitesToAdd.map((siteId) => ({ + clientId: client.clientId, + siteId + })) + ) + .onConflictDoNothing(); } // Remove old site associations diff --git a/server/routers/newt/buildConfiguration.ts b/server/routers/newt/buildConfiguration.ts index 5083a6c56..5b7f211ae 100644 --- a/server/routers/newt/buildConfiguration.ts +++ b/server/routers/newt/buildConfiguration.ts @@ -52,13 +52,13 @@ export async function buildClientConfigurationForNewtClient( clientsRes .filter((client) => { if (!client.clients.pubKey) { - logger.warn( + logger.debug( `Client ${client.clients.clientId} has no public key, skipping` ); return false; } if (!client.clients.subnet) { - logger.warn( + logger.debug( `Client ${client.clients.clientId} has no subnet, skipping` ); return false; diff --git a/server/routers/olm/buildConfiguration.ts b/server/routers/olm/buildConfiguration.ts index 41bb6d60d..f72731c92 100644 --- a/server/routers/olm/buildConfiguration.ts +++ b/server/routers/olm/buildConfiguration.ts @@ -161,7 +161,7 @@ export async function buildSiteConfigurationForOlmClient( } if (!site.subnet) { - logger.warn(`Site ${site.siteId} has no subnet, skipping`); + logger.debug(`Site ${site.siteId} has no subnet, skipping`); continue; }