Properly hold the IP selection lock through the writes to the DB

This commit is contained in:
Owen
2026-05-27 21:06:34 -07:00
parent cd9e56fdb7
commit 64c901d91f
8 changed files with 598 additions and 551 deletions

View File

@@ -364,8 +364,14 @@ export async function updateClientResources(
});
} else {
let aliasAddress: string | null = null;
let releaseAliasLock: (() => Promise<void>) | null = null;
if (resourceData.mode === "host" || resourceData.mode === "http") {
aliasAddress = await getNextAvailableAliasAddress(orgId, trx);
const { value, release } = await getNextAvailableAliasAddress(
orgId,
trx
);
aliasAddress = value;
releaseAliasLock = release;
}
let domainInfo:
@@ -427,6 +433,8 @@ export async function updateClientResources(
})
.returning();
await releaseAliasLock?.();
const siteResourceId = newResource.siteResourceId;
for (const site of allSites) {

View File

@@ -331,16 +331,8 @@ export async function calculateUserClientsForOrgs(
];
// Get next available subnet
const newSubnet = await getNextAvailableClientSubnet(
orgId,
transaction
);
if (!newSubnet) {
logger.warn(
`Skipping org ${orgId} for OLM ${olm.olmId} (user ${userId}): no available subnet found`
);
continue;
}
const { value: newSubnet, release: releaseSubnetLock } =
await getNextAvailableClientSubnet(orgId, transaction);
const subnet = newSubnet.split("/")[0];
const updatedSubnet = `${subnet}/${org.subnet.split("/")[1]}`;
@@ -370,6 +362,7 @@ export async function calculateUserClientsForOrgs(
.insert(clients)
.values(newClientData)
.returning();
await releaseSubnetLock();
existingClientCache.set(
getOrgOlmKey(orgId, olm.olmId),
newClient

View File

@@ -327,10 +327,15 @@ export function doCidrsOverlap(cidr1: string, cidr2: string): boolean {
export async function getNextAvailableClientSubnet(
orgId: string,
transaction: Transaction | typeof db = db
): Promise<string> {
return await lockManager.withLock(
`client-subnet-allocation:${orgId}`,
async () => {
): Promise<{ value: string; release: () => Promise<void> }> {
const lockKey = `client-subnet-allocation:${orgId}`;
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
try {
const [org] = await transaction
.select()
.from(orgs)
@@ -358,16 +363,14 @@ export async function getNextAvailableClientSubnet(
address: clients.subnet
})
.from(clients)
.where(
and(isNotNull(clients.subnet), eq(clients.orgId, orgId))
);
.where(and(isNotNull(clients.subnet), eq(clients.orgId, orgId)));
const addresses = [
...existingAddressesSites.map(
(site) => `${site.address?.split("/")[0]}/32`
), // we are overriding the 32 so that we pick individual addresses in the subnet of the org for the site and the client even though they are stored with the /block_size of the org
...existingAddressesClients.map(
(client) => `${client.address.split("/")}/32`
(client) => `${client.address.split("/")[0]}/32`
)
].filter((address) => address !== null) as string[];
@@ -376,18 +379,25 @@ export async function getNextAvailableClientSubnet(
throw new Error("No available subnets remaining in space");
}
return subnet;
return { value: subnet, release };
} catch (e) {
await release();
throw e;
}
);
}
export async function getNextAvailableAliasAddress(
orgId: string,
trx: Transaction | typeof db = db
): Promise<string> {
return await lockManager.withLock(
`alias-address-allocation:${orgId}`,
async () => {
): Promise<{ value: string; release: () => Promise<void> }> {
const lockKey = `alias-address-allocation:${orgId}`;
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
try {
const [org] = await trx
.select()
.from(orgs)
@@ -429,11 +439,7 @@ export async function getNextAvailableAliasAddress(
`${org.utilitySubnet.split("/")[0]}/29`
].filter((address) => address !== null) as string[];
let subnet = findNextAvailableCidr(
addresses,
32,
org.utilitySubnet
);
let subnet = findNextAvailableCidr(addresses, 32, org.utilitySubnet);
if (!subnet) {
throw new Error("No available subnets remaining in space");
}
@@ -441,13 +447,25 @@ export async function getNextAvailableAliasAddress(
// remove the cidr
subnet = subnet.split("/")[0];
return subnet;
return { value: subnet, release };
} catch (e) {
await release();
throw e;
}
);
}
export async function getNextAvailableOrgSubnet(): Promise<string> {
return await lockManager.withLock("org-subnet-allocation", async () => {
export async function getNextAvailableOrgSubnet(): Promise<{
value: string;
release: () => Promise<void>;
}> {
const lockKey = "org-subnet-allocation";
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey);
try {
const existingAddresses = await db
.select({
subnet: orgs.subnet
@@ -466,8 +484,11 @@ export async function getNextAvailableOrgSubnet(): Promise<string> {
throw new Error("No available subnets remaining in space");
}
return subnet;
});
return { value: subnet, release };
} catch (e) {
await release();
throw e;
}
}
export function generateRemoteSubnets(

View File

@@ -51,7 +51,9 @@ export async function pickClientDefaults(
const olmId = generateId(15);
const secret = generateId(48);
const newSubnet = await getNextAvailableClientSubnet(orgId);
const { value: newSubnet, release } =
await getNextAvailableClientSubnet(orgId);
await release(); // release immediately — this endpoint only previews the next available value
if (!newSubnet) {
return next(
createHttpError(

View File

@@ -203,17 +203,10 @@ export async function registerNewt(
let newSiteId: number | undefined;
const { value: newClientAddress, release: releaseSubnetLock } =
await getNextAvailableClientSubnet(orgId);
try {
await db.transaction(async (trx) => {
const newClientAddress = await getNextAvailableClientSubnet(orgId);
if (!newClientAddress) {
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
"No available subnet found"
)
);
}
let clientAddress = newClientAddress.split("/")[0];
clientAddress = `${clientAddress}/${org.subnet!.split("/")[1]}`; // we want the block size of the whole org
@@ -227,7 +220,9 @@ export async function registerNewt(
address: clientAddress,
type: "newt",
dockerSocketEnabled: true,
status: keyRecord.approveNewSites ? "approved" : "pending"
status: keyRecord.approveNewSites
? "approved"
: "pending"
})
.returning();
@@ -281,6 +276,9 @@ export async function registerNewt(
await usageService.add(orgId, FeatureId.SITES, 1, trx);
});
} finally {
await releaseSubnetLock();
}
logger.info(
`Provisioned new site (ID: ${newSiteId}) and newt (ID: ${newtId}) for org ${orgId} via provisioning key ${provisioningKeyId}`

View File

@@ -174,6 +174,7 @@ export async function createSite(
}
let updatedAddress = null;
let releaseSubnetLock: (() => Promise<void>) | null = null;
if (address) {
if (!org.subnet) {
return next(
@@ -244,19 +245,14 @@ export async function createSite(
);
}
} else {
const newClientAddress = await getNextAvailableClientSubnet(orgId);
if (!newClientAddress) {
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
"No available address found"
)
);
}
const { value: newClientAddress, release } =
await getNextAvailableClientSubnet(orgId);
releaseSubnetLock = release;
updatedAddress = newClientAddress.split("/")[0];
}
let newSite: Site | undefined;
try {
if (subnet && exitNodeId) {
//make sure the subnet is in the range of the exit node if provided
const [exitNode] = await db
@@ -266,7 +262,10 @@ export async function createSite(
if (!exitNode) {
return next(
createHttpError(HttpCode.NOT_FOUND, "Exit node not found")
createHttpError(
HttpCode.NOT_FOUND,
"Exit node not found"
)
);
}
@@ -321,7 +320,9 @@ export async function createSite(
const existingSite = await db
.select()
.from(sites)
.where(and(eq(sites.niceId, niceId), eq(sites.orgId, orgId)))
.where(
and(eq(sites.niceId, niceId), eq(sites.orgId, orgId))
)
.limit(1);
if (existingSite.length > 0) {
@@ -334,7 +335,6 @@ export async function createSite(
}
}
let newSite: Site | undefined;
await db.transaction(async (trx) => {
if (type == "newt") {
[newSite] = await trx
@@ -378,10 +378,8 @@ export async function createSite(
);
}
const { exitNode, hasAccess } = await verifyExitNodeOrgAccess(
exitNodeId,
orgId
);
const { exitNode, hasAccess } =
await verifyExitNodeOrgAccess(exitNodeId, orgId);
if (!exitNode) {
logger.warn("Exit node not found");
@@ -448,7 +446,10 @@ export async function createSite(
if (adminRole.length === 0) {
return next(
createHttpError(HttpCode.NOT_FOUND, `Admin role not found`)
createHttpError(
HttpCode.NOT_FOUND,
`Admin role not found`
)
);
}
@@ -505,6 +506,9 @@ export async function createSite(
await usageService.add(orgId, FeatureId.SITES, 1, trx);
});
} finally {
await releaseSubnetLock?.();
}
if (!newSite) {
return next(

View File

@@ -119,7 +119,9 @@ export async function pickSiteDefaults(
);
}
const newClientAddress = await getNextAvailableClientSubnet(orgId);
const { value: newClientAddress, release: releaseSubnetLock } =
await getNextAvailableClientSubnet(orgId);
await releaseSubnetLock(); // release immediately — this endpoint only previews the next available value
if (!newClientAddress) {
return next(
createHttpError(

View File

@@ -397,11 +397,16 @@ export async function createSiteResource(
}
let aliasAddress: string | null = null;
let releaseAliasLock: (() => Promise<void>) | null = null;
if (mode === "host" || mode === "http") {
aliasAddress = await getNextAvailableAliasAddress(orgId);
const { value, release } =
await getNextAvailableAliasAddress(orgId);
aliasAddress = value;
releaseAliasLock = release;
}
let newSiteResource: SiteResource | undefined;
try {
await db.transaction(async (trx) => {
const [network] = await trx
.insert(networks)
@@ -445,7 +450,9 @@ export async function createSiteResource(
aliasAddress,
tcpPortRangeString: tcpPortRangeStringAdjusted,
udpPortRangeString:
mode == "http" || mode == "ssh" ? "" : udpPortRangeString,
mode == "http" || mode == "ssh"
? ""
: udpPortRangeString,
disableIcmp:
disableIcmp ||
(mode == "http" || mode == "ssh" ? true : false), // default to true for http resources, otherwise false
@@ -484,7 +491,10 @@ export async function createSiteResource(
if (!adminRole) {
return next(
createHttpError(HttpCode.NOT_FOUND, `Admin role not found`)
createHttpError(
HttpCode.NOT_FOUND,
`Admin role not found`
)
);
}
@@ -497,7 +507,10 @@ export async function createSiteResource(
await trx
.insert(roleSiteResources)
.values(
roleIds.map((roleId) => ({ roleId, siteResourceId }))
roleIds.map((roleId) => ({
roleId,
siteResourceId
}))
);
}
@@ -505,7 +518,10 @@ export async function createSiteResource(
await trx
.insert(userSiteResources)
.values(
userIds.map((userId) => ({ userId, siteResourceId }))
userIds.map((userId) => ({
userId,
siteResourceId
}))
);
}
@@ -535,6 +551,9 @@ export async function createSiteResource(
}
}
});
} finally {
await releaseAliasLock?.();
}
if (!newSiteResource) {
return next(