Compare commits

..

16 Commits

Author SHA1 Message Date
Owen
2c3151da9b Fix #3374 2026-07-03 17:36:06 -04:00
Owen
f60b8795ad Fix #3383 2026-07-03 17:26:49 -04:00
Owen
4054976388 Fix #3395 2026-07-03 17:15:48 -04:00
Owen
390c822bb4 Fix issue with overlapping site resources not sending 2026-07-03 16:13:50 -04:00
Owen
2bfc1901a6 Dont check the subnet because we dont use it 2026-07-03 14:38:01 -04:00
Owen
5da186b528 Quiet warning messages 2026-07-03 11:58:00 -04:00
Owen
600a96c13b Update rate limit to 10 2026-07-03 11:52:59 -04:00
Owen
e4e0da3723 Add not exists check 2026-07-03 11:24:53 -04:00
Owen
440ebfe08e Clarify error messages 2026-07-03 10:26:13 -04:00
Owen
b399d2a291 Add some retry and database confict mitigation 2026-07-03 10:23:32 -04:00
Owen
1b1fba60f1 Make sure to retry rebuilds 2026-07-03 09:02:08 -04:00
Owen
b93d26f09f Fix reading from replicas 2026-07-02 21:46:53 -04:00
Owen
fc54ad49b5 Fix trial showing 2026-07-02 21:46:44 -04:00
Owen
f87e136f6b Unique subnets for exit nodes 2026-07-02 20:54:59 -04:00
Owen
1bf3d2cdd6 Add back the sync with semver 2026-07-02 12:10:20 -04:00
Owen
5fc5a3ebca Adjust spacing 2026-07-02 11:49:49 -04:00
25 changed files with 3345 additions and 2178 deletions

View File

@@ -864,8 +864,8 @@
"policyAuthHeaderAuthTitle": "Basic Header Auth",
"policyAuthHeaderAuthDescription": "Validate a custom HTTP header name and value on each request",
"policyAuthHeaderAuthSummary": "Header configured",
"policyAuthHeaderName": "Header name",
"policyAuthHeaderValue": "Expected value",
"policyAuthHeaderName": "Username",
"policyAuthHeaderValue": "Password",
"policyAuthSetPasscode": "Set Passcode",
"policyAuthSetPincode": "Set PIN Code",
"policyAuthSetEmailWhitelist": "Set Email Whitelist",

4065
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,7 +35,7 @@
"@asteasolutions/zod-to-openapi": "8.5.0",
"@devolutions/iron-remote-desktop": "https://static.pangolin.net/packages/devolutions-iron-remote-desktop-0.0.0.tgz",
"@devolutions/iron-remote-desktop-rdp": "https://static.pangolin.net/packages/devolutions-iron-remote-desktop-rdp-0.0.0.tgz",
"@aws-sdk/client-s3": "3.1079.0",
"@aws-sdk/client-s3": "3.1056.0",
"@headlessui/react": "2.2.10",
"@hookform/resolvers": "5.4.0",
"@monaco-editor/react": "4.7.0",
@@ -43,38 +43,38 @@
"@novnc/novnc": "^1.7.0",
"@oslojs/crypto": "1.0.1",
"@oslojs/encoding": "1.1.0",
"@radix-ui/react-avatar": "1.2.1",
"@radix-ui/react-checkbox": "1.3.6",
"@radix-ui/react-collapsible": "1.1.15",
"@radix-ui/react-dialog": "1.1.18",
"@radix-ui/react-dropdown-menu": "2.1.19",
"@radix-ui/react-avatar": "1.1.11",
"@radix-ui/react-checkbox": "1.3.3",
"@radix-ui/react-collapsible": "1.1.12",
"@radix-ui/react-dialog": "1.1.15",
"@radix-ui/react-dropdown-menu": "2.1.16",
"@radix-ui/react-icons": "1.3.2",
"@radix-ui/react-label": "2.1.11",
"@radix-ui/react-popover": "1.1.18",
"@radix-ui/react-progress": "1.1.11",
"@radix-ui/react-radio-group": "1.4.2",
"@radix-ui/react-scroll-area": "1.2.13",
"@radix-ui/react-select": "2.3.2",
"@radix-ui/react-separator": "1.1.11",
"@radix-ui/react-slot": "1.3.0",
"@radix-ui/react-switch": "1.3.2",
"@radix-ui/react-tabs": "1.1.16",
"@radix-ui/react-toast": "1.2.18",
"@radix-ui/react-tooltip": "1.2.11",
"@radix-ui/react-label": "2.1.8",
"@radix-ui/react-popover": "1.1.15",
"@radix-ui/react-progress": "1.1.8",
"@radix-ui/react-radio-group": "1.3.8",
"@radix-ui/react-scroll-area": "1.2.10",
"@radix-ui/react-select": "2.2.6",
"@radix-ui/react-separator": "1.1.8",
"@radix-ui/react-slot": "1.2.4",
"@radix-ui/react-switch": "1.2.6",
"@radix-ui/react-tabs": "1.1.13",
"@radix-ui/react-toast": "1.2.15",
"@radix-ui/react-tooltip": "1.2.8",
"@react-email/body": "0.3.0",
"@react-email/components": "1.0.12",
"@react-email/render": "2.0.10",
"@react-email/render": "2.0.8",
"@react-email/tailwind": "2.0.7",
"@simplewebauthn/browser": "13.3.0",
"@simplewebauthn/server": "13.3.2",
"@simplewebauthn/server": "13.3.1",
"@tailwindcss/forms": "0.5.11",
"@tanstack/react-query": "5.101.2",
"@tanstack/react-query": "5.100.14",
"@tanstack/react-table": "8.21.3",
"@xterm/addon-fit": "^0.11.0",
"@xterm/addon-web-links": "^0.12.0",
"@xterm/xterm": "^6.0.0",
"arctic": "3.7.0",
"axios": "1.18.1",
"axios": "1.16.1",
"better-sqlite3": "11.9.1",
"canvas-confetti": "1.9.4",
"class-variance-authority": "0.7.1",
@@ -91,40 +91,40 @@
"helmet": "8.2.0",
"http-errors": "2.0.1",
"input-otp": "1.4.2",
"ioredis": "5.11.1",
"ioredis": "5.11.0",
"jmespath": "0.16.0",
"js-yaml": "5.2.1",
"js-yaml": "4.2.0",
"jsonwebtoken": "9.0.3",
"lucide-react": "1.23.0",
"lucide-react": "1.17.0",
"maxmind": "5.0.6",
"moment": "2.30.1",
"next": "16.2.10",
"next-intl": "4.13.1",
"next": "16.2.6",
"next-intl": "4.13.0",
"next-themes": "0.4.6",
"nextjs-toploader": "3.9.17",
"node-cache": "5.1.2",
"nodemailer": "9.0.3",
"nodemailer": "9.0.1",
"oslo": "1.2.1",
"pg": "8.22.0",
"posthog-node": "5.39.4",
"pg": "8.21.0",
"posthog-node": "5.35.6",
"qrcode.react": "4.2.0",
"react": "19.2.7",
"react-day-picker": "10.0.1",
"react-dom": "19.2.7",
"react": "19.2.6",
"react-day-picker": "9.14.0",
"react-dom": "19.2.6",
"react-easy-sort": "1.8.0",
"react-hook-form": "7.80.0",
"react-icons": "5.7.0",
"recharts": "3.9.1",
"react-hook-form": "7.76.1",
"react-icons": "5.6.0",
"recharts": "3.8.1",
"reodotdev": "1.1.0",
"semver": "7.8.5",
"semver": "7.8.1",
"sshpk": "1.18.0",
"stripe": "22.3.0",
"stripe": "22.2.0",
"swagger-ui-express": "5.0.1",
"tailwind-merge": "3.6.0",
"topojson-client": "3.1.0",
"tw-animate-css": "1.4.0",
"use-debounce": "10.1.1",
"uuid": "14.0.1",
"uuid": "14.0.0",
"vaul": "1.1.2",
"visionscarto-world-atlas": "1.0.0",
"winston": "3.19.0",
@@ -136,11 +136,11 @@
"zod-validation-error": "5.0.0"
},
"devDependencies": {
"@dotenvx/dotenvx": "2.1.4",
"@dotenvx/dotenvx": "1.69.1",
"@esbuild-plugins/tsconfig-paths": "0.1.2",
"@react-email/ui": "^6.6.6",
"@tailwindcss/postcss": "4.3.2",
"@tanstack/react-query-devtools": "5.101.2",
"@react-email/ui": "^6.5.0",
"@tailwindcss/postcss": "4.3.0",
"@tanstack/react-query-devtools": "5.100.14",
"@types/better-sqlite3": "7.6.13",
"@types/cookie-parser": "1.4.10",
"@types/cors": "2.8.19",
@@ -151,14 +151,14 @@
"@types/jmespath": "0.15.2",
"@types/js-yaml": "4.0.9",
"@types/jsonwebtoken": "9.0.10",
"@types/node": "26.1.0",
"@types/nodemailer": "8.0.1",
"@types/node": "25.9.1",
"@types/nodemailer": "8.0.0",
"@types/nprogress": "0.2.3",
"@types/pg": "8.20.0",
"@types/react": "19.2.17",
"@types/react": "19.2.15",
"@types/react-dom": "19.2.3",
"@types/semver": "7.7.1",
"@types/sshpk": "1.17.5",
"@types/sshpk": "1.17.4",
"@types/swagger-ui-express": "4.1.8",
"@types/topojson-client": "3.1.5",
"@types/ws": "8.18.1",
@@ -166,21 +166,21 @@
"babel-plugin-react-compiler": "1.0.0",
"drizzle-kit": "0.31.10",
"esbuild": "0.28.1",
"esbuild-node-externals": "1.23.1",
"eslint": "10.6.0",
"eslint-config-next": "16.2.10",
"postcss": "8.5.16",
"prettier": "3.9.4",
"react-email": "6.6.6",
"tailwindcss": "4.3.2",
"esbuild-node-externals": "1.22.0",
"eslint": "10.4.0",
"eslint-config-next": "16.2.6",
"postcss": "8.5.15",
"prettier": "3.8.3",
"react-email": "6.5.0",
"tailwindcss": "4.3.0",
"tsc-alias": "1.8.17",
"tsx": "4.22.5",
"tsx": "4.22.3",
"typescript": "6.0.3",
"typescript-eslint": "8.62.1"
"typescript-eslint": "8.60.0"
},
"overrides": {
"esbuild": "0.28.1",
"dompurify": "3.4.0",
"postcss": "8.5.16"
"postcss": "8.5.15"
}
}

83
server/lib/dbRetry.ts Normal file
View File

@@ -0,0 +1,83 @@
import logger from "@server/logger";
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 50;
/**
* Detect transient errors that are safe to retry (connection drops, deadlocks,
* serialization failures). PostgreSQL deadlocks (40P01) are always safe to
* retry: the database guarantees exactly one winner per deadlock pair, so the
* loser just needs to try again.
*/
export function isTransientError(error: any): boolean {
if (!error) return false;
const message = (error.message || "").toLowerCase();
const causeMessage = (error.cause?.message || "").toLowerCase();
const code = error.code || error.cause?.code || "";
// Connection timeout / terminated
if (
message.includes("connection timeout") ||
message.includes("connection terminated") ||
message.includes("timeout exceeded when trying to connect") ||
causeMessage.includes("connection terminated unexpectedly") ||
causeMessage.includes("connection timeout")
) {
return true;
}
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
if (code === "40P01" || message.includes("deadlock")) {
return true;
}
// PostgreSQL serialization failure
if (code === "40001") {
return true;
}
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
if (
code === "ECONNRESET" ||
code === "ECONNREFUSED" ||
code === "EPIPE" ||
code === "ETIMEDOUT"
) {
return true;
}
return false;
}
/**
* Simple retry wrapper with exponential backoff for transient errors
* (deadlocks, connection timeouts, unexpected disconnects).
*/
export async function withRetry<T>(
operation: () => Promise<T>,
context: string,
maxRetries: number = MAX_RETRIES,
baseDelayMs: number = BASE_DELAY_MS
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isTransientError(error) && attempt < maxRetries) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * baseDelayMs;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Transient DB error in ${context}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`,
{ code: error?.code ?? error?.cause?.code }
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}

View File

@@ -1,30 +1,55 @@
import { db, exitNodes } from "@server/db";
import { db, exitNodes, Transaction } from "@server/db";
import config from "@server/lib/config";
import { findNextAvailableCidr } from "@server/lib/ip";
import { lockManager } from "#dynamic/lib/lock";
export async function getNextAvailableSubnet(): Promise<string> {
// Get all existing subnets from routes table
const existingAddresses = await db
.select({
address: exitNodes.address
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
/**
* Reserves the next available exit node subnet.
*
* Exit node subnets must never overlap with one another - regardless of
* which org(s) they belong to - since HA exit nodes can end up routing for
* the same org. This acquires a lock that the caller MUST release (via the
* returned `release`) only after the chosen address has been durably
* persisted (e.g. after the enclosing transaction commits), otherwise
* concurrent callers can race and pick the same subnet.
*/
export async function getNextAvailableSubnet(
trx: Transaction | typeof db = db
): Promise<{ value: string; release: () => Promise<void> }> {
const lockKey = "exit-node-subnet-allocation";
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey, acquired);
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return subnet;
try {
// Get all existing subnets from routes table
const existingAddresses = await trx
.select({
address: exitNodes.address
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
}
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return { value: subnet, release };
} catch (e) {
await release();
throw e;
}
}

View File

@@ -45,6 +45,7 @@ import {
} from "@server/routers/client/targets";
import { lockManager } from "#dynamic/lib/lock";
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
import { withRetry, isTransientError } from "@server/lib/dbRetry";
import {
checkOrgRebuildRateLimit,
decrementOrgRebuildCount,
@@ -285,10 +286,20 @@ export async function rebuildClientAssociationsFromSiteResource(
) {
await incrementOrgRebuildCount(siteResource.orgId);
try {
return await lockManager.withLock(
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
// The whole locked rebuild is idempotent (it diffs full expected vs.
// actual state each time), so on a transient DB error it's safe to
// retry the entire thing rather than just the failed query.
return await withRetry(
() =>
lockManager.withLock(
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
() =>
rebuildClientAssociationsFromSiteResourceImpl(
siteResource
),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
),
`rebuildClientAssociationsFromSiteResource:${siteResource.siteResourceId}`
);
} catch (err: any) {
if (
@@ -304,6 +315,17 @@ export async function rebuildClientAssociationsFromSiteResource(
});
return { mergedAllClients: [] };
}
if (isTransientError(err)) {
logger.warn(
`rebuildClientAssociations: transient DB error rebuilding site resource ${siteResource.siteResourceId} persisted after retries, queuing for deferred processing:`,
err
);
await rebuildQueue.enqueue({
type: "site-resource",
id: siteResource.siteResourceId
});
return { mergedAllClients: [] };
}
throw err;
} finally {
await decrementOrgRebuildCount(siteResource.orgId);
@@ -463,6 +485,7 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(clientSiteResourcesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteResourceId=${siteResource.siteResourceId} inserted clientSiteResource associations`
@@ -510,121 +533,148 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
for (const site of sitesToProcess) {
const siteId = site.siteId;
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
try {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(inArray(clients.clientId, existingClientSiteIds))
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(
inArray(clients.clientId, existingClientSiteIds)
)
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ??
new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) =>
!existingClientSiteIds.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
// Note: we deliberately do NOT exclude clients covered by another
// site resource here (unlike clientSitesToRemove below). Doing so
// previously caused a permanent gap: if resource A saw resource B's
// cache row and skipped adding (assuming B would maintain it), and
// B's own rebuild made the same assumption about A, the site-level
// row could end up never inserted by anyone even though both
// resources' client associations were otherwise correct.
// onConflictDoNothing makes a redundant insert harmless, so there's
// no correctness reason to skip here.
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) => !existingClientSiteIds.includes(clientId)
);
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
);
}
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
} catch (err) {
// Don't let a failure on one site abort processing of every
// other site queued after it in this run. Since we're not
// re-throwing, the outer wrapper's retry/requeue logic never
// sees this failure, so explicitly queue this resource for a
// follow-up pass to reconcile whatever this site didn't get to.
logger.error(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} failed while processing site for siteResourceId=${siteResource.siteResourceId}, continuing with remaining sites and queuing a follow-up pass:`,
err
);
await rebuildQueue.enqueue({
type: "site-resource",
id: siteResource.siteResourceId
});
}
}
// Handle subnet proxy target updates for the resource associations
@@ -657,7 +707,7 @@ async function handleMessagesForSiteClients(
trx: Transaction | typeof db = db
): Promise<void> {
if (!site.exitNodeId) {
logger.warn(
logger.debug(
`Exit node ID not on site ${site.siteId} so there is no reason to update clients because it must be offline`
);
return;
@@ -671,14 +721,14 @@ async function handleMessagesForSiteClients(
.limit(1);
if (!exitNode) {
logger.warn(
logger.debug(
`Exit node not found for site ${site.siteId} so there is no reason to update clients because it must be offline`
);
return;
}
if (!site.publicKey) {
logger.warn(
logger.debug(
`Site publicKey not set for site ${site.siteId} so cannot add peers to clients`
);
return;
@@ -692,7 +742,7 @@ async function handleMessagesForSiteClients(
.where(eq(newts.siteId, siteId))
.limit(1);
if (!newt) {
logger.warn(
logger.debug(
`Newt not found for site ${siteId} so cannot add peers to clients`
);
return;
@@ -917,7 +967,7 @@ export async function updateClientSiteDestinations(
for (const site of sitesData) {
if (!site.sites.subnet) {
logger.warn(`Site ${site.sites.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.sites.siteId} has no subnet, skipping`);
continue;
}
@@ -1656,10 +1706,17 @@ export async function rebuildClientAssociationsFromClient(
await incrementOrgRebuildCount(client.orgId);
try {
const trx = primaryDb;
return await lockManager.withLock(
`rebuild-client-associations:client:${client.clientId}`,
() => rebuildClientAssociationsFromClientImpl(client, trx),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
// The whole locked rebuild is idempotent (it diffs full expected vs.
// actual state each time), so on a transient DB error it's safe to
// retry the entire thing rather than just the failed query.
return await withRetry(
() =>
lockManager.withLock(
`rebuild-client-associations:client:${client.clientId}`,
() => rebuildClientAssociationsFromClientImpl(client, trx),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
),
`rebuildClientAssociationsFromClient:${client.clientId}`
);
} catch (err: any) {
if (
@@ -1675,6 +1732,17 @@ export async function rebuildClientAssociationsFromClient(
});
return;
}
if (isTransientError(err)) {
logger.warn(
`rebuildClientAssociations: transient DB error rebuilding client ${client.clientId} persisted after retries, queuing for deferred processing:`,
err
);
await rebuildQueue.enqueue({
type: "client",
id: client.clientId
});
return;
}
throw err;
} finally {
await decrementOrgRebuildCount(client.orgId);
@@ -1826,12 +1894,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new associations
if (resourcesToAdd.length > 0) {
await trx.insert(clientSiteResourcesAssociationsCache).values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
);
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
)
.onConflictDoNothing();
}
// Remove old associations
@@ -1869,12 +1940,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new site associations
if (sitesToAdd.length > 0) {
await trx.insert(clientSitesAssociationsCache).values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
);
await trx
.insert(clientSitesAssociationsCache)
.values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
)
.onConflictDoNothing();
}
// Remove old site associations

View File

@@ -1,10 +1,14 @@
import logger from "@server/logger";
import { isTransientError } from "@server/lib/dbRetry";
export type RebuildJobType = "site-resource" | "client";
export interface RebuildJob {
type: RebuildJobType;
id: number;
// Number of times this job has already been re-queued after a transient
// failure. Absent/0 means it has not failed yet.
attempt?: number;
}
export interface RebuildJobHandlers {
@@ -24,6 +28,10 @@ export interface RebuildQueueManager {
// retried shortly after against fresh DB state.
const POLL_INTERVAL_MS = 500;
const BATCH_SIZE = 5;
// A job that fails with a transient DB error gets re-queued with backoff
// instead of being dropped, up to this many times.
const MAX_JOB_ATTEMPTS = 5;
const JOB_RETRY_BASE_DELAY_MS = 1000;
function dedupeKey(job: RebuildJob): string {
return `${job.type}:${job.id}`;
@@ -106,10 +114,29 @@ class InMemoryRebuildQueue implements RebuildQueueManager {
`Rebuild queue: completed ${job.type}:${job.id}`
);
} catch (err) {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
const attempt = (job.attempt ?? 0) + 1;
if (isTransientError(err) && attempt <= MAX_JOB_ATTEMPTS) {
const delay =
JOB_RETRY_BASE_DELAY_MS * Math.pow(2, attempt - 1);
logger.warn(
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
err
);
setTimeout(() => {
this.enqueue({ ...job, attempt }).catch(
(enqueueErr) =>
logger.error(
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
enqueueErr
)
);
}, delay);
} else {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
}
}
}
} finally {

View File

@@ -14,7 +14,7 @@
import { redis } from "#private/lib/redis";
import logger from "@server/logger";
export const ORG_REBUILD_CONCURRENCY_LIMIT = 5;
export const ORG_REBUILD_CONCURRENCY_LIMIT = 10;
// Safety-net TTL: slightly longer than the rebuild lock TTL (120 s). If a
// server process dies while holding a rebuild, this ensures the counter key

View File

@@ -14,12 +14,16 @@
import { redis } from "#private/lib/redis";
import { lockManager } from "#private/lib/lock";
import logger from "@server/logger";
import { isTransientError } from "@server/lib/dbRetry";
export type RebuildJobType = "site-resource" | "client";
export interface RebuildJob {
type: RebuildJobType;
id: number;
// Number of times this job has already been re-queued after a transient
// failure. Absent/0 means it has not failed yet.
attempt?: number;
}
export interface RebuildJobHandlers {
@@ -43,6 +47,11 @@ const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s
const POLL_INTERVAL_MS = 500;
// A job that fails with a transient DB error gets re-queued with backoff
// instead of being dropped, up to this many times.
const MAX_JOB_ATTEMPTS = 5;
const JOB_RETRY_BASE_DELAY_MS = 1000;
class RedisRebuildQueue {
private processingStarted = false;
@@ -180,10 +189,33 @@ class RedisRebuildQueue {
`Rebuild queue: completed ${job.type}:${job.id}`
);
} catch (err) {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
const attempt = (job.attempt ?? 0) + 1;
if (
isTransientError(err) &&
attempt <= MAX_JOB_ATTEMPTS
) {
const delay =
JOB_RETRY_BASE_DELAY_MS *
Math.pow(2, attempt - 1);
logger.warn(
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
err
);
setTimeout(() => {
this.enqueue({ ...job, attempt }).catch(
(enqueueErr) =>
logger.error(
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
enqueueErr
)
);
}, delay);
} else {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
}
}
}
},

View File

@@ -894,6 +894,19 @@ class RegionalRedisManager {
return opts;
}
// The regional Redis StatefulSet's "redis" service pins to pod redis-0
// (primary). The replica (redis-1) is only reachable through the
// per-pod headless service: <svc>.<namespace>.svc.cluster.local ->
// redis-1.redis-headless.<namespace>.svc.cluster.local. Returns null
// if the configured host doesn't match that pattern (e.g. local dev),
// in which case callers should fall back to the primary for reads.
private getReplicaHost(primaryHost: string): string | null {
const match = primaryHost.match(/^redis\.([^.]+)\.svc\.cluster\.local$/);
if (!match) return null;
const namespace = match[1];
return `redis-1.redis-headless.${namespace}.svc.cluster.local`;
}
private initializeClients(): void {
const cfg = this.getConfig();
const baseOpts = {
@@ -907,35 +920,42 @@ class RegionalRedisManager {
try {
this.writeClient = new Redis(baseOpts);
// redis-1 (replica) handles reads; fall back to primary if not resolvable
this.readClient = new Redis({
...baseOpts,
host: cfg.host!.replace(/^(.*?)(\.\S+)$/, (_, h, rest) => {
// Derive replica hostname from the headless service pattern:
// redis.redis.svc.cluster.local -> redis-1.redis-headless.redis.svc.cluster.local
// If it doesn't look like a k8s service, just use the same host
return h + rest;
})
});
// For simplicity use same host for both; callers can always read from primary
// The real replica routing is handled by the StatefulSet headless service
this.readClient = this.writeClient;
const replicaHost = this.getReplicaHost(cfg.host!);
this.readClient = replicaHost
? new Redis({ ...baseOpts, host: replicaHost })
: this.writeClient;
this.writeClient.on("ready", () => {
logger.info("Regional Redis client ready");
logger.info("Regional Redis write client ready");
this.isHealthy = true;
});
this.writeClient.on("error", (err) => {
logger.error("Regional Redis client error:", err);
logger.error("Regional Redis write client error:", err);
this.isHealthy = false;
});
this.writeClient.on("reconnecting", () => {
logger.info("Regional Redis client reconnecting...");
logger.info("Regional Redis write client reconnecting...");
this.isHealthy = false;
});
logger.info("Regional Redis client initialized");
if (this.readClient !== this.writeClient) {
this.readClient.on("ready", () => {
logger.info("Regional Redis read client ready");
});
this.readClient.on("error", (err) => {
logger.error("Regional Redis read client error:", err);
});
this.readClient.on("reconnecting", () => {
logger.info("Regional Redis read client reconnecting...");
});
}
logger.info(
replicaHost
? `Regional Redis client initialized (reads routed to replica ${replicaHost})`
: "Regional Redis client initialized (no replica resolvable, reads routed to primary)"
);
} catch (error) {
logger.error("Failed to initialize regional Redis client:", error);
this.isEnabled = false;
@@ -1041,11 +1061,14 @@ class RegionalRedisManager {
public async disconnect(): Promise<void> {
try {
if (this.readClient && this.readClient !== this.writeClient) {
await this.readClient.quit();
}
this.readClient = null;
if (this.writeClient) {
await this.writeClient.quit();
this.writeClient = null;
}
this.readClient = null;
logger.info("Regional Redis client disconnected");
} catch (error) {
logger.error("Error disconnecting regional Redis client:", error);

View File

@@ -29,37 +29,41 @@ export async function createExitNode(
.where(eq(exitNodes.publicKey, publicKey));
let exitNode: ExitNode;
if (!exitNodeQuery) {
const address = await getNextAvailableSubnet();
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
const { value: address, release } = await getNextAvailableSubnet();
try {
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
listenPort,
online: true,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} finally {
await release();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
listenPort,
online: true,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} else {
// update the reachable at
[exitNode] = await db

View File

@@ -114,8 +114,6 @@ export async function createRemoteExitNode(
}
const secretHash = await hashPassword(secret);
// const address = await getNextAvailableSubnet();
const address = "100.89.140.1/24"; // FOR NOW LETS HARDCODE THESE ADDRESSES
const [existingRemoteExitNode] = await db
.select()
@@ -191,89 +189,106 @@ export async function createRemoteExitNode(
);
}
await db.transaction(async (trx) => {
if (!existingExitNode) {
const [res] = await trx
.insert(exitNodes)
.values({
name: remoteExitNodeId,
address,
endpoint: "",
publicKey: "",
listenPort: 0,
online: false,
type: "remoteExitNode"
})
.returning();
existingExitNode = res;
}
// If this remote exit node isn't already backing an exit node in
// another org, we're about to create a brand new one. Reserve a
// subnet for it up front so the allocation lock is held across the
// whole insert - this guarantees exit node subnets never overlap,
// even under concurrent creation, which matters for HA setups.
let releaseSubnetLock: (() => Promise<void>) | null = null;
let newExitNodeAddress: string | null = null;
if (!existingExitNode) {
const { value, release } = await getNextAvailableSubnet();
newExitNodeAddress = value;
releaseSubnetLock = release;
}
if (!existingRemoteExitNode) {
await trx.insert(remoteExitNodes).values({
remoteExitNodeId: remoteExitNodeId,
secretHash,
dateCreated: moment().toISOString(),
exitNodeId: existingExitNode.exitNodeId
});
} else {
// update the existing remote exit node
await trx
.update(remoteExitNodes)
.set({
exitNodeId: existingExitNode.exitNodeId
})
.where(
eq(
remoteExitNodes.remoteExitNodeId,
existingRemoteExitNode.remoteExitNodeId
)
);
}
if (!existingExitNodeOrg) {
await trx.insert(exitNodeOrgs).values({
exitNodeId: existingExitNode.exitNodeId,
orgId: orgId
});
}
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
if (org.billingOrgId) {
const otherBillingOrgs = await trx
.select()
.from(orgs)
.where(
and(
eq(orgs.billingOrgId, org.billingOrgId),
ne(orgs.orgId, orgId)
)
);
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
.select()
.from(exitNodeOrgs)
.where(
and(
eq(
exitNodeOrgs.exitNodeId,
existingExitNode.exitNodeId
),
inArray(exitNodeOrgs.orgId, billingOrgIds)
)
);
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
await usageService.add(
orgId,
LimitId.REMOTE_EXIT_NODES,
1,
trx
);
try {
await db.transaction(async (trx) => {
if (!existingExitNode) {
const [res] = await trx
.insert(exitNodes)
.values({
name: remoteExitNodeId,
address: newExitNodeAddress!,
endpoint: "",
publicKey: "",
listenPort: 0,
online: false,
type: "remoteExitNode"
})
.returning();
existingExitNode = res;
}
}
});
if (!existingRemoteExitNode) {
await trx.insert(remoteExitNodes).values({
remoteExitNodeId: remoteExitNodeId,
secretHash,
dateCreated: moment().toISOString(),
exitNodeId: existingExitNode.exitNodeId
});
} else {
// update the existing remote exit node
await trx
.update(remoteExitNodes)
.set({
exitNodeId: existingExitNode.exitNodeId
})
.where(
eq(
remoteExitNodes.remoteExitNodeId,
existingRemoteExitNode.remoteExitNodeId
)
);
}
if (!existingExitNodeOrg) {
await trx.insert(exitNodeOrgs).values({
exitNodeId: existingExitNode.exitNodeId,
orgId: orgId
});
}
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
if (org.billingOrgId) {
const otherBillingOrgs = await trx
.select()
.from(orgs)
.where(
and(
eq(orgs.billingOrgId, org.billingOrgId),
ne(orgs.orgId, orgId)
)
);
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
.select()
.from(exitNodeOrgs)
.where(
and(
eq(
exitNodeOrgs.exitNodeId,
existingExitNode.exitNodeId
),
inArray(exitNodeOrgs.orgId, billingOrgIds)
)
);
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
await usageService.add(
orgId,
LimitId.REMOTE_EXIT_NODES,
1,
trx
);
}
}
});
} finally {
await releaseSubnetLock?.();
}
const token = generateSessionToken();
await createRemoteExitNodeSession(token, remoteExitNodeId);

View File

@@ -13,37 +13,41 @@ export async function createExitNode(
const [exitNodeQuery] = await db.select().from(exitNodes).limit(1);
let exitNode: ExitNode;
if (!exitNodeQuery) {
const address = await getNextAvailableSubnet();
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
const { value: address, release } = await getNextAvailableSubnet();
try {
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
online: true,
listenPort,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} finally {
await release();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
online: true,
listenPort,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} else {
// update the existing exit node
[exitNode] = await db

View File

@@ -52,13 +52,13 @@ export async function buildClientConfigurationForNewtClient(
clientsRes
.filter((client) => {
if (!client.clients.pubKey) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no public key, skipping`
);
return false;
}
if (!client.clients.subnet) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no subnet, skipping`
);
return false;

View File

@@ -19,7 +19,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
}
if (!newt.siteId) {
logger.warn("Newt has no client ID!");
logger.warn("Newt has no site ID!");
return;
}
@@ -34,6 +34,12 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
.where(eq(sites.siteId, newt.siteId!))
.returning();
if (!site) {
throw new Error(
`Could not find site ${newt.siteId} to update disconnection from disconnect message`
);
}
await fireSiteOfflineAlert(
site.orgId,
site.siteId,
@@ -43,6 +49,6 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
);
});
} catch (error) {
logger.error("Error handling disconnecting message", { error });
logger.error("Error handling site disconnecting message", error);
}
};

View File

@@ -5,8 +5,21 @@ import { Newt } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
import { sendNewtSyncMessage } from "./sync";
import semver from "semver";
import { recordSitePing } from "./pingAccumulator";
const NEWT_SUPPORTS_SYNC_VERSION = ">=1.14.0";
const PONG = {
message: {
type: "pong",
data: {
timestamp: new Date().toISOString()
}
},
broadcast: false,
excludeSender: false
};
/**
* Handles ping messages from newt clients.
*
@@ -37,6 +50,14 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
// cross-region latency to the database.
recordSitePing(newt.siteId);
if (
newt.version &&
!semver.satisfies(newt.version, NEWT_SUPPORTS_SYNC_VERSION)
) {
// Newt does not support the sync message so not checking - stop here -
return PONG;
}
// Check config version and sync if stale.
const configVersion = await getClientConfigVersion(newt.newtId);
@@ -65,14 +86,5 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
await sendNewtSyncMessage(newt, site);
}
return {
message: {
type: "pong",
data: {
timestamp: new Date().toISOString()
}
},
broadcast: false,
excludeSender: false
};
return PONG;
};

View File

@@ -3,6 +3,7 @@ import { sites, clients, olms } from "@server/db";
import { and, eq, inArray } from "drizzle-orm";
import logger from "@server/logger";
import { fireSiteOnlineAlert } from "@server/lib/alerts";
import { withRetry } from "@server/lib/dbRetry";
/**
* Ping Accumulator
@@ -22,8 +23,6 @@ import { fireSiteOnlineAlert } from "@server/lib/alerts";
*/
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 50;
// ── Site (newt) pings ──────────────────────────────────────────────────
// Map of siteId -> latest ping timestamp (unix seconds)
@@ -266,85 +265,7 @@ export async function flushPingsToDb(): Promise<void> {
}
// ── Retry / Error Helpers ──────────────────────────────────────────────
/**
* Simple retry wrapper with exponential backoff for transient errors
* (deadlocks, connection timeouts, unexpected disconnects).
*
* PostgreSQL deadlocks (40P01) are always safe to retry: the database
* guarantees exactly one winner per deadlock pair, so the loser just needs
* to try again. MAX_RETRIES is intentionally higher than typical connection
* retry budgets to give deadlock victims enough chances to succeed.
*/
async function withRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isTransientError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
{ code: error?.code ?? error?.cause?.code }
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
/**
* Detect transient errors that are safe to retry.
*/
function isTransientError(error: any): boolean {
if (!error) return false;
const message = (error.message || "").toLowerCase();
const causeMessage = (error.cause?.message || "").toLowerCase();
const code = error.code || error.cause?.code || "";
// Connection timeout / terminated
if (
message.includes("connection timeout") ||
message.includes("connection terminated") ||
message.includes("timeout exceeded when trying to connect") ||
causeMessage.includes("connection terminated unexpectedly") ||
causeMessage.includes("connection timeout")
) {
return true;
}
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
if (code === "40P01" || message.includes("deadlock")) {
return true;
}
// PostgreSQL serialization failure
if (code === "40001") {
return true;
}
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
if (
code === "ECONNRESET" ||
code === "ECONNREFUSED" ||
code === "EPIPE" ||
code === "ETIMEDOUT"
) {
return true;
}
return false;
}
// See @server/lib/dbRetry for the shared withRetry/isTransientError helpers.
// ── Lifecycle ──────────────────────────────────────────────────────────

View File

@@ -9,45 +9,45 @@ import {
import { canCompress } from "@server/lib/clientVersionChecks";
export async function sendNewtSyncMessage(newt: Newt, site: Site) {
// const {
// tcpTargets,
// udpTargets,
// validHealthCheckTargets,
// browserGatewayTargets,
// remoteExitNodeSubnets
// } = await buildTargetConfigurationForNewtClient(site.siteId);
// let exitNode: ExitNode | undefined;
// if (site.exitNodeId) {
// [exitNode] = await db
// .select()
// .from(exitNodes)
// .where(eq(exitNodes.exitNodeId, site.exitNodeId))
// .limit(1);
// }
// const { peers, targets } = await buildClientConfigurationForNewtClient(
// site,
// exitNode
// );
// await sendToClient(
// newt.newtId,
// {
// type: "newt/sync",
// data: {
// proxyTargets: {
// udp: udpTargets,
// tcp: tcpTargets
// },
// healthCheckTargets: validHealthCheckTargets,
// peers: peers,
// clientTargets: targets,
// browserGatewayTargets: browserGatewayTargets,
// remoteExitNodeSubnets: remoteExitNodeSubnets
// }
// },
// {
// compress: canCompress(newt.version, "newt")
// }
// ).catch((error) => {
// logger.warn(`Error sending newt sync message:`, error);
// });
const {
tcpTargets,
udpTargets,
validHealthCheckTargets,
browserGatewayTargets,
remoteExitNodeSubnets
} = await buildTargetConfigurationForNewtClient(site.siteId);
let exitNode: ExitNode | undefined;
if (site.exitNodeId) {
[exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, site.exitNodeId))
.limit(1);
}
const { peers, targets } = await buildClientConfigurationForNewtClient(
site,
exitNode
);
await sendToClient(
newt.newtId,
{
type: "newt/sync",
data: {
proxyTargets: {
udp: udpTargets,
tcp: tcpTargets
},
healthCheckTargets: validHealthCheckTargets,
peers: peers,
clientTargets: targets,
browserGatewayTargets: browserGatewayTargets,
remoteExitNodeSubnets: remoteExitNodeSubnets
}
},
{
compress: canCompress(newt.version, "newt")
}
).catch((error) => {
logger.warn(`Error sending newt sync message:`, error);
});
}

View File

@@ -161,7 +161,7 @@ export async function buildSiteConfigurationForOlmClient(
}
if (!site.subnet) {
logger.warn(`Site ${site.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.siteId} has no subnet, skipping`);
continue;
}

View File

@@ -6,7 +6,9 @@ import logger from "@server/logger";
/**
* Handles disconnecting messages from clients to show disconnected in the ui
*/
export const handleOlmDisconnectingMessage: MessageHandler = async (context) => {
export const handleOlmDisconnectingMessage: MessageHandler = async (
context
) => {
const { message, client: c, sendToClient } = context;
const olm = c as Olm;
@@ -29,6 +31,6 @@ export const handleOlmDisconnectingMessage: MessageHandler = async (context) =>
})
.where(eq(clients.clientId, olm.clientId));
} catch (error) {
logger.error("Error handling disconnecting message", { error });
logger.error("Error handling client disconnecting message", error);
}
};

View File

@@ -616,8 +616,8 @@ export async function listResources(
and(
inArray(resources.mode, browserGatewayModes),
or(
eq(effectiveSso, true),
eq(effectiveWhitelist, true),
effectiveSso,
effectiveWhitelist,
not(isNull(effectiveHeaderAuthId)),
not(isNull(effectivePincodeId)),
not(isNull(effectivePasswordId))
@@ -629,8 +629,8 @@ export async function listResources(
conditions.push(
and(
inArray(resources.mode, browserGatewayModes),
not(eq(effectiveSso, true)),
not(eq(effectiveWhitelist, true)),
not(effectiveSso),
not(effectiveWhitelist),
isNull(effectiveHeaderAuthId),
isNull(effectivePincodeId),
isNull(effectivePasswordId)

View File

@@ -268,7 +268,11 @@ export async function createSite(
let newSite: Site | undefined;
try {
if (subnet && exitNodeId) {
if (type === "wireguard" && subnet && exitNodeId) {
// Only wireguard sites actually persist the provided subnet/exitNodeId.
// Newt sites have their subnet/exit node chosen (under a lock) when the
// newt connects, so validating them here is both unnecessary and racy,
// since pickSiteDefaults does not lock the subnet it suggests.
//make sure the subnet is in the range of the exit node if provided
const [exitNode] = await db
.select()

View File

@@ -290,7 +290,13 @@ export default function BillingPage() {
setHasSubscription(
tierSub.subscription.status === "active"
);
setIsTrial(tierSub.subscription.expiresAt != null);
// expiresAt is only meaningful while the trial hasn't
// actually run out yet; a stale row with a past
// expiresAt should no longer be treated as a live trial
const expiresAt = tierSub.subscription.expiresAt;
setIsTrial(
expiresAt != null && expiresAt * 1000 > Date.now()
);
}
// Find license subscription
@@ -1057,21 +1063,23 @@ export default function BillingPage() {
</SettingsSectionDescription>
</SettingsSectionHeader>
<SettingsSectionBody>
<div className="grid grid-cols-1 md:grid-cols-2 gap-6">
<div className="grid grid-cols-1 md:grid-cols-4 gap-6">
{/* Current Usage */}
<div className="border rounded-lg p-4">
<div className="border rounded-lg p-4 md:col-span-1">
<div className="text-sm text-muted-foreground mb-2">
{t("billingCurrentUsage") || "Current Usage"}
</div>
<div className="flex items-baseline gap-2">
<span className="text-3xl font-semibold">
{getUserCount()}
</span>
<span className="text-lg">
{t("billingUsers") || "Users"}
</span>
<div className="flex flex-col items-start gap-1">
<div className="flex items-baseline gap-2">
<span className="text-3xl font-semibold">
{getUserCount()}
</span>
<span className="text-lg">
{t("billingUsers") || "Users"}
</span>
</div>
{hasSubscription && getPricePerUser() > 0 && (
<div className="text-sm text-muted-foreground mt-1">
<div className="text-sm text-muted-foreground">
x ${getPricePerUser()} / month = $
{getUserCount() * getPricePerUser()} /
month
@@ -1081,7 +1089,7 @@ export default function BillingPage() {
</div>
{/* Maximum Limits */}
<div className="border rounded-lg p-4">
<div className="border rounded-lg p-4 md:col-span-3">
<div className="text-sm text-muted-foreground mb-3">
{t("billingMaximumLimits") || "Maximum Limits"}
</div>

View File

@@ -41,7 +41,7 @@ import {
import { AxiosResponse } from "axios";
import { useTranslations } from "next-intl";
import { useParams, useRouter } from "next/navigation";
import { useActionState } from "react";
import { useActionState, useState } from "react";
import { useForm } from "react-hook-form";
import { z } from "zod";
@@ -137,11 +137,21 @@ function ProxyResourceHttpForm({
});
const [, formAction, saveLoading] = useActionState(onSubmit, null);
const [headersValid, setHeadersValid] = useState(true);
async function onSubmit() {
const isValid = await form.trigger();
if (!isValid) return;
if (!headersValid) {
toast({
variant: "destructive",
title: t("settingsErrorUpdate"),
description: t("headersValidationError")
});
return;
}
const data = form.getValues();
const res = await api
@@ -318,6 +328,9 @@ function ProxyResourceHttpForm({
onChange={
field.onChange
}
onValidityChange={
setHeadersValid
}
rows={4}
/>
</FormControl>
@@ -341,7 +354,7 @@ function ProxyResourceHttpForm({
<Button
type="submit"
loading={saveLoading}
disabled={saveLoading}
disabled={saveLoading || !headersValid}
form="http-settings-form"
>
{t("saveSettings")}

View File

@@ -2,24 +2,36 @@
import { useEffect, useState, useRef } from "react";
import { Textarea } from "@/components/ui/textarea";
import { useTranslations } from "next-intl";
interface HeadersInputProps {
value?: { name: string; value: string }[] | null;
onChange: (value: { name: string; value: string }[] | null) => void;
onValidityChange?: (isValid: boolean) => void;
placeholder?: string;
rows?: number;
className?: string;
}
// Mirrors the server side validation in updateResource.ts so that invalid
// input is caught (and shown to the user) before it is ever submitted,
// instead of being silently dropped in favor of the last known good value.
const validHeaderNamePattern = /^[a-zA-Z0-9!#$%&'*+\-.^_`|~]+$/;
const validHeaderValuePattern = /^[\t\x20-\x7E]*$/;
const templatePattern = /\{\{[^}]+\}\}/;
export function HeadersInput({
value = [],
onChange,
onValidityChange,
placeholder = `X-Example-Header: example-value
X-Another-Header: another-value`,
rows = 4,
className
}: HeadersInputProps) {
const t = useTranslations();
const [internalValue, setInternalValue] = useState("");
const [error, setError] = useState<string | null>(null);
const textareaRef = useRef<HTMLTextAreaElement>(null);
const isUserEditingRef = useRef(false);
@@ -34,37 +46,56 @@ X-Another-Header: another-value`,
.join("\n");
};
// Convert newline-separated string to header objects array
const convertToHeadersArray = (
// Parse newline-separated text into header objects, validating each line
// against the same rules enforced by the server. Returns either the
// parsed headers or an error message describing the first invalid line.
const parseHeaders = (
newlineSeparated: string
): { name: string; value: string }[] | null => {
if (!newlineSeparated || newlineSeparated.trim() === "") return [];
):
| { headers: { name: string; value: string }[]; error: null }
| { headers: null; error: string } => {
if (!newlineSeparated || newlineSeparated.trim() === "") {
return { headers: [], error: null };
}
return newlineSeparated
const lines = newlineSeparated
.split("\n")
.map((line) => line.trim())
.filter((line) => line.length > 0 && line.includes(":"))
.map((line) => {
const colonIndex = line.indexOf(":");
const name = line.substring(0, colonIndex).trim();
const value = line.substring(colonIndex + 1).trim();
.filter((line) => line.length > 0);
// Ensure header name conforms to HTTP header requirements
// Header names should be case-insensitive, contain only ASCII letters, digits, and hyphens
const normalizedName = name
.replace(/[^a-zA-Z0-9\-]/g, "")
.toLowerCase();
const headers: { name: string; value: string }[] = [];
return { name: normalizedName, value };
})
.filter((header) => header.name.length > 0); // Filter out headers with invalid names
for (const line of lines) {
const colonIndex = line.indexOf(":");
if (colonIndex === -1) {
return { headers: null, error: t("headersValidationError") };
}
const name = line.substring(0, colonIndex).trim();
const value = line.substring(colonIndex + 1).trim();
if (
!validHeaderNamePattern.test(name) ||
!validHeaderValuePattern.test(value) ||
templatePattern.test(name) ||
templatePattern.test(value)
) {
return { headers: null, error: t("headersValidationError") };
}
headers.push({ name, value });
}
return { headers, error: null };
};
// Update internal value when external value changes
// But only if the user is not currently editing (textarea not focused)
useEffect(() => {
if (!isUserEditingRef.current) {
setInternalValue(convertToNewlineSeparated(value));
setInternalValue(convertToNewlineSeparated(value ?? []));
setError(null);
onValidityChange?.(true);
}
}, [value]);
@@ -75,31 +106,20 @@ X-Another-Header: another-value`,
// Mark that user is actively editing
isUserEditingRef.current = true;
// Only update parent if the input is in a valid state
// Valid states: empty/whitespace only, or contains properly formatted headers
const result = parseHeaders(newValue);
if (newValue.trim() === "") {
// Empty input is valid - represents no headers
onChange([]);
} else {
// Check if all non-empty lines are properly formatted (contain ':')
const lines = newValue.split("\n");
const nonEmptyLines = lines
.map((line) => line.trim())
.filter((line) => line.length > 0);
// If there are no non-empty lines, or all non-empty lines contain ':', it's valid
const isValid =
nonEmptyLines.length === 0 ||
nonEmptyLines.every((line) => line.includes(":"));
if (isValid) {
// Safe to convert and update parent
const headersArray = convertToHeadersArray(newValue);
onChange(headersArray);
}
// If not valid, don't call onChange - let user continue typing
if (result.error) {
// Surface the error and do not touch the last known good value.
// Silently dropping the update here (without telling the user)
// is what previously let stale data get saved without warning.
setError(result.error);
onValidityChange?.(false);
return;
}
setError(null);
onValidityChange?.(true);
onChange(result.headers);
};
const handleFocus = () => {
@@ -114,15 +134,20 @@ X-Another-Header: another-value`,
};
return (
<Textarea
ref={textareaRef}
value={internalValue}
onChange={handleChange}
onFocus={handleFocus}
onBlur={handleBlur}
placeholder={placeholder}
rows={rows}
className={className}
/>
<div>
<Textarea
ref={textareaRef}
value={internalValue}
onChange={handleChange}
onFocus={handleFocus}
onBlur={handleBlur}
placeholder={placeholder}
rows={rows}
className={className}
/>
{error && (
<p className="text-sm text-destructive mt-1.5">{error}</p>
)}
</div>
);
}