Add some retry and database confict mitigation

This commit is contained in:
Owen
2026-07-03 10:23:32 -04:00
parent 1b1fba60f1
commit b399d2a291
3 changed files with 145 additions and 118 deletions

View File

@@ -485,6 +485,7 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(clientSiteResourcesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteResourceId=${siteResource.siteResourceId} inserted clientSiteResource associations`
@@ -532,121 +533,141 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
for (const site of sitesToProcess) {
const siteId = site.siteId;
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
try {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(inArray(clients.clientId, existingClientSiteIds))
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(
inArray(clients.clientId, existingClientSiteIds)
)
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ??
new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) =>
!existingClientSiteIds.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) =>
!existingClientSiteIds.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
);
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
);
}
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
} catch (err) {
// Don't let a failure on one site abort processing of every
// other site queued after it in this run. Since we're not
// re-throwing, the outer wrapper's retry/requeue logic never
// sees this failure, so explicitly queue this resource for a
// follow-up pass to reconcile whatever this site didn't get to.
logger.error(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} failed while processing site for siteResourceId=${siteResource.siteResourceId}, continuing with remaining sites and queuing a follow-up pass:`,
err
);
await rebuildQueue.enqueue({
type: "site-resource",
id: siteResource.siteResourceId
});
}
}
// Handle subnet proxy target updates for the resource associations
@@ -939,7 +960,7 @@ export async function updateClientSiteDestinations(
for (const site of sitesData) {
if (!site.sites.subnet) {
logger.warn(`Site ${site.sites.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.sites.siteId} has no subnet, skipping`);
continue;
}
@@ -1866,12 +1887,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new associations
if (resourcesToAdd.length > 0) {
await trx.insert(clientSiteResourcesAssociationsCache).values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
);
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
)
.onConflictDoNothing();
}
// Remove old associations
@@ -1909,12 +1933,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new site associations
if (sitesToAdd.length > 0) {
await trx.insert(clientSitesAssociationsCache).values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
);
await trx
.insert(clientSitesAssociationsCache)
.values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
)
.onConflictDoNothing();
}
// Remove old site associations

View File

@@ -52,13 +52,13 @@ export async function buildClientConfigurationForNewtClient(
clientsRes
.filter((client) => {
if (!client.clients.pubKey) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no public key, skipping`
);
return false;
}
if (!client.clients.subnet) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no subnet, skipping`
);
return false;

View File

@@ -161,7 +161,7 @@ export async function buildSiteConfigurationForOlmClient(
}
if (!site.subnet) {
logger.warn(`Site ${site.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.siteId} has no subnet, skipping`);
continue;
}