mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-26 17:19:09 +00:00
Standardize db rebuildClientAssociationsFromClient
This commit is contained in:
@@ -3,7 +3,6 @@ import {
|
||||
newts,
|
||||
blueprints,
|
||||
Blueprint,
|
||||
Site,
|
||||
siteResources,
|
||||
roleSiteResources,
|
||||
userSiteResources,
|
||||
@@ -60,30 +59,26 @@ export async function applyBlueprint({
|
||||
|
||||
const config: Config = validationResult.data;
|
||||
|
||||
let proxyResourcesResults: PublicResourcesResults = [];
|
||||
let clientResourcesResults: ClientResourcesResults = [];
|
||||
let publicResourcesResults: PublicResourcesResults = [];
|
||||
let privateResourcesResults: ClientResourcesResults = [];
|
||||
await db.transaction(async (trx) => {
|
||||
await updateResourcePolicies(orgId, config, trx);
|
||||
|
||||
proxyResourcesResults = await updatePublicResources(
|
||||
publicResourcesResults = await updatePublicResources(
|
||||
orgId,
|
||||
config,
|
||||
trx,
|
||||
siteId
|
||||
);
|
||||
clientResourcesResults = await updatePrivateResources(
|
||||
privateResourcesResults = await updatePrivateResources(
|
||||
orgId,
|
||||
config,
|
||||
trx,
|
||||
siteId
|
||||
);
|
||||
|
||||
logger.debug(
|
||||
`Successfully updated proxy resources for org ${orgId}: ${JSON.stringify(proxyResourcesResults)}`
|
||||
);
|
||||
|
||||
// We need to update the targets on the newts from the successfully updated information
|
||||
for (const result of proxyResourcesResults) {
|
||||
for (const result of publicResourcesResults) {
|
||||
for (const target of result.targetsToUpdate) {
|
||||
const [site] = await trx
|
||||
.select()
|
||||
@@ -136,166 +131,38 @@ export async function applyBlueprint({
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`Successfully updated client resources for org ${orgId}: ${JSON.stringify(clientResourcesResults)}`
|
||||
`Successfully updated public resources for org ${orgId}: ${JSON.stringify(publicResourcesResults)}`
|
||||
);
|
||||
|
||||
// We need to update the targets on the newts from the successfully updated information
|
||||
for (const result of clientResourcesResults) {
|
||||
if (
|
||||
result.oldSiteResource &&
|
||||
JSON.stringify(result.newSites?.sort()) !==
|
||||
JSON.stringify(result.oldSites?.sort())
|
||||
) {
|
||||
// query existing associations
|
||||
const existingRoleIds = await trx
|
||||
.select()
|
||||
.from(roleSiteResources)
|
||||
.where(
|
||||
eq(
|
||||
roleSiteResources.siteResourceId,
|
||||
result.oldSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.roleId));
|
||||
|
||||
const existingUserIds = await trx
|
||||
.select()
|
||||
.from(userSiteResources)
|
||||
.where(
|
||||
eq(
|
||||
userSiteResources.siteResourceId,
|
||||
result.oldSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.userId));
|
||||
|
||||
const existingClientIds = await trx
|
||||
.select()
|
||||
.from(clientSiteResources)
|
||||
.where(
|
||||
eq(
|
||||
clientSiteResources.siteResourceId,
|
||||
result.oldSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.clientId));
|
||||
|
||||
// delete the existing site resource
|
||||
await trx
|
||||
.delete(siteResources)
|
||||
.where(
|
||||
and(
|
||||
eq(
|
||||
siteResources.siteResourceId,
|
||||
result.oldSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
result.oldSiteResource,
|
||||
trx
|
||||
for (const result of privateResourcesResults) {
|
||||
rebuildClientAssociationsFromSiteResource(
|
||||
result.newSiteResource
|
||||
).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${result.newSiteResource.siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
const [insertedSiteResource] = await trx
|
||||
.insert(siteResources)
|
||||
.values({
|
||||
...result.newSiteResource
|
||||
})
|
||||
.returning();
|
||||
|
||||
// wait some time to allow for messages to be handled
|
||||
await new Promise((resolve) => setTimeout(resolve, 750));
|
||||
|
||||
//////////////////// update the associations ////////////////////
|
||||
|
||||
if (existingRoleIds.length > 0) {
|
||||
await trx.insert(roleSiteResources).values(
|
||||
existingRoleIds.map((roleId) => ({
|
||||
roleId,
|
||||
siteResourceId:
|
||||
insertedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
if (existingUserIds.length > 0) {
|
||||
await trx.insert(userSiteResources).values(
|
||||
existingUserIds.map((userId) => ({
|
||||
userId,
|
||||
siteResourceId:
|
||||
insertedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
if (existingClientIds.length > 0) {
|
||||
await trx.insert(clientSiteResources).values(
|
||||
existingClientIds.map((clientId) => ({
|
||||
clientId,
|
||||
siteResourceId:
|
||||
insertedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
insertedSiteResource,
|
||||
trx
|
||||
handleMessagingForUpdatedSiteResource(
|
||||
result.oldSiteResource,
|
||||
result.newSiteResource,
|
||||
result.oldSites.map((site) => ({
|
||||
// only need to run this on the old sites because the new sites are added above
|
||||
siteId: site.siteId,
|
||||
orgId: result.newSiteResource.orgId
|
||||
}))
|
||||
).catch((err) => {
|
||||
logger.error(
|
||||
`Error handling messaging for updated site resource ${result.newSiteResource.siteResourceId}:`,
|
||||
err
|
||||
);
|
||||
} else {
|
||||
let good = true;
|
||||
for (const newSite of result.newSites) {
|
||||
const [site] = await trx
|
||||
.select()
|
||||
.from(sites)
|
||||
.innerJoin(newts, eq(sites.siteId, newts.siteId))
|
||||
.where(
|
||||
and(
|
||||
eq(sites.siteId, newSite.siteId),
|
||||
eq(sites.orgId, orgId),
|
||||
eq(sites.type, "newt"),
|
||||
isNotNull(sites.pubKey)
|
||||
)
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
if (!site) {
|
||||
logger.debug(
|
||||
`No newt sites found for client resource ${result.newSiteResource.siteResourceId}, skipping target update`
|
||||
);
|
||||
good = false;
|
||||
break;
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`Updating client resource ${result.newSiteResource.siteResourceId} on site ${newSite.siteId}`
|
||||
);
|
||||
}
|
||||
|
||||
if (!good) {
|
||||
continue;
|
||||
}
|
||||
|
||||
await handleMessagingForUpdatedSiteResource(
|
||||
result.oldSiteResource,
|
||||
result.newSiteResource,
|
||||
result.newSites.map((site) => ({
|
||||
siteId: site.siteId,
|
||||
orgId: result.newSiteResource.orgId
|
||||
})),
|
||||
trx
|
||||
);
|
||||
}
|
||||
|
||||
// await addClientTargets(
|
||||
// site.newt.newtId,
|
||||
// result.resource.destination,
|
||||
// result.resource.destinationPort,
|
||||
// result.resource.protocol,
|
||||
// result.resource.proxyPort
|
||||
// );
|
||||
});
|
||||
}
|
||||
|
||||
logger.debug(
|
||||
`Successfully updated private resources for org ${orgId}: ${JSON.stringify(privateResourcesResults)}`
|
||||
);
|
||||
});
|
||||
|
||||
blueprintSucceeded = true;
|
||||
|
||||
@@ -160,9 +160,9 @@ export async function getClientSiteResourceAccess(
|
||||
}
|
||||
|
||||
export async function rebuildClientAssociationsFromSiteResource(
|
||||
siteResource: SiteResource,
|
||||
trx: Transaction | typeof db = db
|
||||
siteResource: SiteResource
|
||||
) {
|
||||
const trx = primaryDb;
|
||||
try {
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
@@ -2119,10 +2119,7 @@ export function startRebuildQueueProcessor(): void {
|
||||
return;
|
||||
}
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
siteResource,
|
||||
primaryDb
|
||||
);
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource);
|
||||
},
|
||||
onClient: async (clientId: number) => {
|
||||
const [client] = await primaryDb
|
||||
|
||||
@@ -153,8 +153,12 @@ export async function addClientToSiteResource(
|
||||
clientId,
|
||||
siteResourceId
|
||||
});
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -160,8 +160,12 @@ export async function addRoleToSiteResource(
|
||||
roleId,
|
||||
siteResourceId
|
||||
});
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -129,8 +129,12 @@ export async function addUserToSiteResource(
|
||||
userId,
|
||||
siteResourceId
|
||||
});
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -625,15 +625,14 @@ export async function createSiteResource(
|
||||
// own transaction so it always executes on the primary — avoiding any
|
||||
// replica-lag issues while still allowing the HTTP response to return
|
||||
// early.
|
||||
rebuildClientAssociationsFromSiteResource(
|
||||
newSiteResource!,
|
||||
primaryDb
|
||||
).catch((err) => {
|
||||
logger.error(
|
||||
`Error rebuilding client associations for site resource ${newSiteResource!.siteResourceId}:`,
|
||||
err
|
||||
);
|
||||
});
|
||||
rebuildClientAssociationsFromSiteResource(newSiteResource!).catch(
|
||||
(err) => {
|
||||
logger.error(
|
||||
`Error rebuilding client associations for site resource ${newSiteResource!.siteResourceId}:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
return response(res, {
|
||||
data: newSiteResource,
|
||||
|
||||
@@ -88,15 +88,14 @@ export async function deleteSiteResource(
|
||||
// own transaction so it always executes on the primary — avoiding any
|
||||
// replica-lag issues while still allowing the HTTP response to return
|
||||
// early.
|
||||
rebuildClientAssociationsFromSiteResource(
|
||||
removedSiteResource,
|
||||
primaryDb
|
||||
).catch((err) => {
|
||||
logger.error(
|
||||
`Error rebuilding client associations for site resource ${removedSiteResource!.siteResourceId}:`,
|
||||
err
|
||||
);
|
||||
});
|
||||
rebuildClientAssociationsFromSiteResource(removedSiteResource).catch(
|
||||
(err) => {
|
||||
logger.error(
|
||||
`Error rebuilding client associations for site resource ${removedSiteResource!.siteResourceId}:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
);
|
||||
|
||||
logger.info(`Deleted site resource ${siteResourceId}`);
|
||||
|
||||
|
||||
@@ -157,8 +157,12 @@ export async function removeClientFromSiteResource(
|
||||
eq(clientSiteResources.clientId, clientId)
|
||||
)
|
||||
);
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -165,8 +165,12 @@ export async function removeRoleFromSiteResource(
|
||||
eq(roleSiteResources.roleId, roleId)
|
||||
)
|
||||
);
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -135,8 +135,12 @@ export async function removeUserFromSiteResource(
|
||||
eq(userSiteResources.userId, userId)
|
||||
)
|
||||
);
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId} after removing user ${userId}: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -141,8 +141,12 @@ export async function setSiteResourceClients(
|
||||
}))
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -165,8 +165,12 @@ export async function setSiteResourceRoles(
|
||||
roleIds.map((roleId) => ({ roleId, siteResourceId }))
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -10,6 +10,7 @@ import { fromError } from "zod-validation-error";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { OpenAPITags, registry } from "@server/openApi";
|
||||
import { rebuildClientAssociationsFromSiteResource } from "@server/lib/rebuildClientAssociations";
|
||||
import { error } from "node:console";
|
||||
|
||||
const setSiteResourceUsersBodySchema = z
|
||||
.object({
|
||||
@@ -120,8 +121,12 @@ export async function setSiteResourceUsers(
|
||||
userIds.map((userId) => ({ userId, siteResourceId }))
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(siteResource, trx);
|
||||
rebuildClientAssociationsFromSiteResource(siteResource).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -12,7 +12,8 @@ import {
|
||||
sites,
|
||||
networks,
|
||||
Transaction,
|
||||
userSiteResources
|
||||
userSiteResources,
|
||||
primaryDb
|
||||
} from "@server/db";
|
||||
import { isLicensedOrSubscribed } from "#dynamic/lib/isLicencedOrSubscribed";
|
||||
import { TierFeature, tierMatrix } from "@server/lib/billing/tierMatrix";
|
||||
@@ -474,345 +475,167 @@ export async function updateSiteResource(
|
||||
|
||||
let updatedSiteResource: SiteResource | undefined;
|
||||
await db.transaction(async (trx) => {
|
||||
// if the site is changed we need to delete and recreate the resource to avoid complications with the rebuild function otherwise we can just update in place
|
||||
if (sitesChanged) {
|
||||
// delete the existing site resource
|
||||
await trx
|
||||
.delete(siteResources)
|
||||
.where(
|
||||
and(eq(siteResources.siteResourceId, siteResourceId))
|
||||
);
|
||||
// Update the site resource
|
||||
const sshPamSet =
|
||||
isLicensedSshPam &&
|
||||
(authDaemonPort !== undefined ||
|
||||
authDaemonMode !== undefined ||
|
||||
pamMode !== undefined)
|
||||
? {
|
||||
...(authDaemonPort !== undefined && {
|
||||
authDaemonPort
|
||||
}),
|
||||
...(authDaemonMode !== undefined && {
|
||||
authDaemonMode
|
||||
}),
|
||||
...(pamMode !== undefined && {
|
||||
pamMode
|
||||
})
|
||||
}
|
||||
: {};
|
||||
let tcpPortRangeStringAdjusted = tcpPortRangeString;
|
||||
if (mode === "http") {
|
||||
tcpPortRangeStringAdjusted = "443,80";
|
||||
} else if (mode === "ssh") {
|
||||
tcpPortRangeStringAdjusted = destinationPort
|
||||
? destinationPort.toString()
|
||||
: "22";
|
||||
}
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
existingSiteResource,
|
||||
trx
|
||||
[updatedSiteResource] = await trx
|
||||
.update(siteResources)
|
||||
.set({
|
||||
name: name,
|
||||
niceId: niceId,
|
||||
mode: mode,
|
||||
scheme,
|
||||
ssl,
|
||||
destination: destination,
|
||||
destinationPort: destinationPort,
|
||||
enabled: enabled,
|
||||
alias: alias ? alias.trim() : null,
|
||||
tcpPortRangeString: tcpPortRangeStringAdjusted,
|
||||
udpPortRangeString:
|
||||
mode == "http" || mode == "ssh"
|
||||
? ""
|
||||
: udpPortRangeString,
|
||||
disableIcmp:
|
||||
disableIcmp ||
|
||||
(mode == "http" || mode == "ssh" ? true : false),
|
||||
domainId,
|
||||
subdomain: finalSubdomain,
|
||||
fullDomain,
|
||||
...sshPamSet
|
||||
})
|
||||
.where(and(eq(siteResources.siteResourceId, siteResourceId)))
|
||||
.returning();
|
||||
|
||||
//////////////////// update the associations ////////////////////
|
||||
|
||||
// delete the site - site resources associations
|
||||
await trx
|
||||
.delete(siteNetworks)
|
||||
.where(
|
||||
eq(siteNetworks.networkId, updatedSiteResource.networkId!)
|
||||
);
|
||||
|
||||
// create the new site resource from the removed one - the ID should stay the same
|
||||
const [insertedSiteResource] = await trx
|
||||
.insert(siteResources)
|
||||
.values({
|
||||
...existingSiteResource
|
||||
})
|
||||
.returning();
|
||||
|
||||
const sshPamSet =
|
||||
isLicensedSshPam &&
|
||||
(authDaemonPort !== undefined ||
|
||||
authDaemonMode !== undefined ||
|
||||
pamMode !== undefined)
|
||||
? {
|
||||
...(authDaemonPort !== undefined && {
|
||||
authDaemonPort
|
||||
}),
|
||||
...(authDaemonMode !== undefined && {
|
||||
authDaemonMode
|
||||
}),
|
||||
...(pamMode !== undefined && {
|
||||
pamMode
|
||||
})
|
||||
}
|
||||
: {};
|
||||
|
||||
let tcpPortRangeStringAdjusted = tcpPortRangeString;
|
||||
if (mode === "http") {
|
||||
tcpPortRangeStringAdjusted = "443,80";
|
||||
} else if (mode === "ssh") {
|
||||
tcpPortRangeStringAdjusted = destinationPort
|
||||
? destinationPort.toString()
|
||||
: "22";
|
||||
}
|
||||
|
||||
[updatedSiteResource] = await trx
|
||||
.update(siteResources)
|
||||
.set({
|
||||
name,
|
||||
niceId,
|
||||
mode,
|
||||
scheme,
|
||||
ssl,
|
||||
destination,
|
||||
destinationPort,
|
||||
enabled,
|
||||
alias: alias ? alias.trim() : null,
|
||||
tcpPortRangeString: tcpPortRangeStringAdjusted,
|
||||
udpPortRangeString:
|
||||
mode == "http" || mode == "ssh"
|
||||
? ""
|
||||
: udpPortRangeString,
|
||||
disableIcmp:
|
||||
disableIcmp ||
|
||||
(mode == "http" || mode == "ssh" ? true : false), // default to true for http resources, otherwise false
|
||||
domainId,
|
||||
subdomain: finalSubdomain,
|
||||
fullDomain,
|
||||
...sshPamSet
|
||||
})
|
||||
.where(
|
||||
and(
|
||||
eq(
|
||||
siteResources.siteResourceId,
|
||||
insertedSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
)
|
||||
.returning();
|
||||
|
||||
if (!updatedSiteResource) {
|
||||
throw new Error(
|
||||
"Failed to create updated site resource after site change"
|
||||
);
|
||||
}
|
||||
|
||||
//////////////////// update the associations ////////////////////
|
||||
|
||||
// delete the site - site resources associations
|
||||
await trx
|
||||
.delete(siteNetworks)
|
||||
.where(
|
||||
eq(
|
||||
siteNetworks.networkId,
|
||||
updatedSiteResource.networkId!
|
||||
)
|
||||
);
|
||||
|
||||
for (const siteId of siteIds) {
|
||||
await trx.insert(siteNetworks).values({
|
||||
siteId: siteId,
|
||||
networkId: updatedSiteResource.networkId!
|
||||
});
|
||||
}
|
||||
|
||||
const [adminRole] = await trx
|
||||
.select()
|
||||
.from(roles)
|
||||
.where(
|
||||
and(
|
||||
eq(roles.isAdmin, true),
|
||||
eq(roles.orgId, updatedSiteResource.orgId)
|
||||
)
|
||||
)
|
||||
.limit(1);
|
||||
|
||||
if (!adminRole) {
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.NOT_FOUND,
|
||||
`Admin role not found`
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
await trx.insert(roleSiteResources).values({
|
||||
roleId: adminRole.roleId,
|
||||
siteResourceId: updatedSiteResource.siteResourceId
|
||||
for (const siteId of siteIds) {
|
||||
await trx.insert(siteNetworks).values({
|
||||
siteId: siteId,
|
||||
networkId: updatedSiteResource.networkId!
|
||||
});
|
||||
|
||||
if (roleIds.length > 0) {
|
||||
await trx.insert(roleSiteResources).values(
|
||||
roleIds.map((roleId) => ({
|
||||
roleId,
|
||||
siteResourceId: updatedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
if (userIds.length > 0) {
|
||||
await trx.insert(userSiteResources).values(
|
||||
userIds.map((userId) => ({
|
||||
userId,
|
||||
siteResourceId: updatedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
if (clientIds.length > 0) {
|
||||
await trx.insert(clientSiteResources).values(
|
||||
clientIds.map((clientId) => ({
|
||||
clientId,
|
||||
siteResourceId: updatedSiteResource!.siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// Update the site resource
|
||||
const sshPamSet =
|
||||
isLicensedSshPam &&
|
||||
(authDaemonPort !== undefined ||
|
||||
authDaemonMode !== undefined ||
|
||||
pamMode !== undefined)
|
||||
? {
|
||||
...(authDaemonPort !== undefined && {
|
||||
authDaemonPort
|
||||
}),
|
||||
...(authDaemonMode !== undefined && {
|
||||
authDaemonMode
|
||||
}),
|
||||
...(pamMode !== undefined && {
|
||||
pamMode
|
||||
})
|
||||
}
|
||||
: {};
|
||||
let tcpPortRangeStringAdjusted = tcpPortRangeString;
|
||||
if (mode === "http") {
|
||||
tcpPortRangeStringAdjusted = "443,80";
|
||||
} else if (mode === "ssh") {
|
||||
tcpPortRangeStringAdjusted = destinationPort
|
||||
? destinationPort.toString()
|
||||
: "22";
|
||||
}
|
||||
|
||||
[updatedSiteResource] = await trx
|
||||
.update(siteResources)
|
||||
.set({
|
||||
name: name,
|
||||
niceId: niceId,
|
||||
mode: mode,
|
||||
scheme,
|
||||
ssl,
|
||||
destination: destination,
|
||||
destinationPort: destinationPort,
|
||||
enabled: enabled,
|
||||
alias: alias ? alias.trim() : null,
|
||||
tcpPortRangeString: tcpPortRangeStringAdjusted,
|
||||
udpPortRangeString:
|
||||
mode == "http" || mode == "ssh"
|
||||
? ""
|
||||
: udpPortRangeString,
|
||||
disableIcmp:
|
||||
disableIcmp ||
|
||||
(mode == "http" || mode == "ssh" ? true : false),
|
||||
domainId,
|
||||
subdomain: finalSubdomain,
|
||||
fullDomain,
|
||||
...sshPamSet
|
||||
})
|
||||
.where(
|
||||
and(eq(siteResources.siteResourceId, siteResourceId))
|
||||
)
|
||||
.returning();
|
||||
|
||||
//////////////////// update the associations ////////////////////
|
||||
|
||||
// delete the site - site resources associations
|
||||
await trx
|
||||
.delete(siteNetworks)
|
||||
.where(
|
||||
eq(
|
||||
siteNetworks.networkId,
|
||||
updatedSiteResource.networkId!
|
||||
)
|
||||
);
|
||||
|
||||
for (const siteId of siteIds) {
|
||||
await trx.insert(siteNetworks).values({
|
||||
siteId: siteId,
|
||||
networkId: updatedSiteResource.networkId!
|
||||
});
|
||||
}
|
||||
|
||||
await trx
|
||||
.delete(clientSiteResources)
|
||||
.where(
|
||||
eq(clientSiteResources.siteResourceId, siteResourceId)
|
||||
);
|
||||
|
||||
if (clientIds.length > 0) {
|
||||
await trx.insert(clientSiteResources).values(
|
||||
clientIds.map((clientId) => ({
|
||||
clientId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
await trx
|
||||
.delete(userSiteResources)
|
||||
.where(
|
||||
eq(userSiteResources.siteResourceId, siteResourceId)
|
||||
);
|
||||
|
||||
if (userIds.length > 0) {
|
||||
await trx.insert(userSiteResources).values(
|
||||
userIds.map((userId) => ({
|
||||
userId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
// Get all admin role IDs for this org to exclude from deletion
|
||||
const adminRoles = await trx
|
||||
.select()
|
||||
.from(roles)
|
||||
.where(
|
||||
and(
|
||||
eq(roles.isAdmin, true),
|
||||
eq(roles.orgId, updatedSiteResource.orgId)
|
||||
)
|
||||
);
|
||||
const adminRoleIds = adminRoles.map((role) => role.roleId);
|
||||
|
||||
if (adminRoleIds.length > 0) {
|
||||
await trx.delete(roleSiteResources).where(
|
||||
and(
|
||||
eq(
|
||||
roleSiteResources.siteResourceId,
|
||||
siteResourceId
|
||||
),
|
||||
ne(roleSiteResources.roleId, adminRoleIds[0]) // delete all but the admin role
|
||||
)
|
||||
);
|
||||
} else {
|
||||
await trx
|
||||
.delete(roleSiteResources)
|
||||
.where(
|
||||
eq(roleSiteResources.siteResourceId, siteResourceId)
|
||||
);
|
||||
}
|
||||
|
||||
if (roleIds.length > 0) {
|
||||
await trx.insert(roleSiteResources).values(
|
||||
roleIds.map((roleId) => ({
|
||||
roleId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(`Updated site resource ${siteResourceId}`);
|
||||
}
|
||||
|
||||
await trx
|
||||
.delete(clientSiteResources)
|
||||
.where(eq(clientSiteResources.siteResourceId, siteResourceId));
|
||||
|
||||
if (clientIds.length > 0) {
|
||||
await trx.insert(clientSiteResources).values(
|
||||
clientIds.map((clientId) => ({
|
||||
clientId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
await trx
|
||||
.delete(userSiteResources)
|
||||
.where(eq(userSiteResources.siteResourceId, siteResourceId));
|
||||
|
||||
if (userIds.length > 0) {
|
||||
await trx.insert(userSiteResources).values(
|
||||
userIds.map((userId) => ({
|
||||
userId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
// Get all admin role IDs for this org to exclude from deletion
|
||||
const adminRoles = await trx
|
||||
.select()
|
||||
.from(roles)
|
||||
.where(
|
||||
and(
|
||||
eq(roles.isAdmin, true),
|
||||
eq(roles.orgId, updatedSiteResource.orgId)
|
||||
)
|
||||
);
|
||||
const adminRoleIds = adminRoles.map((role) => role.roleId);
|
||||
|
||||
if (adminRoleIds.length > 0) {
|
||||
await trx.delete(roleSiteResources).where(
|
||||
and(
|
||||
eq(roleSiteResources.siteResourceId, siteResourceId),
|
||||
ne(roleSiteResources.roleId, adminRoleIds[0]) // delete all but the admin role
|
||||
)
|
||||
);
|
||||
} else {
|
||||
await trx
|
||||
.delete(roleSiteResources)
|
||||
.where(
|
||||
eq(roleSiteResources.siteResourceId, siteResourceId)
|
||||
);
|
||||
}
|
||||
|
||||
if (roleIds.length > 0) {
|
||||
await trx.insert(roleSiteResources).values(
|
||||
roleIds.map((roleId) => ({
|
||||
roleId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
logger.info(`Updated site resource ${siteResourceId}`);
|
||||
});
|
||||
|
||||
// Background: wait for removal messages to propagate, then rebuild
|
||||
// associations for the re-created resource. Own transaction ensures
|
||||
// execution on the primary against fully committed state.
|
||||
(async () => {
|
||||
await db.transaction(async (trx) => {
|
||||
if (!updatedSiteResource) {
|
||||
throw new Error("No updated resource found after update");
|
||||
}
|
||||
if (sitesChanged) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 750));
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
updatedSiteResource,
|
||||
trx
|
||||
);
|
||||
}
|
||||
await handleMessagingForUpdatedSiteResource(
|
||||
existingSiteResource,
|
||||
updatedSiteResource,
|
||||
siteIds.map((siteId) => ({
|
||||
siteId,
|
||||
orgId: existingSiteResource.orgId
|
||||
})),
|
||||
trx
|
||||
if (!updatedSiteResource) {
|
||||
throw new Error("No updated resource found after update");
|
||||
}
|
||||
|
||||
if (sitesChanged) {
|
||||
rebuildClientAssociationsFromSiteResource(
|
||||
updatedSiteResource
|
||||
).catch((e) => {
|
||||
logger.error(
|
||||
`Failed to rebuild client associations for site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
})().catch((err) => {
|
||||
}
|
||||
|
||||
handleMessagingForUpdatedSiteResource(
|
||||
existingSiteResource,
|
||||
updatedSiteResource,
|
||||
Array.from(existingSiteIdSet).map((siteId: number) => ({
|
||||
// we already added to the new sites above in the rebuild function so we only need to update the ones that did not change
|
||||
siteId,
|
||||
orgId: existingSiteResource.orgId
|
||||
}))
|
||||
).catch((e) => {
|
||||
logger.error(
|
||||
`Error rebuilding client associations for site resource ${updatedSiteResource?.siteResourceId}:`,
|
||||
err
|
||||
`Failed to handle messaging for updated site resource ${siteResourceId}. Error: ${e}`
|
||||
);
|
||||
});
|
||||
|
||||
@@ -837,9 +660,9 @@ export async function updateSiteResource(
|
||||
export async function handleMessagingForUpdatedSiteResource(
|
||||
existingSiteResource: SiteResource | undefined,
|
||||
updatedSiteResource: SiteResource,
|
||||
sites: { siteId: number; orgId: string }[],
|
||||
trx: Transaction
|
||||
sites: { siteId: number; orgId: string }[]
|
||||
) {
|
||||
const trx = primaryDb;
|
||||
logger.debug(
|
||||
"handleMessagingForUpdatedSiteResource: existingSiteResource is: ",
|
||||
existingSiteResource
|
||||
@@ -849,17 +672,14 @@ export async function handleMessagingForUpdatedSiteResource(
|
||||
updatedSiteResource
|
||||
);
|
||||
|
||||
await rebuildClientAssociationsFromSiteResource(
|
||||
existingSiteResource || updatedSiteResource, // we want to rebuild based on the existing resource then we will apply the change to the destination below
|
||||
trx
|
||||
);
|
||||
|
||||
const { sitesList, mergedAllClients, mergedAllClientIds } =
|
||||
await getClientSiteResourceAccess(
|
||||
existingSiteResource || updatedSiteResource,
|
||||
trx
|
||||
);
|
||||
|
||||
const siteIds = sites.map((site) => site.siteId);
|
||||
|
||||
// after everything is rebuilt above we still need to update the targets and remote subnets if the destination changed
|
||||
const destinationChanged =
|
||||
existingSiteResource &&
|
||||
@@ -896,12 +716,86 @@ export async function handleMessagingForUpdatedSiteResource(
|
||||
portRangesChanged ||
|
||||
destinationPortChanged
|
||||
) {
|
||||
const newtsForSites =
|
||||
siteIds.length > 0
|
||||
? await trx
|
||||
.select()
|
||||
.from(newts)
|
||||
.where(inArray(newts.siteId, siteIds))
|
||||
: [];
|
||||
const newtBySiteId = new Map(
|
||||
newtsForSites.map((newt) => [newt.siteId, newt])
|
||||
);
|
||||
|
||||
const oldDestinationStillInUseClientSitePairs = new Set<string>();
|
||||
if (
|
||||
existingSiteResource?.destination &&
|
||||
siteIds.length > 0 &&
|
||||
mergedAllClientIds.length > 0
|
||||
) {
|
||||
const oldDestinationStillInUseRows = await trx
|
||||
.select({
|
||||
clientId: clientSiteResourcesAssociationsCache.clientId,
|
||||
siteId: siteNetworks.siteId
|
||||
})
|
||||
.from(siteResources)
|
||||
.innerJoin(
|
||||
clientSiteResourcesAssociationsCache,
|
||||
eq(
|
||||
clientSiteResourcesAssociationsCache.siteResourceId,
|
||||
siteResources.siteResourceId
|
||||
)
|
||||
)
|
||||
.innerJoin(
|
||||
siteNetworks,
|
||||
eq(siteNetworks.networkId, siteResources.networkId)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
inArray(
|
||||
clientSiteResourcesAssociationsCache.clientId,
|
||||
mergedAllClientIds
|
||||
),
|
||||
inArray(siteNetworks.siteId, siteIds),
|
||||
eq(
|
||||
siteResources.destination,
|
||||
existingSiteResource.destination
|
||||
),
|
||||
ne(
|
||||
siteResources.siteResourceId,
|
||||
existingSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
for (const row of oldDestinationStillInUseRows) {
|
||||
oldDestinationStillInUseClientSitePairs.add(
|
||||
`${row.clientId}:${row.siteId}`
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const shouldUpdateTargets =
|
||||
destinationChanged ||
|
||||
sslChanged ||
|
||||
portRangesChanged ||
|
||||
fullDomainChanged ||
|
||||
destinationPortChanged;
|
||||
const oldTargets = shouldUpdateTargets
|
||||
? await generateSubnetProxyTargetV2(
|
||||
existingSiteResource,
|
||||
mergedAllClients
|
||||
)
|
||||
: [];
|
||||
const newTargets = shouldUpdateTargets
|
||||
? await generateSubnetProxyTargetV2(
|
||||
updatedSiteResource,
|
||||
mergedAllClients
|
||||
)
|
||||
: [];
|
||||
|
||||
for (const site of sites) {
|
||||
const [newt] = await trx
|
||||
.select()
|
||||
.from(newts)
|
||||
.where(eq(newts.siteId, site.siteId))
|
||||
.limit(1);
|
||||
const newt = newtBySiteId.get(site.siteId);
|
||||
|
||||
if (!newt) {
|
||||
throw new Error(
|
||||
@@ -910,22 +804,7 @@ export async function handleMessagingForUpdatedSiteResource(
|
||||
}
|
||||
|
||||
// Only update targets on newt if these items change
|
||||
if (
|
||||
destinationChanged ||
|
||||
sslChanged || // we need to push a new cert if the ssl changed
|
||||
portRangesChanged ||
|
||||
fullDomainChanged || // if the domain changes we need to update the certs and stuff
|
||||
destinationPortChanged
|
||||
) {
|
||||
const oldTargets = await generateSubnetProxyTargetV2(
|
||||
existingSiteResource,
|
||||
mergedAllClients
|
||||
);
|
||||
const newTargets = await generateSubnetProxyTargetV2(
|
||||
updatedSiteResource,
|
||||
mergedAllClients
|
||||
);
|
||||
|
||||
if (shouldUpdateTargets) {
|
||||
await updateTargets(
|
||||
newt.newtId,
|
||||
{
|
||||
@@ -939,49 +818,19 @@ export async function handleMessagingForUpdatedSiteResource(
|
||||
const olmJobs: Promise<void>[] = [];
|
||||
for (const client of mergedAllClients) {
|
||||
// does this client have access to another resource on this site that has the same destination still? if so we dont want to remove it from their olm yet
|
||||
// todo: optimize this query if needed
|
||||
if (!existingSiteResource.destination) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const oldDestinationStillInUseSites = await trx
|
||||
.select()
|
||||
.from(siteResources)
|
||||
.innerJoin(
|
||||
clientSiteResourcesAssociationsCache,
|
||||
eq(
|
||||
clientSiteResourcesAssociationsCache.siteResourceId,
|
||||
siteResources.siteResourceId
|
||||
)
|
||||
)
|
||||
.innerJoin(
|
||||
siteNetworks,
|
||||
eq(siteNetworks.networkId, siteResources.networkId)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(
|
||||
clientSiteResourcesAssociationsCache.clientId,
|
||||
client.clientId
|
||||
),
|
||||
eq(siteNetworks.siteId, site.siteId),
|
||||
eq(
|
||||
siteResources.destination,
|
||||
existingSiteResource.destination
|
||||
),
|
||||
ne(
|
||||
siteResources.siteResourceId,
|
||||
existingSiteResource.siteResourceId
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
const oldDestinationStillInUseByASite =
|
||||
oldDestinationStillInUseSites.length > 0;
|
||||
oldDestinationStillInUseClientSitePairs.has(
|
||||
`${client.clientId}:${site.siteId}`
|
||||
);
|
||||
|
||||
// we also need to update the remote subnets on the olms for each client that has access to this site
|
||||
olmJobs.push(
|
||||
updatePeerData(
|
||||
// TODO: THIS SHOULD BE UPDATED TO WORK I A BATCH
|
||||
client.clientId,
|
||||
site.siteId,
|
||||
destinationChanged
|
||||
|
||||
Reference in New Issue
Block a user