Compare commits

...

18 Commits

Author SHA1 Message Date
Owen Schwartz
d4c52bbf2f Merge pull request #3396 from fosrl/dev
1.19.4-s.5
2026-07-03 16:15:18 -04:00
Owen
390c822bb4 Fix issue with overlapping site resources not sending 2026-07-03 16:13:50 -04:00
Owen
2bfc1901a6 Dont check the subnet because we dont use it 2026-07-03 14:38:01 -04:00
Owen
5da186b528 Quiet warning messages 2026-07-03 11:58:00 -04:00
Owen
600a96c13b Update rate limit to 10 2026-07-03 11:52:59 -04:00
Owen
e4e0da3723 Add not exists check 2026-07-03 11:24:53 -04:00
Owen Schwartz
87f50bf0cc Merge pull request #3389 from fosrl/dev
1.19.4-s.3
2026-07-03 10:29:15 -04:00
Owen
440ebfe08e Clarify error messages 2026-07-03 10:26:13 -04:00
Owen
b399d2a291 Add some retry and database confict mitigation 2026-07-03 10:23:32 -04:00
Owen Schwartz
2c66da1b19 Merge pull request #3386 from v1rusnl/main
Upgrade Traefik image to version 3.7
2026-07-03 10:18:44 -04:00
Owen
1b1fba60f1 Make sure to retry rebuilds 2026-07-03 09:02:08 -04:00
v1rusnl
ab19955502 Upgrade Traefik image to version 3.7 2026-07-03 08:22:13 +02:00
v1rusnl
1db9dcec81 Update Traefik image version to v3.7 2026-07-03 08:21:12 +02:00
Owen
b93d26f09f Fix reading from replicas 2026-07-02 21:46:53 -04:00
Owen
fc54ad49b5 Fix trial showing 2026-07-02 21:46:44 -04:00
Owen
f87e136f6b Unique subnets for exit nodes 2026-07-02 20:54:59 -04:00
Owen Schwartz
49c2d3163e Merge pull request #3381 from fosrl/dev
dev
2026-07-02 10:56:39 -04:00
Owen Schwartz
45b9e13a13 Merge pull request #3378 from fosrl/dev
1.19.4-s.1
2026-07-01 21:48:01 -04:00
19 changed files with 639 additions and 413 deletions

View File

@@ -41,7 +41,7 @@ services:
- 80:80 # Port for traefik because of the network_mode
traefik:
image: traefik:v3.6
image: traefik:v3.7
container_name: traefik
restart: unless-stopped
network_mode: service:gerbil # Ports appear on the gerbil service

View File

@@ -50,7 +50,7 @@ services:
- 80:80{{end}}
traefik:
image: docker.io/traefik:v3.6
image: docker.io/traefik:v3.7
container_name: traefik
restart: unless-stopped
{{if .InstallGerbil}}network_mode: service:gerbil # Ports appear on the gerbil service{{end}}{{if not .InstallGerbil}}

83
server/lib/dbRetry.ts Normal file
View File

@@ -0,0 +1,83 @@
import logger from "@server/logger";
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 50;
/**
* Detect transient errors that are safe to retry (connection drops, deadlocks,
* serialization failures). PostgreSQL deadlocks (40P01) are always safe to
* retry: the database guarantees exactly one winner per deadlock pair, so the
* loser just needs to try again.
*/
export function isTransientError(error: any): boolean {
if (!error) return false;
const message = (error.message || "").toLowerCase();
const causeMessage = (error.cause?.message || "").toLowerCase();
const code = error.code || error.cause?.code || "";
// Connection timeout / terminated
if (
message.includes("connection timeout") ||
message.includes("connection terminated") ||
message.includes("timeout exceeded when trying to connect") ||
causeMessage.includes("connection terminated unexpectedly") ||
causeMessage.includes("connection timeout")
) {
return true;
}
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
if (code === "40P01" || message.includes("deadlock")) {
return true;
}
// PostgreSQL serialization failure
if (code === "40001") {
return true;
}
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
if (
code === "ECONNRESET" ||
code === "ECONNREFUSED" ||
code === "EPIPE" ||
code === "ETIMEDOUT"
) {
return true;
}
return false;
}
/**
* Simple retry wrapper with exponential backoff for transient errors
* (deadlocks, connection timeouts, unexpected disconnects).
*/
export async function withRetry<T>(
operation: () => Promise<T>,
context: string,
maxRetries: number = MAX_RETRIES,
baseDelayMs: number = BASE_DELAY_MS
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isTransientError(error) && attempt < maxRetries) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * baseDelayMs;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Transient DB error in ${context}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`,
{ code: error?.code ?? error?.cause?.code }
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}

View File

@@ -1,30 +1,55 @@
import { db, exitNodes } from "@server/db";
import { db, exitNodes, Transaction } from "@server/db";
import config from "@server/lib/config";
import { findNextAvailableCidr } from "@server/lib/ip";
import { lockManager } from "#dynamic/lib/lock";
export async function getNextAvailableSubnet(): Promise<string> {
// Get all existing subnets from routes table
const existingAddresses = await db
.select({
address: exitNodes.address
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
/**
* Reserves the next available exit node subnet.
*
* Exit node subnets must never overlap with one another - regardless of
* which org(s) they belong to - since HA exit nodes can end up routing for
* the same org. This acquires a lock that the caller MUST release (via the
* returned `release`) only after the chosen address has been durably
* persisted (e.g. after the enclosing transaction commits), otherwise
* concurrent callers can race and pick the same subnet.
*/
export async function getNextAvailableSubnet(
trx: Transaction | typeof db = db
): Promise<{ value: string; release: () => Promise<void> }> {
const lockKey = "exit-node-subnet-allocation";
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
if (!acquired) {
throw new Error(`Failed to acquire lock: ${lockKey}`);
}
const release = () => lockManager.releaseLock(lockKey, acquired);
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return subnet;
try {
// Get all existing subnets from routes table
const existingAddresses = await trx
.select({
address: exitNodes.address
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
}
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return { value: subnet, release };
} catch (e) {
await release();
throw e;
}
}

View File

@@ -45,6 +45,7 @@ import {
} from "@server/routers/client/targets";
import { lockManager } from "#dynamic/lib/lock";
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
import { withRetry, isTransientError } from "@server/lib/dbRetry";
import {
checkOrgRebuildRateLimit,
decrementOrgRebuildCount,
@@ -285,10 +286,20 @@ export async function rebuildClientAssociationsFromSiteResource(
) {
await incrementOrgRebuildCount(siteResource.orgId);
try {
return await lockManager.withLock(
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
// The whole locked rebuild is idempotent (it diffs full expected vs.
// actual state each time), so on a transient DB error it's safe to
// retry the entire thing rather than just the failed query.
return await withRetry(
() =>
lockManager.withLock(
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
() =>
rebuildClientAssociationsFromSiteResourceImpl(
siteResource
),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
),
`rebuildClientAssociationsFromSiteResource:${siteResource.siteResourceId}`
);
} catch (err: any) {
if (
@@ -304,6 +315,17 @@ export async function rebuildClientAssociationsFromSiteResource(
});
return { mergedAllClients: [] };
}
if (isTransientError(err)) {
logger.warn(
`rebuildClientAssociations: transient DB error rebuilding site resource ${siteResource.siteResourceId} persisted after retries, queuing for deferred processing:`,
err
);
await rebuildQueue.enqueue({
type: "site-resource",
id: siteResource.siteResourceId
});
return { mergedAllClients: [] };
}
throw err;
} finally {
await decrementOrgRebuildCount(siteResource.orgId);
@@ -463,6 +485,7 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(clientSiteResourcesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteResourceId=${siteResource.siteResourceId} inserted clientSiteResource associations`
@@ -510,121 +533,148 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
for (const site of sitesToProcess) {
const siteId = site.siteId;
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
try {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
);
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSites = await trx
.select({
clientId: clientSitesAssociationsCache.clientId
})
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
);
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(inArray(clients.clientId, existingClientSiteIds))
// Get full client details for existing clients (needed for sending delete messages)
const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(
inArray(clients.clientId, existingClientSiteIds)
)
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ??
new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
);
// Expected clients from this resource are site-scoped: if this site is
// no longer attached to the resource, the expected set is empty.
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
? mergedAllClientIds
: [];
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) =>
!existingClientSiteIds.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
// Note: we deliberately do NOT exclude clients covered by another
// site resource here (unlike clientSitesToRemove below). Doing so
// previously caused a permanent gap: if resource A saw resource B's
// cache row and skipped adding (assuming B would maintain it), and
// B's own rebuild made the same assumption about A, the site-level
// row could end up never inserted by anyone even though both
// resources' client associations were otherwise correct.
// onConflictDoNothing makes a redundant insert harmless, so there's
// no correctness reason to skip here.
const clientSitesToAdd = expectedClientIdsForSite.filter(
(clientId) => !existingClientSiteIds.includes(clientId)
);
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
clientId,
siteId
}));
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
if (clientSitesToInsert.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
);
}
await trx
.insert(clientSitesAssociationsCache)
.values(clientSitesToInsert)
.onConflictDoNothing()
.returning();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
);
} else {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
// Now remove any client-site associations that should no longer exist
const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) =>
!expectedClientIdsForSite.includes(clientId) &&
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
);
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
);
if (clientSitesToRemove.length > 0) {
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
);
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.siteId, siteId),
inArray(
clientSitesAssociationsCache.clientId,
clientSitesToRemove
)
)
);
}
// Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients(
site,
siteId,
mergedAllClients,
existingClients,
clientSitesToAdd,
clientSitesToRemove,
trx
);
} catch (err) {
// Don't let a failure on one site abort processing of every
// other site queued after it in this run. Since we're not
// re-throwing, the outer wrapper's retry/requeue logic never
// sees this failure, so explicitly queue this resource for a
// follow-up pass to reconcile whatever this site didn't get to.
logger.error(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} failed while processing site for siteResourceId=${siteResource.siteResourceId}, continuing with remaining sites and queuing a follow-up pass:`,
err
);
await rebuildQueue.enqueue({
type: "site-resource",
id: siteResource.siteResourceId
});
}
}
// Handle subnet proxy target updates for the resource associations
@@ -657,7 +707,7 @@ async function handleMessagesForSiteClients(
trx: Transaction | typeof db = db
): Promise<void> {
if (!site.exitNodeId) {
logger.warn(
logger.debug(
`Exit node ID not on site ${site.siteId} so there is no reason to update clients because it must be offline`
);
return;
@@ -671,14 +721,14 @@ async function handleMessagesForSiteClients(
.limit(1);
if (!exitNode) {
logger.warn(
logger.debug(
`Exit node not found for site ${site.siteId} so there is no reason to update clients because it must be offline`
);
return;
}
if (!site.publicKey) {
logger.warn(
logger.debug(
`Site publicKey not set for site ${site.siteId} so cannot add peers to clients`
);
return;
@@ -692,7 +742,7 @@ async function handleMessagesForSiteClients(
.where(eq(newts.siteId, siteId))
.limit(1);
if (!newt) {
logger.warn(
logger.debug(
`Newt not found for site ${siteId} so cannot add peers to clients`
);
return;
@@ -917,7 +967,7 @@ export async function updateClientSiteDestinations(
for (const site of sitesData) {
if (!site.sites.subnet) {
logger.warn(`Site ${site.sites.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.sites.siteId} has no subnet, skipping`);
continue;
}
@@ -1656,10 +1706,17 @@ export async function rebuildClientAssociationsFromClient(
await incrementOrgRebuildCount(client.orgId);
try {
const trx = primaryDb;
return await lockManager.withLock(
`rebuild-client-associations:client:${client.clientId}`,
() => rebuildClientAssociationsFromClientImpl(client, trx),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
// The whole locked rebuild is idempotent (it diffs full expected vs.
// actual state each time), so on a transient DB error it's safe to
// retry the entire thing rather than just the failed query.
return await withRetry(
() =>
lockManager.withLock(
`rebuild-client-associations:client:${client.clientId}`,
() => rebuildClientAssociationsFromClientImpl(client, trx),
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
),
`rebuildClientAssociationsFromClient:${client.clientId}`
);
} catch (err: any) {
if (
@@ -1675,6 +1732,17 @@ export async function rebuildClientAssociationsFromClient(
});
return;
}
if (isTransientError(err)) {
logger.warn(
`rebuildClientAssociations: transient DB error rebuilding client ${client.clientId} persisted after retries, queuing for deferred processing:`,
err
);
await rebuildQueue.enqueue({
type: "client",
id: client.clientId
});
return;
}
throw err;
} finally {
await decrementOrgRebuildCount(client.orgId);
@@ -1826,12 +1894,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new associations
if (resourcesToAdd.length > 0) {
await trx.insert(clientSiteResourcesAssociationsCache).values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
);
await trx
.insert(clientSiteResourcesAssociationsCache)
.values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
)
.onConflictDoNothing();
}
// Remove old associations
@@ -1869,12 +1940,15 @@ async function rebuildClientAssociationsFromClientImpl(
// Insert new site associations
if (sitesToAdd.length > 0) {
await trx.insert(clientSitesAssociationsCache).values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
);
await trx
.insert(clientSitesAssociationsCache)
.values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
)
.onConflictDoNothing();
}
// Remove old site associations

View File

@@ -1,10 +1,14 @@
import logger from "@server/logger";
import { isTransientError } from "@server/lib/dbRetry";
export type RebuildJobType = "site-resource" | "client";
export interface RebuildJob {
type: RebuildJobType;
id: number;
// Number of times this job has already been re-queued after a transient
// failure. Absent/0 means it has not failed yet.
attempt?: number;
}
export interface RebuildJobHandlers {
@@ -24,6 +28,10 @@ export interface RebuildQueueManager {
// retried shortly after against fresh DB state.
const POLL_INTERVAL_MS = 500;
const BATCH_SIZE = 5;
// A job that fails with a transient DB error gets re-queued with backoff
// instead of being dropped, up to this many times.
const MAX_JOB_ATTEMPTS = 5;
const JOB_RETRY_BASE_DELAY_MS = 1000;
function dedupeKey(job: RebuildJob): string {
return `${job.type}:${job.id}`;
@@ -106,10 +114,29 @@ class InMemoryRebuildQueue implements RebuildQueueManager {
`Rebuild queue: completed ${job.type}:${job.id}`
);
} catch (err) {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
const attempt = (job.attempt ?? 0) + 1;
if (isTransientError(err) && attempt <= MAX_JOB_ATTEMPTS) {
const delay =
JOB_RETRY_BASE_DELAY_MS * Math.pow(2, attempt - 1);
logger.warn(
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
err
);
setTimeout(() => {
this.enqueue({ ...job, attempt }).catch(
(enqueueErr) =>
logger.error(
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
enqueueErr
)
);
}, delay);
} else {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
}
}
}
} finally {

View File

@@ -14,7 +14,7 @@
import { redis } from "#private/lib/redis";
import logger from "@server/logger";
export const ORG_REBUILD_CONCURRENCY_LIMIT = 5;
export const ORG_REBUILD_CONCURRENCY_LIMIT = 10;
// Safety-net TTL: slightly longer than the rebuild lock TTL (120 s). If a
// server process dies while holding a rebuild, this ensures the counter key

View File

@@ -14,12 +14,16 @@
import { redis } from "#private/lib/redis";
import { lockManager } from "#private/lib/lock";
import logger from "@server/logger";
import { isTransientError } from "@server/lib/dbRetry";
export type RebuildJobType = "site-resource" | "client";
export interface RebuildJob {
type: RebuildJobType;
id: number;
// Number of times this job has already been re-queued after a transient
// failure. Absent/0 means it has not failed yet.
attempt?: number;
}
export interface RebuildJobHandlers {
@@ -43,6 +47,11 @@ const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s
const POLL_INTERVAL_MS = 500;
// A job that fails with a transient DB error gets re-queued with backoff
// instead of being dropped, up to this many times.
const MAX_JOB_ATTEMPTS = 5;
const JOB_RETRY_BASE_DELAY_MS = 1000;
class RedisRebuildQueue {
private processingStarted = false;
@@ -180,10 +189,33 @@ class RedisRebuildQueue {
`Rebuild queue: completed ${job.type}:${job.id}`
);
} catch (err) {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
const attempt = (job.attempt ?? 0) + 1;
if (
isTransientError(err) &&
attempt <= MAX_JOB_ATTEMPTS
) {
const delay =
JOB_RETRY_BASE_DELAY_MS *
Math.pow(2, attempt - 1);
logger.warn(
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
err
);
setTimeout(() => {
this.enqueue({ ...job, attempt }).catch(
(enqueueErr) =>
logger.error(
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
enqueueErr
)
);
}, delay);
} else {
logger.error(
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
err
);
}
}
}
},

View File

@@ -894,6 +894,19 @@ class RegionalRedisManager {
return opts;
}
// The regional Redis StatefulSet's "redis" service pins to pod redis-0
// (primary). The replica (redis-1) is only reachable through the
// per-pod headless service: <svc>.<namespace>.svc.cluster.local ->
// redis-1.redis-headless.<namespace>.svc.cluster.local. Returns null
// if the configured host doesn't match that pattern (e.g. local dev),
// in which case callers should fall back to the primary for reads.
private getReplicaHost(primaryHost: string): string | null {
const match = primaryHost.match(/^redis\.([^.]+)\.svc\.cluster\.local$/);
if (!match) return null;
const namespace = match[1];
return `redis-1.redis-headless.${namespace}.svc.cluster.local`;
}
private initializeClients(): void {
const cfg = this.getConfig();
const baseOpts = {
@@ -907,35 +920,42 @@ class RegionalRedisManager {
try {
this.writeClient = new Redis(baseOpts);
// redis-1 (replica) handles reads; fall back to primary if not resolvable
this.readClient = new Redis({
...baseOpts,
host: cfg.host!.replace(/^(.*?)(\.\S+)$/, (_, h, rest) => {
// Derive replica hostname from the headless service pattern:
// redis.redis.svc.cluster.local -> redis-1.redis-headless.redis.svc.cluster.local
// If it doesn't look like a k8s service, just use the same host
return h + rest;
})
});
// For simplicity use same host for both; callers can always read from primary
// The real replica routing is handled by the StatefulSet headless service
this.readClient = this.writeClient;
const replicaHost = this.getReplicaHost(cfg.host!);
this.readClient = replicaHost
? new Redis({ ...baseOpts, host: replicaHost })
: this.writeClient;
this.writeClient.on("ready", () => {
logger.info("Regional Redis client ready");
logger.info("Regional Redis write client ready");
this.isHealthy = true;
});
this.writeClient.on("error", (err) => {
logger.error("Regional Redis client error:", err);
logger.error("Regional Redis write client error:", err);
this.isHealthy = false;
});
this.writeClient.on("reconnecting", () => {
logger.info("Regional Redis client reconnecting...");
logger.info("Regional Redis write client reconnecting...");
this.isHealthy = false;
});
logger.info("Regional Redis client initialized");
if (this.readClient !== this.writeClient) {
this.readClient.on("ready", () => {
logger.info("Regional Redis read client ready");
});
this.readClient.on("error", (err) => {
logger.error("Regional Redis read client error:", err);
});
this.readClient.on("reconnecting", () => {
logger.info("Regional Redis read client reconnecting...");
});
}
logger.info(
replicaHost
? `Regional Redis client initialized (reads routed to replica ${replicaHost})`
: "Regional Redis client initialized (no replica resolvable, reads routed to primary)"
);
} catch (error) {
logger.error("Failed to initialize regional Redis client:", error);
this.isEnabled = false;
@@ -1041,11 +1061,14 @@ class RegionalRedisManager {
public async disconnect(): Promise<void> {
try {
if (this.readClient && this.readClient !== this.writeClient) {
await this.readClient.quit();
}
this.readClient = null;
if (this.writeClient) {
await this.writeClient.quit();
this.writeClient = null;
}
this.readClient = null;
logger.info("Regional Redis client disconnected");
} catch (error) {
logger.error("Error disconnecting regional Redis client:", error);

View File

@@ -29,37 +29,41 @@ export async function createExitNode(
.where(eq(exitNodes.publicKey, publicKey));
let exitNode: ExitNode;
if (!exitNodeQuery) {
const address = await getNextAvailableSubnet();
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
const { value: address, release } = await getNextAvailableSubnet();
try {
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
listenPort,
online: true,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} finally {
await release();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
listenPort,
online: true,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} else {
// update the reachable at
[exitNode] = await db

View File

@@ -114,8 +114,6 @@ export async function createRemoteExitNode(
}
const secretHash = await hashPassword(secret);
// const address = await getNextAvailableSubnet();
const address = "100.89.140.1/24"; // FOR NOW LETS HARDCODE THESE ADDRESSES
const [existingRemoteExitNode] = await db
.select()
@@ -191,89 +189,106 @@ export async function createRemoteExitNode(
);
}
await db.transaction(async (trx) => {
if (!existingExitNode) {
const [res] = await trx
.insert(exitNodes)
.values({
name: remoteExitNodeId,
address,
endpoint: "",
publicKey: "",
listenPort: 0,
online: false,
type: "remoteExitNode"
})
.returning();
existingExitNode = res;
}
// If this remote exit node isn't already backing an exit node in
// another org, we're about to create a brand new one. Reserve a
// subnet for it up front so the allocation lock is held across the
// whole insert - this guarantees exit node subnets never overlap,
// even under concurrent creation, which matters for HA setups.
let releaseSubnetLock: (() => Promise<void>) | null = null;
let newExitNodeAddress: string | null = null;
if (!existingExitNode) {
const { value, release } = await getNextAvailableSubnet();
newExitNodeAddress = value;
releaseSubnetLock = release;
}
if (!existingRemoteExitNode) {
await trx.insert(remoteExitNodes).values({
remoteExitNodeId: remoteExitNodeId,
secretHash,
dateCreated: moment().toISOString(),
exitNodeId: existingExitNode.exitNodeId
});
} else {
// update the existing remote exit node
await trx
.update(remoteExitNodes)
.set({
exitNodeId: existingExitNode.exitNodeId
})
.where(
eq(
remoteExitNodes.remoteExitNodeId,
existingRemoteExitNode.remoteExitNodeId
)
);
}
if (!existingExitNodeOrg) {
await trx.insert(exitNodeOrgs).values({
exitNodeId: existingExitNode.exitNodeId,
orgId: orgId
});
}
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
if (org.billingOrgId) {
const otherBillingOrgs = await trx
.select()
.from(orgs)
.where(
and(
eq(orgs.billingOrgId, org.billingOrgId),
ne(orgs.orgId, orgId)
)
);
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
.select()
.from(exitNodeOrgs)
.where(
and(
eq(
exitNodeOrgs.exitNodeId,
existingExitNode.exitNodeId
),
inArray(exitNodeOrgs.orgId, billingOrgIds)
)
);
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
await usageService.add(
orgId,
LimitId.REMOTE_EXIT_NODES,
1,
trx
);
try {
await db.transaction(async (trx) => {
if (!existingExitNode) {
const [res] = await trx
.insert(exitNodes)
.values({
name: remoteExitNodeId,
address: newExitNodeAddress!,
endpoint: "",
publicKey: "",
listenPort: 0,
online: false,
type: "remoteExitNode"
})
.returning();
existingExitNode = res;
}
}
});
if (!existingRemoteExitNode) {
await trx.insert(remoteExitNodes).values({
remoteExitNodeId: remoteExitNodeId,
secretHash,
dateCreated: moment().toISOString(),
exitNodeId: existingExitNode.exitNodeId
});
} else {
// update the existing remote exit node
await trx
.update(remoteExitNodes)
.set({
exitNodeId: existingExitNode.exitNodeId
})
.where(
eq(
remoteExitNodes.remoteExitNodeId,
existingRemoteExitNode.remoteExitNodeId
)
);
}
if (!existingExitNodeOrg) {
await trx.insert(exitNodeOrgs).values({
exitNodeId: existingExitNode.exitNodeId,
orgId: orgId
});
}
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
if (org.billingOrgId) {
const otherBillingOrgs = await trx
.select()
.from(orgs)
.where(
and(
eq(orgs.billingOrgId, org.billingOrgId),
ne(orgs.orgId, orgId)
)
);
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
.select()
.from(exitNodeOrgs)
.where(
and(
eq(
exitNodeOrgs.exitNodeId,
existingExitNode.exitNodeId
),
inArray(exitNodeOrgs.orgId, billingOrgIds)
)
);
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
await usageService.add(
orgId,
LimitId.REMOTE_EXIT_NODES,
1,
trx
);
}
}
});
} finally {
await releaseSubnetLock?.();
}
const token = generateSessionToken();
await createRemoteExitNodeSession(token, remoteExitNodeId);

View File

@@ -13,37 +13,41 @@ export async function createExitNode(
const [exitNodeQuery] = await db.select().from(exitNodes).limit(1);
let exitNode: ExitNode;
if (!exitNodeQuery) {
const address = await getNextAvailableSubnet();
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
const { value: address, release } = await getNextAvailableSubnet();
try {
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
// const listenPort = await getNextAvailablePort();
const listenPort = config.getRawConfig().gerbil.start_port;
let subEndpoint = "";
if (config.getRawConfig().gerbil.use_subdomain) {
subEndpoint = await getUniqueExitNodeEndpointName();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
online: true,
listenPort,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} finally {
await release();
}
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name ||
`Exit Node ${publicKey.slice(0, 8)}`;
// create a new exit node
[exitNode] = await db
.insert(exitNodes)
.values({
publicKey,
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
address,
online: true,
listenPort,
reachableAt,
name: exitNodeName
})
.returning()
.execute();
logger.info(
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
);
} else {
// update the existing exit node
[exitNode] = await db

View File

@@ -52,13 +52,13 @@ export async function buildClientConfigurationForNewtClient(
clientsRes
.filter((client) => {
if (!client.clients.pubKey) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no public key, skipping`
);
return false;
}
if (!client.clients.subnet) {
logger.warn(
logger.debug(
`Client ${client.clients.clientId} has no subnet, skipping`
);
return false;

View File

@@ -19,7 +19,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
}
if (!newt.siteId) {
logger.warn("Newt has no client ID!");
logger.warn("Newt has no site ID!");
return;
}
@@ -34,6 +34,12 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
.where(eq(sites.siteId, newt.siteId!))
.returning();
if (!site) {
throw new Error(
`Could not find site ${newt.siteId} to update disconnection from disconnect message`
);
}
await fireSiteOfflineAlert(
site.orgId,
site.siteId,
@@ -43,6 +49,6 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
);
});
} catch (error) {
logger.error("Error handling disconnecting message", { error });
logger.error("Error handling site disconnecting message", error);
}
};

View File

@@ -3,6 +3,7 @@ import { sites, clients, olms } from "@server/db";
import { and, eq, inArray } from "drizzle-orm";
import logger from "@server/logger";
import { fireSiteOnlineAlert } from "@server/lib/alerts";
import { withRetry } from "@server/lib/dbRetry";
/**
* Ping Accumulator
@@ -22,8 +23,6 @@ import { fireSiteOnlineAlert } from "@server/lib/alerts";
*/
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 50;
// ── Site (newt) pings ──────────────────────────────────────────────────
// Map of siteId -> latest ping timestamp (unix seconds)
@@ -266,85 +265,7 @@ export async function flushPingsToDb(): Promise<void> {
}
// ── Retry / Error Helpers ──────────────────────────────────────────────
/**
* Simple retry wrapper with exponential backoff for transient errors
* (deadlocks, connection timeouts, unexpected disconnects).
*
* PostgreSQL deadlocks (40P01) are always safe to retry: the database
* guarantees exactly one winner per deadlock pair, so the loser just needs
* to try again. MAX_RETRIES is intentionally higher than typical connection
* retry budgets to give deadlock victims enough chances to succeed.
*/
async function withRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isTransientError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
{ code: error?.code ?? error?.cause?.code }
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
/**
* Detect transient errors that are safe to retry.
*/
function isTransientError(error: any): boolean {
if (!error) return false;
const message = (error.message || "").toLowerCase();
const causeMessage = (error.cause?.message || "").toLowerCase();
const code = error.code || error.cause?.code || "";
// Connection timeout / terminated
if (
message.includes("connection timeout") ||
message.includes("connection terminated") ||
message.includes("timeout exceeded when trying to connect") ||
causeMessage.includes("connection terminated unexpectedly") ||
causeMessage.includes("connection timeout")
) {
return true;
}
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
if (code === "40P01" || message.includes("deadlock")) {
return true;
}
// PostgreSQL serialization failure
if (code === "40001") {
return true;
}
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
if (
code === "ECONNRESET" ||
code === "ECONNREFUSED" ||
code === "EPIPE" ||
code === "ETIMEDOUT"
) {
return true;
}
return false;
}
// See @server/lib/dbRetry for the shared withRetry/isTransientError helpers.
// ── Lifecycle ──────────────────────────────────────────────────────────

View File

@@ -161,7 +161,7 @@ export async function buildSiteConfigurationForOlmClient(
}
if (!site.subnet) {
logger.warn(`Site ${site.siteId} has no subnet, skipping`);
logger.debug(`Site ${site.siteId} has no subnet, skipping`);
continue;
}

View File

@@ -6,7 +6,9 @@ import logger from "@server/logger";
/**
* Handles disconnecting messages from clients to show disconnected in the ui
*/
export const handleOlmDisconnectingMessage: MessageHandler = async (context) => {
export const handleOlmDisconnectingMessage: MessageHandler = async (
context
) => {
const { message, client: c, sendToClient } = context;
const olm = c as Olm;
@@ -29,6 +31,6 @@ export const handleOlmDisconnectingMessage: MessageHandler = async (context) =>
})
.where(eq(clients.clientId, olm.clientId));
} catch (error) {
logger.error("Error handling disconnecting message", { error });
logger.error("Error handling client disconnecting message", error);
}
};

View File

@@ -268,7 +268,11 @@ export async function createSite(
let newSite: Site | undefined;
try {
if (subnet && exitNodeId) {
if (type === "wireguard" && subnet && exitNodeId) {
// Only wireguard sites actually persist the provided subnet/exitNodeId.
// Newt sites have their subnet/exit node chosen (under a lock) when the
// newt connects, so validating them here is both unnecessary and racy,
// since pickSiteDefaults does not lock the subnet it suggests.
//make sure the subnet is in the range of the exit node if provided
const [exitNode] = await db
.select()

View File

@@ -290,7 +290,13 @@ export default function BillingPage() {
setHasSubscription(
tierSub.subscription.status === "active"
);
setIsTrial(tierSub.subscription.expiresAt != null);
// expiresAt is only meaningful while the trial hasn't
// actually run out yet; a stale row with a past
// expiresAt should no longer be treated as a live trial
const expiresAt = tierSub.subscription.expiresAt;
setIsTrial(
expiresAt != null && expiresAt * 1000 > Date.now()
);
}
// Find license subscription