mirror of
https://github.com/fosrl/pangolin.git
synced 2026-05-21 16:25:19 +00:00
Compare commits
13 Commits
exit-node-
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87bcd8ec1b | ||
|
|
b3cfe82dff | ||
|
|
d65128671c | ||
|
|
41fdd5de74 | ||
|
|
2704202ba9 | ||
|
|
72ef0ae020 | ||
|
|
1442faa740 | ||
|
|
6aa589e612 | ||
|
|
4b1a8e14c4 | ||
|
|
1a0db10b1a | ||
|
|
b7634086db | ||
|
|
1ba75092f9 | ||
|
|
82745c701a |
@@ -18,7 +18,7 @@ import {
|
||||
userOrgRoles,
|
||||
userSiteResources
|
||||
} from "@server/db";
|
||||
import { and, eq, inArray, ne } from "drizzle-orm";
|
||||
import { and, count, eq, inArray, ne } from "drizzle-orm";
|
||||
|
||||
import { deletePeer as newtDeletePeer } from "@server/routers/newt/peers";
|
||||
import {
|
||||
@@ -39,6 +39,11 @@ import {
|
||||
removePeerData,
|
||||
removeTargets as removeSubnetProxyTargets
|
||||
} from "@server/routers/client/targets";
|
||||
import { lockManager } from "#dynamic/lib/lock";
|
||||
|
||||
// TTL for rebuild-association locks. These functions can fan out into many
|
||||
// peer/proxy updates, so give them a generous window.
|
||||
const REBUILD_ASSOCIATIONS_LOCK_TTL_MS = 120000;
|
||||
|
||||
export async function getClientSiteResourceAccess(
|
||||
siteResource: SiteResource,
|
||||
@@ -161,6 +166,23 @@ export async function rebuildClientAssociationsFromSiteResource(
|
||||
pubKey: string | null;
|
||||
subnet: string | null;
|
||||
}[];
|
||||
}> {
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
);
|
||||
}
|
||||
|
||||
async function rebuildClientAssociationsFromSiteResourceImpl(
|
||||
siteResource: SiteResource,
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<{
|
||||
mergedAllClients: {
|
||||
clientId: number;
|
||||
pubKey: string | null;
|
||||
subnet: string | null;
|
||||
}[];
|
||||
}> {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] START siteResourceId=${siteResource.siteResourceId} networkId=${siteResource.networkId} orgId=${siteResource.orgId}`
|
||||
@@ -539,6 +561,29 @@ async function handleMessagesForSiteClients(
|
||||
}
|
||||
}
|
||||
|
||||
// get the number of sites on each of these clients so we can log it and make decisions about whether to send messages based on it
|
||||
const clientSiteCounts: Record<number, number> = {};
|
||||
if (clientsToProcess.size > 0) {
|
||||
const clientIdsToProcess = Array.from(clientsToProcess.keys());
|
||||
const siteCounts = await trx
|
||||
.select({
|
||||
clientId: clientSitesAssociationsCache.clientId,
|
||||
siteCount: count(clientSitesAssociationsCache.siteId)
|
||||
})
|
||||
.from(clientSitesAssociationsCache)
|
||||
.where(
|
||||
inArray(
|
||||
clientSitesAssociationsCache.clientId,
|
||||
clientIdsToProcess
|
||||
)
|
||||
)
|
||||
.groupBy(clientSitesAssociationsCache.clientId);
|
||||
|
||||
for (const row of siteCounts) {
|
||||
clientSiteCounts[row.clientId] = Number(row.siteCount);
|
||||
}
|
||||
}
|
||||
|
||||
for (const client of clientsToProcess.values()) {
|
||||
// UPDATE THE NEWT
|
||||
if (!client.subnet || !client.pubKey) {
|
||||
@@ -582,7 +627,14 @@ async function handleMessagesForSiteClients(
|
||||
}
|
||||
|
||||
if (isAdd) {
|
||||
// TODO: if we are in jit mode here should we really be sending this?
|
||||
if (clientSiteCounts[client.clientId] > 250) {
|
||||
// skip adding the peer if we have more than 250 sites because we are in jit mode anyway
|
||||
logger.info(
|
||||
`rebuildClientAssociations: Client ${client.clientId} has ${clientSiteCounts[client.clientId]} sites so skipping adding peer to newt and olm because it is likely in jit mode`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
await initPeerAddHandshake(
|
||||
// this will kick off the add peer process for the client
|
||||
client.clientId,
|
||||
@@ -600,9 +652,24 @@ async function handleMessagesForSiteClients(
|
||||
exitNodeJobs.push(updateClientSiteDestinations(client, trx));
|
||||
}
|
||||
|
||||
await Promise.all(exitNodeJobs);
|
||||
await Promise.all(newtJobs); // do the servers first to make sure they are ready?
|
||||
await Promise.all(olmJobs);
|
||||
Promise.all(exitNodeJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating client site destinations for site ${site.siteId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
Promise.all(newtJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating Newt peers for site ${site.siteId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
Promise.all(olmJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating Olm peers for site ${site.siteId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
interface PeerDestination {
|
||||
@@ -885,6 +952,17 @@ async function handleSubnetProxyTargetUpdates(
|
||||
export async function rebuildClientAssociationsFromClient(
|
||||
client: Client,
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<void> {
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:client:${client.clientId}`,
|
||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
);
|
||||
}
|
||||
|
||||
async function rebuildClientAssociationsFromClientImpl(
|
||||
client: Client,
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<void> {
|
||||
let newSiteResourceIds: number[] = [];
|
||||
|
||||
@@ -1157,6 +1235,12 @@ async function handleMessagesForClientSites(
|
||||
const olmJobs: Promise<any>[] = [];
|
||||
const exitNodeJobs: Promise<any>[] = [];
|
||||
|
||||
const totalSitesOnClient = await trx
|
||||
.select({ count: count(clientSitesAssociationsCache.siteId) })
|
||||
.from(clientSitesAssociationsCache)
|
||||
.where(eq(clientSitesAssociationsCache.clientId, client.clientId))
|
||||
.then((rows) => Number(rows[0].count));
|
||||
|
||||
for (const siteData of sitesData) {
|
||||
const site = siteData.sites;
|
||||
const exitNode = siteData.exitNodes;
|
||||
@@ -1217,7 +1301,14 @@ async function handleMessagesForClientSites(
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO: if we are in jit mode here should we really be sending this?
|
||||
if (totalSitesOnClient > 250) {
|
||||
// skip adding the site if we have more than 250 because we are in jit mode anyway
|
||||
logger.info(
|
||||
`rebuildClientAssociations: Client ${client.clientId} has ${totalSitesOnClient} sites so skipping adding peer to newt and olm because it is likely in jit mode`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
await initPeerAddHandshake(
|
||||
// this will kick off the add peer process for the client
|
||||
client.clientId,
|
||||
@@ -1245,9 +1336,24 @@ async function handleMessagesForClientSites(
|
||||
);
|
||||
}
|
||||
|
||||
await Promise.all(exitNodeJobs);
|
||||
await Promise.all(newtJobs);
|
||||
await Promise.all(olmJobs);
|
||||
Promise.all(exitNodeJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating client site destinations for client ${client.clientId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
Promise.all(newtJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating Newt peers for client ${client.clientId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
Promise.all(olmJobs).catch((error) => {
|
||||
logger.error(
|
||||
`rebuildClientAssociations: Error updating Olm peers for client ${client.clientId}:`,
|
||||
error
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
async function handleMessagesForClientResources(
|
||||
@@ -1528,3 +1634,195 @@ async function handleMessagesForClientResources(
|
||||
|
||||
await Promise.all([...proxyJobs, ...olmJobs]);
|
||||
}
|
||||
|
||||
export type ClientAssociationsCacheVerification = {
|
||||
clientId: number;
|
||||
consistent: boolean;
|
||||
// What permissions say the cache should contain
|
||||
expectedSiteResourceIds: number[];
|
||||
expectedSiteIds: number[];
|
||||
// What the cache currently contains
|
||||
actualSiteResourceIds: number[];
|
||||
actualSiteIds: number[];
|
||||
// Diff
|
||||
missingSiteResourceIds: number[]; // present in expected, missing from cache
|
||||
extraSiteResourceIds: number[]; // present in cache, not in expected
|
||||
missingSiteIds: number[];
|
||||
extraSiteIds: number[];
|
||||
};
|
||||
|
||||
// verifyClientAssociationsCache walks the same permission-derivation logic as
|
||||
// rebuildClientAssociationsFromClient but does NOT modify the database. It
|
||||
// returns the expected vs actual cache contents and a boolean indicating
|
||||
// whether the cache is in sync with what permissions imply.
|
||||
export async function verifyClientAssociationsCache(
|
||||
client: Client,
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<ClientAssociationsCacheVerification> {
|
||||
let newSiteResourceIds: number[] = [];
|
||||
|
||||
// 1. Direct client associations
|
||||
const directSiteResources = await trx
|
||||
.select({ siteResourceId: clientSiteResources.siteResourceId })
|
||||
.from(clientSiteResources)
|
||||
.innerJoin(
|
||||
siteResources,
|
||||
eq(siteResources.siteResourceId, clientSiteResources.siteResourceId)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(clientSiteResources.clientId, client.clientId),
|
||||
eq(siteResources.orgId, client.orgId)
|
||||
)
|
||||
);
|
||||
|
||||
newSiteResourceIds.push(
|
||||
...directSiteResources.map((r) => r.siteResourceId)
|
||||
);
|
||||
|
||||
// 2. User-based and role-based access (if client has a userId)
|
||||
if (client.userId) {
|
||||
const userSiteResourceIds = await trx
|
||||
.select({ siteResourceId: userSiteResources.siteResourceId })
|
||||
.from(userSiteResources)
|
||||
.innerJoin(
|
||||
siteResources,
|
||||
eq(
|
||||
siteResources.siteResourceId,
|
||||
userSiteResources.siteResourceId
|
||||
)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
eq(userSiteResources.userId, client.userId),
|
||||
eq(siteResources.orgId, client.orgId)
|
||||
)
|
||||
);
|
||||
|
||||
newSiteResourceIds.push(
|
||||
...userSiteResourceIds.map((r) => r.siteResourceId)
|
||||
);
|
||||
|
||||
const roleIds = await trx
|
||||
.select({ roleId: userOrgRoles.roleId })
|
||||
.from(userOrgRoles)
|
||||
.where(
|
||||
and(
|
||||
eq(userOrgRoles.userId, client.userId),
|
||||
eq(userOrgRoles.orgId, client.orgId)
|
||||
)
|
||||
)
|
||||
.then((rows) => rows.map((row) => row.roleId));
|
||||
|
||||
if (roleIds.length > 0) {
|
||||
const roleSiteResourceIds = await trx
|
||||
.select({ siteResourceId: roleSiteResources.siteResourceId })
|
||||
.from(roleSiteResources)
|
||||
.innerJoin(
|
||||
siteResources,
|
||||
eq(
|
||||
siteResources.siteResourceId,
|
||||
roleSiteResources.siteResourceId
|
||||
)
|
||||
)
|
||||
.where(
|
||||
and(
|
||||
inArray(roleSiteResources.roleId, roleIds),
|
||||
eq(siteResources.orgId, client.orgId)
|
||||
)
|
||||
);
|
||||
|
||||
newSiteResourceIds.push(
|
||||
...roleSiteResourceIds.map((r) => r.siteResourceId)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
newSiteResourceIds = Array.from(new Set(newSiteResourceIds));
|
||||
|
||||
const newSiteResources =
|
||||
newSiteResourceIds.length > 0
|
||||
? await trx
|
||||
.select()
|
||||
.from(siteResources)
|
||||
.where(
|
||||
inArray(siteResources.siteResourceId, newSiteResourceIds)
|
||||
)
|
||||
: [];
|
||||
|
||||
const networkIds = Array.from(
|
||||
new Set(
|
||||
newSiteResources
|
||||
.map((sr) => sr.networkId)
|
||||
.filter((id): id is number => id !== null)
|
||||
)
|
||||
);
|
||||
const newSiteIds =
|
||||
networkIds.length > 0
|
||||
? await trx
|
||||
.select({ siteId: siteNetworks.siteId })
|
||||
.from(siteNetworks)
|
||||
.where(inArray(siteNetworks.networkId, networkIds))
|
||||
.then((rows) =>
|
||||
Array.from(new Set(rows.map((r) => r.siteId)))
|
||||
)
|
||||
: [];
|
||||
|
||||
// Read the existing cache state
|
||||
const existingResourceAssociations = await trx
|
||||
.select({
|
||||
siteResourceId: clientSiteResourcesAssociationsCache.siteResourceId
|
||||
})
|
||||
.from(clientSiteResourcesAssociationsCache)
|
||||
.where(
|
||||
eq(clientSiteResourcesAssociationsCache.clientId, client.clientId)
|
||||
);
|
||||
const existingSiteResourceIds = existingResourceAssociations.map(
|
||||
(r) => r.siteResourceId
|
||||
);
|
||||
|
||||
const existingSiteAssociations = await trx
|
||||
.select({ siteId: clientSitesAssociationsCache.siteId })
|
||||
.from(clientSitesAssociationsCache)
|
||||
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
|
||||
const existingSiteIds = existingSiteAssociations.map((s) => s.siteId);
|
||||
|
||||
const expectedSiteResourceSet = new Set(newSiteResourceIds);
|
||||
const actualSiteResourceSet = new Set(existingSiteResourceIds);
|
||||
const expectedSiteSet = new Set(newSiteIds);
|
||||
const actualSiteSet = new Set(existingSiteIds);
|
||||
|
||||
const missingSiteResourceIds = newSiteResourceIds.filter(
|
||||
(id) => !actualSiteResourceSet.has(id)
|
||||
);
|
||||
const extraSiteResourceIds = existingSiteResourceIds.filter(
|
||||
(id) => !expectedSiteResourceSet.has(id)
|
||||
);
|
||||
const missingSiteIds = newSiteIds.filter((id) => !actualSiteSet.has(id));
|
||||
const extraSiteIds = existingSiteIds.filter(
|
||||
(id) => !expectedSiteSet.has(id)
|
||||
);
|
||||
|
||||
const consistent =
|
||||
missingSiteResourceIds.length === 0 &&
|
||||
extraSiteResourceIds.length === 0 &&
|
||||
missingSiteIds.length === 0 &&
|
||||
extraSiteIds.length === 0;
|
||||
|
||||
return {
|
||||
clientId: client.clientId,
|
||||
consistent,
|
||||
expectedSiteResourceIds: Array.from(expectedSiteResourceSet).sort(
|
||||
(a, b) => a - b
|
||||
),
|
||||
expectedSiteIds: Array.from(expectedSiteSet).sort((a, b) => a - b),
|
||||
actualSiteResourceIds: Array.from(actualSiteResourceSet).sort(
|
||||
(a, b) => a - b
|
||||
),
|
||||
actualSiteIds: Array.from(actualSiteSet).sort((a, b) => a - b),
|
||||
missingSiteResourceIds: missingSiteResourceIds.sort((a, b) => a - b),
|
||||
extraSiteResourceIds: extraSiteResourceIds.sort((a, b) => a - b),
|
||||
missingSiteIds: missingSiteIds.sort((a, b) => a - b),
|
||||
extraSiteIds: extraSiteIds.sort((a, b) => a - b)
|
||||
};
|
||||
}
|
||||
|
||||
@@ -32,6 +32,7 @@ import * as eventStreamingDestination from "#private/routers/eventStreamingDesti
|
||||
import * as alertRule from "#private/routers/alertRule";
|
||||
import * as healthChecks from "#private/routers/healthChecks";
|
||||
import * as labels from "#private/routers/labels";
|
||||
import * as client from "@server/routers/client";
|
||||
|
||||
import {
|
||||
verifyOrgAccess,
|
||||
@@ -829,3 +830,15 @@ authenticated.get(
|
||||
verifyUserHasAction(ActionsEnum.getTarget),
|
||||
healthChecks.getHealthCheckStatusHistory
|
||||
);
|
||||
|
||||
authenticated.get(
|
||||
"/client/:clientId/verify-associations-cache",
|
||||
verifyClientAccess,
|
||||
client.verifyClientAssociationsCache
|
||||
);
|
||||
|
||||
authenticated.post(
|
||||
"/client/:clientId/rebuild-associations-cache",
|
||||
verifyClientAccess,
|
||||
client.rebuildClientAssociationsCacheRoute
|
||||
);
|
||||
|
||||
@@ -26,7 +26,6 @@ import logger from "@server/logger";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import { eq, InferInsertModel } from "drizzle-orm";
|
||||
import { build } from "@server/build";
|
||||
import { validateLocalPath } from "@app/lib/validateLocalPath";
|
||||
import config from "#private/lib/config";
|
||||
|
||||
const paramsSchema = z.strictObject({
|
||||
@@ -35,78 +34,9 @@ const paramsSchema = z.strictObject({
|
||||
|
||||
const bodySchema = z.strictObject({
|
||||
logoUrl: z
|
||||
.union([
|
||||
z.literal(""),
|
||||
z
|
||||
.string()
|
||||
.superRefine(async (urlOrPath, ctx) => {
|
||||
const parseResult = z.url().safeParse(urlOrPath);
|
||||
if (!parseResult.success) {
|
||||
if (build !== "enterprise") {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: "Must be a valid URL"
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
try {
|
||||
validateLocalPath(urlOrPath);
|
||||
} catch (error) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: "Must be either a valid image URL or a valid pathname starting with `/` and not containing query parameters, `..` or `*`"
|
||||
});
|
||||
} finally {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(urlOrPath, {
|
||||
method: "HEAD"
|
||||
}).catch(() => {
|
||||
// If HEAD fails (CORS or method not allowed), try GET
|
||||
return fetch(urlOrPath, { method: "GET" });
|
||||
});
|
||||
|
||||
if (response.status !== 200) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: `Failed to load image. Please check that the URL is accessible.`
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const contentType =
|
||||
response.headers.get("content-type") ?? "";
|
||||
if (!contentType.startsWith("image/")) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: `URL does not point to an image. Please provide a URL to an image file (e.g., .png, .jpg, .svg).`
|
||||
});
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
let errorMessage =
|
||||
"Unable to verify image URL. Please check that the URL is accessible and points to an image file.";
|
||||
|
||||
if (error instanceof TypeError && error.message.includes("fetch")) {
|
||||
errorMessage =
|
||||
"Network error: Unable to reach the URL. Please check your internet connection and verify the URL is correct.";
|
||||
} else if (error instanceof Error) {
|
||||
errorMessage = `Error verifying URL: ${error.message}`;
|
||||
}
|
||||
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: errorMessage
|
||||
});
|
||||
}
|
||||
})
|
||||
])
|
||||
.transform((val) => (val === "" ? null : val))
|
||||
.nullish(),
|
||||
.string()
|
||||
.optional()
|
||||
.transform((val) => (val === "" ? null : val)),
|
||||
logoWidth: z.coerce.number<number>().min(1),
|
||||
logoHeight: z.coerce.number<number>().min(1),
|
||||
resourceTitle: z.string(),
|
||||
|
||||
@@ -1,202 +0,0 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025-2026 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import axios from "axios";
|
||||
import { db, exitNodes, newts, sites } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import redisManager from "#private/lib/redis";
|
||||
import { sendToClient } from "#private/routers/ws";
|
||||
|
||||
const INITIAL_DELAY_MS = 15 * 1000; // 15 seconds before first check
|
||||
const CHECK_INTERVAL_MS = 10 * 1000; // Check every 10 seconds
|
||||
const MAX_DURATION_MS = 5 * 60 * 1000; // Give up after 5 minutes
|
||||
const REDIS_PENDING_SET = "exit-node-reconnect-pending";
|
||||
const REDIS_HASH_PREFIX = "exit-node-reconnect:";
|
||||
|
||||
interface PendingReconnect {
|
||||
startTime: number;
|
||||
reachableAt: string;
|
||||
}
|
||||
|
||||
// In-memory tracking for this node
|
||||
const pendingReconnects = new Map<number, PendingReconnect>();
|
||||
|
||||
let schedulerInterval: NodeJS.Timeout | null = null;
|
||||
|
||||
/**
|
||||
* Schedules a reconnect check for newts connected to the given exit node.
|
||||
* Called when an exit node transitions from offline to online.
|
||||
*/
|
||||
export async function scheduleExitNodeReconnect(
|
||||
exitNodeId: number,
|
||||
reachableAt: string
|
||||
): Promise<void> {
|
||||
logger.info(
|
||||
`Scheduling newt reconnect for exit node ${exitNodeId} (reachableAt: ${reachableAt})`
|
||||
);
|
||||
|
||||
const entry: PendingReconnect = {
|
||||
startTime: Date.now(),
|
||||
reachableAt
|
||||
};
|
||||
|
||||
pendingReconnects.set(exitNodeId, entry);
|
||||
|
||||
// Store in Redis if available for cross-node coordination
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.sadd(REDIS_PENDING_SET, exitNodeId.toString());
|
||||
await redisManager.hset(
|
||||
`${REDIS_HASH_PREFIX}${exitNodeId}`,
|
||||
"startTime",
|
||||
entry.startTime.toString()
|
||||
);
|
||||
await redisManager.hset(
|
||||
`${REDIS_HASH_PREFIX}${exitNodeId}`,
|
||||
"reachableAt",
|
||||
reachableAt
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the background interval that checks pending exit node reconnects.
|
||||
*/
|
||||
export function startExitNodeReconnectScheduler(): void {
|
||||
if (schedulerInterval) {
|
||||
return;
|
||||
}
|
||||
|
||||
schedulerInterval = setInterval(async () => {
|
||||
try {
|
||||
await processPendingReconnects();
|
||||
} catch (error) {
|
||||
logger.error("Error in exit node reconnect scheduler", { error });
|
||||
}
|
||||
}, CHECK_INTERVAL_MS);
|
||||
|
||||
logger.debug("Started exit node reconnect scheduler");
|
||||
}
|
||||
|
||||
async function processPendingReconnects(): Promise<void> {
|
||||
// Merge in-memory and Redis-tracked pending reconnects
|
||||
const toProcess = new Map(pendingReconnects);
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
const redisIds = await redisManager.smembers(REDIS_PENDING_SET);
|
||||
for (const idStr of redisIds) {
|
||||
const id = parseInt(idStr, 10);
|
||||
if (!toProcess.has(id)) {
|
||||
const startTimeStr = await redisManager.hget(
|
||||
`${REDIS_HASH_PREFIX}${id}`,
|
||||
"startTime"
|
||||
);
|
||||
const reachableAt = await redisManager.hget(
|
||||
`${REDIS_HASH_PREFIX}${id}`,
|
||||
"reachableAt"
|
||||
);
|
||||
if (startTimeStr && reachableAt) {
|
||||
toProcess.set(id, {
|
||||
startTime: parseInt(startTimeStr, 10),
|
||||
reachableAt
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const now = Date.now();
|
||||
|
||||
for (const [exitNodeId, entry] of toProcess) {
|
||||
const elapsed = now - entry.startTime;
|
||||
|
||||
// Give up after max duration
|
||||
if (elapsed >= MAX_DURATION_MS) {
|
||||
logger.warn(
|
||||
`Exit node reconnect check timed out for exit node ${exitNodeId} after 5 minutes`
|
||||
);
|
||||
await removePending(exitNodeId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Respect initial delay
|
||||
if (elapsed < INITIAL_DELAY_MS) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if the exit node HTTP endpoint is reachable
|
||||
const pingUrl = `${entry.reachableAt}/ping`;
|
||||
try {
|
||||
await axios.get(pingUrl, { timeout: 5000 });
|
||||
} catch {
|
||||
logger.debug(
|
||||
`Exit node ${exitNodeId} not yet reachable at ${pingUrl}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Node is reachable — send reconnect to all connected newts
|
||||
logger.info(
|
||||
`Exit node ${exitNodeId} is reachable. Sending newt/wg/reconnect to connected newts.`
|
||||
);
|
||||
|
||||
await sendReconnectToNewts(exitNodeId);
|
||||
await removePending(exitNodeId);
|
||||
}
|
||||
}
|
||||
|
||||
async function sendReconnectToNewts(exitNodeId: number): Promise<void> {
|
||||
try {
|
||||
const connectedNewts = await db
|
||||
.select({ newtId: newts.newtId })
|
||||
.from(newts)
|
||||
.innerJoin(sites, eq(newts.siteId, sites.siteId))
|
||||
.where(eq(sites.exitNodeId, exitNodeId));
|
||||
|
||||
if (connectedNewts.length === 0) {
|
||||
logger.debug(
|
||||
`No newts found for exit node ${exitNodeId}, nothing to reconnect`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info(
|
||||
`Sending newt/wg/reconnect to ${connectedNewts.length} newt(s) for exit node ${exitNodeId}`
|
||||
);
|
||||
|
||||
const reconnectMessage = {
|
||||
type: "newt/wg/reconnect",
|
||||
data: {}
|
||||
};
|
||||
|
||||
await Promise.allSettled(
|
||||
connectedNewts.map(({ newtId }) =>
|
||||
sendToClient(newtId, reconnectMessage)
|
||||
)
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to send reconnect messages for exit node ${exitNodeId}`,
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async function removePending(exitNodeId: number): Promise<void> {
|
||||
pendingReconnects.delete(exitNodeId);
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
await redisManager.srem(REDIS_PENDING_SET, exitNodeId.toString());
|
||||
await redisManager.del(`${REDIS_HASH_PREFIX}${exitNodeId}`);
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,6 @@ import { MessageHandler } from "@server/routers/ws";
|
||||
import { RemoteExitNode } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { scheduleExitNodeReconnect } from "./exitNodeReconnectScheduler";
|
||||
|
||||
/**
|
||||
* Handles ping messages from clients and responds with pong
|
||||
@@ -38,13 +37,6 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
|
||||
}
|
||||
|
||||
try {
|
||||
// Fetch the current state before updating so we can detect the offline→online transition
|
||||
const [currentExitNode] = await db
|
||||
.select({ online: exitNodes.online, reachableAt: exitNodes.reachableAt })
|
||||
.from(exitNodes)
|
||||
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId))
|
||||
.limit(1);
|
||||
|
||||
// Update the exit node's last ping timestamp
|
||||
await db
|
||||
.update(exitNodes)
|
||||
@@ -53,16 +45,6 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
|
||||
online: true
|
||||
})
|
||||
.where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId));
|
||||
|
||||
// If the exit node was offline and is now coming online, schedule newt reconnects
|
||||
if (currentExitNode && !currentExitNode.online && currentExitNode.reachableAt) {
|
||||
scheduleExitNodeReconnect(
|
||||
remoteExitNode.exitNodeId,
|
||||
currentExitNode.reachableAt
|
||||
).catch((error) => {
|
||||
logger.error("Failed to schedule exit node reconnect", { error });
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error("Error handling ping message", { error });
|
||||
}
|
||||
|
||||
@@ -22,4 +22,3 @@ export * from "./listRemoteExitNodes";
|
||||
export * from "./pickRemoteExitNodeDefaults";
|
||||
export * from "./quickStartRemoteExitNode";
|
||||
export * from "./offlineChecker";
|
||||
export * from "./exitNodeReconnectScheduler";
|
||||
|
||||
@@ -14,8 +14,7 @@
|
||||
import {
|
||||
handleRemoteExitNodeRegisterMessage,
|
||||
handleRemoteExitNodePingMessage,
|
||||
startRemoteExitNodeOfflineChecker,
|
||||
startExitNodeReconnectScheduler
|
||||
startRemoteExitNodeOfflineChecker
|
||||
} from "#private/routers/remoteExitNode";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { build } from "@server/build";
|
||||
@@ -30,5 +29,4 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
||||
|
||||
if (build != "saas") {
|
||||
startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
|
||||
startExitNodeReconnectScheduler(); // check pending exit node reconnects and notify newts
|
||||
}
|
||||
|
||||
@@ -10,3 +10,5 @@ export * from "./listUserDevices";
|
||||
export * from "./updateClient";
|
||||
export * from "./getClient";
|
||||
export * from "./createUserClient";
|
||||
export * from "./verifyClientAssociationsCache";
|
||||
export * from "./rebuildClientAssociationsCacheRoute";
|
||||
|
||||
81
server/routers/client/rebuildClientAssociationsCacheRoute.ts
Normal file
81
server/routers/client/rebuildClientAssociationsCacheRoute.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
import { Request, Response, NextFunction } from "express";
|
||||
import { z } from "zod";
|
||||
import { db } from "@server/db";
|
||||
import { clients } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import response from "@server/lib/response";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import createHttpError from "http-errors";
|
||||
import logger from "@server/logger";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import { OpenAPITags, registry } from "@server/openApi";
|
||||
import { rebuildClientAssociationsFromClient } from "@server/lib/rebuildClientAssociations";
|
||||
|
||||
const paramsSchema = z.strictObject({
|
||||
clientId: z.string().transform(Number).pipe(z.int().positive())
|
||||
});
|
||||
|
||||
registry.registerPath({
|
||||
method: "post",
|
||||
path: "/client/{clientId}/rebuild-associations-cache",
|
||||
description:
|
||||
"Rebuild the client's site/site-resource association cache based on current permissions.",
|
||||
tags: [OpenAPITags.Client],
|
||||
request: {
|
||||
params: paramsSchema
|
||||
},
|
||||
responses: {}
|
||||
});
|
||||
|
||||
export async function rebuildClientAssociationsCacheRoute(
|
||||
req: Request,
|
||||
res: Response,
|
||||
next: NextFunction
|
||||
): Promise<any> {
|
||||
try {
|
||||
const parsedParams = paramsSchema.safeParse(req.params);
|
||||
if (!parsedParams.success) {
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.BAD_REQUEST,
|
||||
fromError(parsedParams.error).toString()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
const { clientId } = parsedParams.data;
|
||||
|
||||
const [client] = await db
|
||||
.select()
|
||||
.from(clients)
|
||||
.where(eq(clients.clientId, clientId))
|
||||
.limit(1);
|
||||
|
||||
if (!client) {
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.NOT_FOUND,
|
||||
`Client with ID ${clientId} not found`
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
await rebuildClientAssociationsFromClient(client);
|
||||
|
||||
return response(res, {
|
||||
data: null,
|
||||
success: true,
|
||||
error: false,
|
||||
message: "Client association cache rebuilt successfully",
|
||||
status: HttpCode.OK
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.INTERNAL_SERVER_ERROR,
|
||||
"Failed to rebuild client association cache"
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
83
server/routers/client/verifyClientAssociationsCache.ts
Normal file
83
server/routers/client/verifyClientAssociationsCache.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { Request, Response, NextFunction } from "express";
|
||||
import { z } from "zod";
|
||||
import { db } from "@server/db";
|
||||
import { clients } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import response from "@server/lib/response";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import createHttpError from "http-errors";
|
||||
import logger from "@server/logger";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import { OpenAPITags, registry } from "@server/openApi";
|
||||
import { verifyClientAssociationsCache as verifyClientAssociationsCacheLib } from "@server/lib/rebuildClientAssociations";
|
||||
|
||||
const paramsSchema = z.strictObject({
|
||||
clientId: z.string().transform(Number).pipe(z.int().positive())
|
||||
});
|
||||
|
||||
registry.registerPath({
|
||||
method: "get",
|
||||
path: "/client/{clientId}/verify-associations-cache",
|
||||
description:
|
||||
"Read-only check of whether the client's site/site-resource association cache matches what the current permissions imply.",
|
||||
tags: [OpenAPITags.Client],
|
||||
request: {
|
||||
params: paramsSchema
|
||||
},
|
||||
responses: {}
|
||||
});
|
||||
|
||||
export async function verifyClientAssociationsCache(
|
||||
req: Request,
|
||||
res: Response,
|
||||
next: NextFunction
|
||||
): Promise<any> {
|
||||
try {
|
||||
const parsedParams = paramsSchema.safeParse(req.params);
|
||||
if (!parsedParams.success) {
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.BAD_REQUEST,
|
||||
fromError(parsedParams.error).toString()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
const { clientId } = parsedParams.data;
|
||||
|
||||
const [client] = await db
|
||||
.select()
|
||||
.from(clients)
|
||||
.where(eq(clients.clientId, clientId))
|
||||
.limit(1);
|
||||
|
||||
if (!client) {
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.NOT_FOUND,
|
||||
`Client with ID ${clientId} not found`
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
const report = await verifyClientAssociationsCacheLib(client);
|
||||
|
||||
return response(res, {
|
||||
data: report,
|
||||
success: true,
|
||||
error: false,
|
||||
message: report.consistent
|
||||
? "Client association cache is consistent"
|
||||
: "Client association cache is INCONSISTENT",
|
||||
status: HttpCode.OK
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error(error);
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.INTERNAL_SERVER_ERROR,
|
||||
"Failed to verify client association cache"
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -153,6 +153,65 @@ export default function GeneralPage() {
|
||||
const [approvalId, setApprovalId] = useState<number | null>(null);
|
||||
const [isRefreshing, setIsRefreshing] = useState(false);
|
||||
const [, startTransition] = useTransition();
|
||||
const [cacheCheck, setCacheCheck] = useState<null | {
|
||||
consistent: boolean;
|
||||
missingSiteResourceIds: number[];
|
||||
extraSiteResourceIds: number[];
|
||||
missingSiteIds: number[];
|
||||
extraSiteIds: number[];
|
||||
expectedSiteResourceIds: number[];
|
||||
actualSiteResourceIds: number[];
|
||||
expectedSiteIds: number[];
|
||||
actualSiteIds: number[];
|
||||
}>(null);
|
||||
const [isCheckingCache, setIsCheckingCache] = useState(false);
|
||||
const [isRebuildingCache, setIsRebuildingCache] = useState(false);
|
||||
|
||||
const handleRebuildCache = async () => {
|
||||
if (!client.clientId) return;
|
||||
setIsRebuildingCache(true);
|
||||
try {
|
||||
await api.post(
|
||||
`/client/${client.clientId}/rebuild-associations-cache`
|
||||
);
|
||||
// Re-verify after rebuild so the result refreshes
|
||||
const res = await api.get(
|
||||
`/client/${client.clientId}/verify-associations-cache`
|
||||
);
|
||||
setCacheCheck(res.data.data);
|
||||
toast({
|
||||
title: "Cache rebuilt",
|
||||
description: "Association cache rebuilt successfully."
|
||||
});
|
||||
} catch (e) {
|
||||
toast({
|
||||
variant: "destructive",
|
||||
title: "Rebuild failed",
|
||||
description: formatAxiosError(e, "Failed to rebuild cache")
|
||||
});
|
||||
} finally {
|
||||
setIsRebuildingCache(false);
|
||||
}
|
||||
};
|
||||
|
||||
const handleVerifyCache = async () => {
|
||||
if (!client.clientId) return;
|
||||
setIsCheckingCache(true);
|
||||
try {
|
||||
const res = await api.get(
|
||||
`/client/${client.clientId}/verify-associations-cache`
|
||||
);
|
||||
setCacheCheck(res.data.data);
|
||||
} catch (e) {
|
||||
toast({
|
||||
variant: "destructive",
|
||||
title: "Cache check failed",
|
||||
description: formatAxiosError(e, "Failed to verify cache")
|
||||
});
|
||||
} finally {
|
||||
setIsCheckingCache(false);
|
||||
}
|
||||
};
|
||||
const { env } = useEnvContext();
|
||||
|
||||
const showApprovalFeatures =
|
||||
@@ -844,6 +903,75 @@ export default function GeneralPage() {
|
||||
</SettingsSectionBody>
|
||||
</SettingsSection>
|
||||
)}
|
||||
|
||||
{/* Hidden cache verification — subtle button, dev/admin diagnostic */}
|
||||
<div className="mt-8 flex flex-col gap-2 items-start opacity-30 hover:opacity-100 transition-opacity">
|
||||
<button
|
||||
type="button"
|
||||
onClick={handleVerifyCache}
|
||||
disabled={isCheckingCache}
|
||||
className="text-xs text-muted-foreground underline disabled:opacity-50"
|
||||
title="Verify the client's site association cache against current permissions (read-only)"
|
||||
>
|
||||
{isCheckingCache
|
||||
? "Checking cache…"
|
||||
: "Verify association cache"}
|
||||
</button>
|
||||
{cacheCheck && (
|
||||
<div
|
||||
className={
|
||||
"text-xs rounded border px-2 py-1 " +
|
||||
(cacheCheck.consistent
|
||||
? "border-green-600 text-green-700"
|
||||
: "border-red-600 text-red-700")
|
||||
}
|
||||
>
|
||||
{cacheCheck.consistent ? (
|
||||
<span className="flex items-center gap-1">
|
||||
<CheckCircle2 className="h-3 w-3" />
|
||||
Cache is consistent
|
||||
</span>
|
||||
) : (
|
||||
<div className="space-y-2">
|
||||
<div className="flex items-center gap-1 font-semibold">
|
||||
<XCircle className="h-3 w-3" />
|
||||
Cache is INCONSISTENT
|
||||
</div>
|
||||
<div>
|
||||
Missing site resources: [
|
||||
{cacheCheck.missingSiteResourceIds.join(
|
||||
", "
|
||||
)}
|
||||
]
|
||||
</div>
|
||||
<div>
|
||||
Extra site resources: [
|
||||
{cacheCheck.extraSiteResourceIds.join(", ")}
|
||||
]
|
||||
</div>
|
||||
<div>
|
||||
Missing sites: [
|
||||
{cacheCheck.missingSiteIds.join(", ")}]
|
||||
</div>
|
||||
<div>
|
||||
Extra sites: [
|
||||
{cacheCheck.extraSiteIds.join(", ")}]
|
||||
</div>
|
||||
<button
|
||||
type="button"
|
||||
onClick={handleRebuildCache}
|
||||
disabled={isRebuildingCache}
|
||||
className="mt-1 text-xs underline font-semibold disabled:opacity-50"
|
||||
>
|
||||
{isRebuildingCache
|
||||
? "Rebuilding…"
|
||||
: "Rebuild cache now"}
|
||||
</button>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</SettingsContainer>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -44,77 +44,11 @@ export type AuthPageCustomizationProps = {
|
||||
};
|
||||
|
||||
const AuthPageFormSchema = z.object({
|
||||
logoUrl: z.union([
|
||||
z.literal(""),
|
||||
z.string().superRefine(async (urlOrPath, ctx) => {
|
||||
const parseResult = z.url().safeParse(urlOrPath);
|
||||
if (!parseResult.success) {
|
||||
if (build !== "enterprise") {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: "Must be a valid URL"
|
||||
});
|
||||
return;
|
||||
} else {
|
||||
try {
|
||||
validateLocalPath(urlOrPath);
|
||||
} catch (error) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message:
|
||||
"Must be either a valid image URL or a valid pathname starting with `/` and not containing query parameters, `..` or `*`"
|
||||
});
|
||||
} finally {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
logoUrl: z
|
||||
.string()
|
||||
.optional()
|
||||
.transform((val) => (val === "" ? undefined : val)),
|
||||
|
||||
try {
|
||||
const response = await fetch(urlOrPath, {
|
||||
method: "HEAD"
|
||||
}).catch(() => {
|
||||
// If HEAD fails (CORS or method not allowed), try GET
|
||||
return fetch(urlOrPath, { method: "GET" });
|
||||
});
|
||||
|
||||
if (response.status !== 200) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: `Failed to load image. Please check that the URL is accessible.`
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const contentType = response.headers.get("content-type") ?? "";
|
||||
if (!contentType.startsWith("image/")) {
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: `URL does not point to an image. Please provide a URL to an image file (e.g., .png, .jpg, .svg).`
|
||||
});
|
||||
return;
|
||||
}
|
||||
} catch (error) {
|
||||
let errorMessage =
|
||||
"Unable to verify image URL. Please check that the URL is accessible and points to an image file.";
|
||||
|
||||
if (
|
||||
error instanceof TypeError &&
|
||||
error.message.includes("fetch")
|
||||
) {
|
||||
errorMessage =
|
||||
"Network error: Unable to reach the URL. Please check your internet connection and verify the URL is correct.";
|
||||
} else if (error instanceof Error) {
|
||||
errorMessage = `Error verifying URL: ${error.message}`;
|
||||
}
|
||||
|
||||
ctx.addIssue({
|
||||
code: "custom",
|
||||
message: errorMessage
|
||||
});
|
||||
}
|
||||
})
|
||||
]),
|
||||
logoWidth: z.coerce.number<number>().min(1),
|
||||
logoHeight: z.coerce.number<number>().min(1),
|
||||
orgTitle: z.string().optional(),
|
||||
|
||||
Reference in New Issue
Block a user