mirror of
https://github.com/fosrl/pangolin.git
synced 2026-07-05 19:59:43 +00:00
Compare commits
18 Commits
feat/remem
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d4c52bbf2f | ||
|
|
390c822bb4 | ||
|
|
2bfc1901a6 | ||
|
|
5da186b528 | ||
|
|
600a96c13b | ||
|
|
e4e0da3723 | ||
|
|
87f50bf0cc | ||
|
|
440ebfe08e | ||
|
|
b399d2a291 | ||
|
|
2c66da1b19 | ||
|
|
1b1fba60f1 | ||
|
|
ab19955502 | ||
|
|
1db9dcec81 | ||
|
|
b93d26f09f | ||
|
|
fc54ad49b5 | ||
|
|
f87e136f6b | ||
|
|
49c2d3163e | ||
|
|
45b9e13a13 |
@@ -41,7 +41,7 @@ services:
|
||||
- 80:80 # Port for traefik because of the network_mode
|
||||
|
||||
traefik:
|
||||
image: traefik:v3.6
|
||||
image: traefik:v3.7
|
||||
container_name: traefik
|
||||
restart: unless-stopped
|
||||
network_mode: service:gerbil # Ports appear on the gerbil service
|
||||
|
||||
@@ -50,7 +50,7 @@ services:
|
||||
- 80:80{{end}}
|
||||
|
||||
traefik:
|
||||
image: docker.io/traefik:v3.6
|
||||
image: docker.io/traefik:v3.7
|
||||
container_name: traefik
|
||||
restart: unless-stopped
|
||||
{{if .InstallGerbil}}network_mode: service:gerbil # Ports appear on the gerbil service{{end}}{{if not .InstallGerbil}}
|
||||
|
||||
83
server/lib/dbRetry.ts
Normal file
83
server/lib/dbRetry.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import logger from "@server/logger";
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
/**
 * Decide whether a caught value represents a transient database/connection
 * failure, i.e. one where retrying the same operation is reasonable.
 *
 * The error itself and its direct `cause` (one level only) are inspected with
 * the same rules, so a wrapper error around a driver error is classified the
 * same way as the driver error on its own. A failure counts as transient when
 * either level has:
 *
 *  - a message containing "connection timeout", "connection terminated",
 *    "timeout exceeded when trying to connect" or "deadlock"
 *    (case-insensitive);
 *  - code `40P01` (PostgreSQL deadlock) or `40001` (PostgreSQL serialization
 *    failure);
 *  - code `ECONNRESET`, `ECONNREFUSED`, `EPIPE` or `ETIMEDOUT`.
 *
 * `message` and `code` values that are not strings are ignored rather than
 * dereferenced, so this function never throws - it is meant to be called from
 * `catch` blocks, where the thrown value can be anything.
 *
 * @param error - The caught value; may be null/undefined or a primitive.
 * @returns `true` if the failure looks transient, `false` otherwise.
 */
export function isTransientError(error: unknown): boolean {
    if (!error) return false;

    type ErrorLike = { message?: unknown; code?: unknown; cause?: unknown };

    const outer = error as ErrorLike;
    // A missing cause is treated as an empty object so both levels can be
    // read uniformly below.
    const inner = (outer.cause ?? {}) as ErrorLike;

    // Only real strings are searched; anything else counts as "no message".
    const lower = (value: unknown): string =>
        typeof value === "string" ? value.toLowerCase() : "";

    const transientFragments = [
        "connection timeout",
        "connection terminated",
        "timeout exceeded when trying to connect",
        "deadlock"
    ];
    const transientCodes = new Set<string>([
        "40P01", // PostgreSQL deadlock detected
        "40001", // PostgreSQL serialization failure
        "ECONNRESET",
        "ECONNREFUSED",
        "EPIPE",
        "ETIMEDOUT"
    ]);

    const messages = [lower(outer.message), lower(inner.message)];
    if (
        messages.some((text) =>
            transientFragments.some((fragment) => text.includes(fragment))
        )
    ) {
        return true;
    }

    // Check both codes independently: a wrapper carrying its own unrelated
    // code must not hide a transient code on the underlying cause.
    return [outer.code, inner.code].some(
        (code) => typeof code === "string" && transientCodes.has(code)
    );
}
|
||||
|
||||
/**
 * Run `operation`, re-running it when it fails with an error that
 * `isTransientError` classifies as transient.
 *
 * Before each retry the wrapper waits `baseDelayMs * 2^(attempt - 1)` plus a
 * random jitter of up to that same amount, and logs a warning with the
 * context, attempt number and delay. Non-transient errors, and transient
 * errors once `maxRetries` retries have been used, are re-thrown unchanged.
 *
 * @param operation - Async work to execute; invoked again from scratch on retry.
 * @param context - Label included in the retry warning log line.
 * @param maxRetries - Maximum number of retries after the first failure.
 * @param baseDelayMs - Backoff delay for the first retry, in milliseconds.
 * @returns Whatever `operation` resolves with.
 */
export async function withRetry<T>(
    operation: () => Promise<T>,
    context: string,
    maxRetries: number = MAX_RETRIES,
    baseDelayMs: number = BASE_DELAY_MS
): Promise<T> {
    const sleep = (ms: number) =>
        new Promise<void>((wake) => setTimeout(wake, ms));

    // Loop exits only by returning the result or re-throwing the error.
    for (let attempt = 0; ; ) {
        try {
            return await operation();
        } catch (error: any) {
            const retryable = isTransientError(error) && attempt < maxRetries;
            if (!retryable) {
                throw error;
            }

            attempt += 1;
            // Exponential backoff with up to 100% random jitter on top.
            const backoff = baseDelayMs * 2 ** (attempt - 1);
            const delay = backoff + Math.random() * backoff;

            logger.warn(
                `Transient DB error in ${context}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`,
                { code: error?.code ?? error?.cause?.code }
            );
            await sleep(delay);
        }
    }
}
|
||||
@@ -1,30 +1,55 @@
|
||||
import { db, exitNodes } from "@server/db";
|
||||
import { db, exitNodes, Transaction } from "@server/db";
|
||||
import config from "@server/lib/config";
|
||||
import { findNextAvailableCidr } from "@server/lib/ip";
|
||||
import { lockManager } from "#dynamic/lib/lock";
|
||||
|
||||
export async function getNextAvailableSubnet(): Promise<string> {
|
||||
// Get all existing subnets from routes table
|
||||
const existingAddresses = await db
|
||||
.select({
|
||||
address: exitNodes.address
|
||||
})
|
||||
.from(exitNodes);
|
||||
|
||||
const addresses = existingAddresses.map((a) => a.address);
|
||||
let subnet = findNextAvailableCidr(
|
||||
addresses,
|
||||
config.getRawConfig().gerbil.block_size,
|
||||
config.getRawConfig().gerbil.subnet_group
|
||||
);
|
||||
if (!subnet) {
|
||||
throw new Error("No available subnets remaining in space");
|
||||
/**
|
||||
* Reserves the next available exit node subnet.
|
||||
*
|
||||
* Exit node subnets must never overlap with one another - regardless of
|
||||
* which org(s) they belong to - since HA exit nodes can end up routing for
|
||||
* the same org. This acquires a lock that the caller MUST release (via the
|
||||
* returned `release`) only after the chosen address has been durably
|
||||
* persisted (e.g. after the enclosing transaction commits), otherwise
|
||||
* concurrent callers can race and pick the same subnet.
|
||||
*/
|
||||
export async function getNextAvailableSubnet(
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<{ value: string; release: () => Promise<void> }> {
|
||||
const lockKey = "exit-node-subnet-allocation";
|
||||
const acquired = await lockManager.acquireLockWithRetry(lockKey, 6000);
|
||||
if (!acquired) {
|
||||
throw new Error(`Failed to acquire lock: ${lockKey}`);
|
||||
}
|
||||
const release = () => lockManager.releaseLock(lockKey, acquired);
|
||||
|
||||
// replace the last octet with 1
|
||||
subnet =
|
||||
subnet.split(".").slice(0, 3).join(".") +
|
||||
".1" +
|
||||
"/" +
|
||||
subnet.split("/")[1];
|
||||
return subnet;
|
||||
try {
|
||||
// Get all existing subnets from routes table
|
||||
const existingAddresses = await trx
|
||||
.select({
|
||||
address: exitNodes.address
|
||||
})
|
||||
.from(exitNodes);
|
||||
|
||||
const addresses = existingAddresses.map((a) => a.address);
|
||||
let subnet = findNextAvailableCidr(
|
||||
addresses,
|
||||
config.getRawConfig().gerbil.block_size,
|
||||
config.getRawConfig().gerbil.subnet_group
|
||||
);
|
||||
if (!subnet) {
|
||||
throw new Error("No available subnets remaining in space");
|
||||
}
|
||||
|
||||
// replace the last octet with 1
|
||||
subnet =
|
||||
subnet.split(".").slice(0, 3).join(".") +
|
||||
".1" +
|
||||
"/" +
|
||||
subnet.split("/")[1];
|
||||
return { value: subnet, release };
|
||||
} catch (e) {
|
||||
await release();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,6 +45,7 @@ import {
|
||||
} from "@server/routers/client/targets";
|
||||
import { lockManager } from "#dynamic/lib/lock";
|
||||
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
|
||||
import { withRetry, isTransientError } from "@server/lib/dbRetry";
|
||||
import {
|
||||
checkOrgRebuildRateLimit,
|
||||
decrementOrgRebuildCount,
|
||||
@@ -285,10 +286,20 @@ export async function rebuildClientAssociationsFromSiteResource(
|
||||
) {
|
||||
await incrementOrgRebuildCount(siteResource.orgId);
|
||||
try {
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
// The whole locked rebuild is idempotent (it diffs full expected vs.
|
||||
// actual state each time), so on a transient DB error it's safe to
|
||||
// retry the entire thing rather than just the failed query.
|
||||
return await withRetry(
|
||||
() =>
|
||||
lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
() =>
|
||||
rebuildClientAssociationsFromSiteResourceImpl(
|
||||
siteResource
|
||||
),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
),
|
||||
`rebuildClientAssociationsFromSiteResource:${siteResource.siteResourceId}`
|
||||
);
|
||||
} catch (err: any) {
|
||||
if (
|
||||
@@ -304,6 +315,17 @@ export async function rebuildClientAssociationsFromSiteResource(
|
||||
});
|
||||
return { mergedAllClients: [] };
|
||||
}
|
||||
if (isTransientError(err)) {
|
||||
logger.warn(
|
||||
`rebuildClientAssociations: transient DB error rebuilding site resource ${siteResource.siteResourceId} persisted after retries, queuing for deferred processing:`,
|
||||
err
|
||||
);
|
||||
await rebuildQueue.enqueue({
|
||||
type: "site-resource",
|
||||
id: siteResource.siteResourceId
|
||||
});
|
||||
return { mergedAllClients: [] };
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
await decrementOrgRebuildCount(siteResource.orgId);
|
||||
@@ -463,6 +485,7 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
|
||||
await trx
|
||||
.insert(clientSiteResourcesAssociationsCache)
|
||||
.values(clientSiteResourcesToInsert)
|
||||
.onConflictDoNothing()
|
||||
.returning();
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteResourceId=${siteResource.siteResourceId} inserted clientSiteResource associations`
|
||||
@@ -510,121 +533,148 @@ async function rebuildClientAssociationsFromSiteResourceImpl(
|
||||
for (const site of sitesToProcess) {
|
||||
const siteId = site.siteId;
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
|
||||
);
|
||||
try {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] processing siteId=${siteId} for siteResourceId=${siteResource.siteResourceId}`
|
||||
);
|
||||
|
||||
const existingClientSites = await trx
|
||||
.select({
|
||||
clientId: clientSitesAssociationsCache.clientId
|
||||
})
|
||||
.from(clientSitesAssociationsCache)
|
||||
.where(eq(clientSitesAssociationsCache.siteId, siteId));
|
||||
const existingClientSites = await trx
|
||||
.select({
|
||||
clientId: clientSitesAssociationsCache.clientId
|
||||
})
|
||||
.from(clientSitesAssociationsCache)
|
||||
.where(eq(clientSitesAssociationsCache.siteId, siteId));
|
||||
|
||||
const existingClientSiteIds = existingClientSites.map(
|
||||
(row) => row.clientId
|
||||
);
|
||||
const existingClientSiteIds = existingClientSites.map(
|
||||
(row) => row.clientId
|
||||
);
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
|
||||
);
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} existingClientSiteIds=[${existingClientSiteIds.join(", ")}]`
|
||||
);
|
||||
|
||||
// Get full client details for existing clients (needed for sending delete messages)
|
||||
const existingClients =
|
||||
existingClientSiteIds.length > 0
|
||||
? await trx
|
||||
.select({
|
||||
clientId: clients.clientId,
|
||||
pubKey: clients.pubKey,
|
||||
subnet: clients.subnet
|
||||
})
|
||||
.from(clients)
|
||||
.where(inArray(clients.clientId, existingClientSiteIds))
|
||||
// Get full client details for existing clients (needed for sending delete messages)
|
||||
const existingClients =
|
||||
existingClientSiteIds.length > 0
|
||||
? await trx
|
||||
.select({
|
||||
clientId: clients.clientId,
|
||||
pubKey: clients.pubKey,
|
||||
subnet: clients.subnet
|
||||
})
|
||||
.from(clients)
|
||||
.where(
|
||||
inArray(clients.clientId, existingClientSiteIds)
|
||||
)
|
||||
: [];
|
||||
|
||||
const otherResourceClientIds =
|
||||
clientsFromOtherResourcesBySite.get(siteId) ??
|
||||
new Set<number>();
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
|
||||
);
|
||||
|
||||
// Expected clients from this resource are site-scoped: if this site is
|
||||
// no longer attached to the resource, the expected set is empty.
|
||||
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
|
||||
? mergedAllClientIds
|
||||
: [];
|
||||
|
||||
const otherResourceClientIds =
|
||||
clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
|
||||
);
|
||||
|
||||
// Expected clients from this resource are site-scoped: if this site is
|
||||
// no longer attached to the resource, the expected set is empty.
|
||||
const expectedClientIdsForSite = currentSiteIdSet.has(siteId)
|
||||
? mergedAllClientIds
|
||||
: [];
|
||||
|
||||
const clientSitesToAdd = expectedClientIdsForSite.filter(
|
||||
(clientId) =>
|
||||
!existingClientSiteIds.includes(clientId) &&
|
||||
!otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
|
||||
);
|
||||
|
||||
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
|
||||
clientId,
|
||||
siteId
|
||||
}));
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
|
||||
);
|
||||
|
||||
if (clientSitesToInsert.length > 0) {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
|
||||
// Note: we deliberately do NOT exclude clients covered by another
|
||||
// site resource here (unlike clientSitesToRemove below). Doing so
|
||||
// previously caused a permanent gap: if resource A saw resource B's
|
||||
// cache row and skipped adding (assuming B would maintain it), and
|
||||
// B's own rebuild made the same assumption about A, the site-level
|
||||
// row could end up never inserted by anyone even though both
|
||||
// resources' client associations were otherwise correct.
|
||||
// onConflictDoNothing makes a redundant insert harmless, so there's
|
||||
// no correctness reason to skip here.
|
||||
const clientSitesToAdd = expectedClientIdsForSite.filter(
|
||||
(clientId) => !existingClientSiteIds.includes(clientId)
|
||||
);
|
||||
await trx
|
||||
.insert(clientSitesAssociationsCache)
|
||||
.values(clientSitesToInsert)
|
||||
.returning();
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
|
||||
);
|
||||
} else {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
|
||||
);
|
||||
}
|
||||
|
||||
// Now remove any client-site associations that should no longer exist
|
||||
const clientSitesToRemove = existingClientSiteIds.filter(
|
||||
(clientId) =>
|
||||
!expectedClientIdsForSite.includes(clientId) &&
|
||||
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
|
||||
);
|
||||
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
|
||||
clientId,
|
||||
siteId
|
||||
}));
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
|
||||
);
|
||||
|
||||
if (clientSitesToRemove.length > 0) {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toAdd=[${clientSitesToAdd.join(", ")}]`
|
||||
);
|
||||
await trx
|
||||
.delete(clientSitesAssociationsCache)
|
||||
.where(
|
||||
and(
|
||||
eq(clientSitesAssociationsCache.siteId, siteId),
|
||||
inArray(
|
||||
clientSitesAssociationsCache.clientId,
|
||||
clientSitesToRemove
|
||||
)
|
||||
)
|
||||
|
||||
if (clientSitesToInsert.length > 0) {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserting ${clientSitesToInsert.length} clientSite association(s)`
|
||||
);
|
||||
}
|
||||
await trx
|
||||
.insert(clientSitesAssociationsCache)
|
||||
.values(clientSitesToInsert)
|
||||
.onConflictDoNothing()
|
||||
.returning();
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} inserted clientSite associations`
|
||||
);
|
||||
} else {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} no clientSite associations to insert`
|
||||
);
|
||||
}
|
||||
|
||||
// Now handle the messages to add/remove peers on both the newt and olm sides
|
||||
await handleMessagesForSiteClients(
|
||||
site,
|
||||
siteId,
|
||||
mergedAllClients,
|
||||
existingClients,
|
||||
clientSitesToAdd,
|
||||
clientSitesToRemove,
|
||||
trx
|
||||
);
|
||||
// Now remove any client-site associations that should no longer exist
|
||||
const clientSitesToRemove = existingClientSiteIds.filter(
|
||||
(clientId) =>
|
||||
!expectedClientIdsForSite.includes(clientId) &&
|
||||
!otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
|
||||
);
|
||||
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} clientSites toRemove=[${clientSitesToRemove.join(", ")}]`
|
||||
);
|
||||
|
||||
if (clientSitesToRemove.length > 0) {
|
||||
logger.debug(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} deleting ${clientSitesToRemove.length} clientSite association(s)`
|
||||
);
|
||||
await trx
|
||||
.delete(clientSitesAssociationsCache)
|
||||
.where(
|
||||
and(
|
||||
eq(clientSitesAssociationsCache.siteId, siteId),
|
||||
inArray(
|
||||
clientSitesAssociationsCache.clientId,
|
||||
clientSitesToRemove
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// Now handle the messages to add/remove peers on both the newt and olm sides
|
||||
await handleMessagesForSiteClients(
|
||||
site,
|
||||
siteId,
|
||||
mergedAllClients,
|
||||
existingClients,
|
||||
clientSitesToAdd,
|
||||
clientSitesToRemove,
|
||||
trx
|
||||
);
|
||||
} catch (err) {
|
||||
// Don't let a failure on one site abort processing of every
|
||||
// other site queued after it in this run. Since we're not
|
||||
// re-throwing, the outer wrapper's retry/requeue logic never
|
||||
// sees this failure, so explicitly queue this resource for a
|
||||
// follow-up pass to reconcile whatever this site didn't get to.
|
||||
logger.error(
|
||||
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} failed while processing site for siteResourceId=${siteResource.siteResourceId}, continuing with remaining sites and queuing a follow-up pass:`,
|
||||
err
|
||||
);
|
||||
await rebuildQueue.enqueue({
|
||||
type: "site-resource",
|
||||
id: siteResource.siteResourceId
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Handle subnet proxy target updates for the resource associations
|
||||
@@ -657,7 +707,7 @@ async function handleMessagesForSiteClients(
|
||||
trx: Transaction | typeof db = db
|
||||
): Promise<void> {
|
||||
if (!site.exitNodeId) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Exit node ID not on site ${site.siteId} so there is no reason to update clients because it must be offline`
|
||||
);
|
||||
return;
|
||||
@@ -671,14 +721,14 @@ async function handleMessagesForSiteClients(
|
||||
.limit(1);
|
||||
|
||||
if (!exitNode) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Exit node not found for site ${site.siteId} so there is no reason to update clients because it must be offline`
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!site.publicKey) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Site publicKey not set for site ${site.siteId} so cannot add peers to clients`
|
||||
);
|
||||
return;
|
||||
@@ -692,7 +742,7 @@ async function handleMessagesForSiteClients(
|
||||
.where(eq(newts.siteId, siteId))
|
||||
.limit(1);
|
||||
if (!newt) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Newt not found for site ${siteId} so cannot add peers to clients`
|
||||
);
|
||||
return;
|
||||
@@ -917,7 +967,7 @@ export async function updateClientSiteDestinations(
|
||||
|
||||
for (const site of sitesData) {
|
||||
if (!site.sites.subnet) {
|
||||
logger.warn(`Site ${site.sites.siteId} has no subnet, skipping`);
|
||||
logger.debug(`Site ${site.sites.siteId} has no subnet, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -1656,10 +1706,17 @@ export async function rebuildClientAssociationsFromClient(
|
||||
await incrementOrgRebuildCount(client.orgId);
|
||||
try {
|
||||
const trx = primaryDb;
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:client:${client.clientId}`,
|
||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
// The whole locked rebuild is idempotent (it diffs full expected vs.
|
||||
// actual state each time), so on a transient DB error it's safe to
|
||||
// retry the entire thing rather than just the failed query.
|
||||
return await withRetry(
|
||||
() =>
|
||||
lockManager.withLock(
|
||||
`rebuild-client-associations:client:${client.clientId}`,
|
||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
),
|
||||
`rebuildClientAssociationsFromClient:${client.clientId}`
|
||||
);
|
||||
} catch (err: any) {
|
||||
if (
|
||||
@@ -1675,6 +1732,17 @@ export async function rebuildClientAssociationsFromClient(
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (isTransientError(err)) {
|
||||
logger.warn(
|
||||
`rebuildClientAssociations: transient DB error rebuilding client ${client.clientId} persisted after retries, queuing for deferred processing:`,
|
||||
err
|
||||
);
|
||||
await rebuildQueue.enqueue({
|
||||
type: "client",
|
||||
id: client.clientId
|
||||
});
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
await decrementOrgRebuildCount(client.orgId);
|
||||
@@ -1826,12 +1894,15 @@ async function rebuildClientAssociationsFromClientImpl(
|
||||
|
||||
// Insert new associations
|
||||
if (resourcesToAdd.length > 0) {
|
||||
await trx.insert(clientSiteResourcesAssociationsCache).values(
|
||||
resourcesToAdd.map((siteResourceId) => ({
|
||||
clientId: client.clientId,
|
||||
siteResourceId
|
||||
}))
|
||||
);
|
||||
await trx
|
||||
.insert(clientSiteResourcesAssociationsCache)
|
||||
.values(
|
||||
resourcesToAdd.map((siteResourceId) => ({
|
||||
clientId: client.clientId,
|
||||
siteResourceId
|
||||
}))
|
||||
)
|
||||
.onConflictDoNothing();
|
||||
}
|
||||
|
||||
// Remove old associations
|
||||
@@ -1869,12 +1940,15 @@ async function rebuildClientAssociationsFromClientImpl(
|
||||
|
||||
// Insert new site associations
|
||||
if (sitesToAdd.length > 0) {
|
||||
await trx.insert(clientSitesAssociationsCache).values(
|
||||
sitesToAdd.map((siteId) => ({
|
||||
clientId: client.clientId,
|
||||
siteId
|
||||
}))
|
||||
);
|
||||
await trx
|
||||
.insert(clientSitesAssociationsCache)
|
||||
.values(
|
||||
sitesToAdd.map((siteId) => ({
|
||||
clientId: client.clientId,
|
||||
siteId
|
||||
}))
|
||||
)
|
||||
.onConflictDoNothing();
|
||||
}
|
||||
|
||||
// Remove old site associations
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import logger from "@server/logger";
|
||||
import { isTransientError } from "@server/lib/dbRetry";
|
||||
|
||||
export type RebuildJobType = "site-resource" | "client";
|
||||
|
||||
export interface RebuildJob {
|
||||
type: RebuildJobType;
|
||||
id: number;
|
||||
// Number of times this job has already been re-queued after a transient
|
||||
// failure. Absent/0 means it has not failed yet.
|
||||
attempt?: number;
|
||||
}
|
||||
|
||||
export interface RebuildJobHandlers {
|
||||
@@ -24,6 +28,10 @@ export interface RebuildQueueManager {
|
||||
// retried shortly after against fresh DB state.
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
const BATCH_SIZE = 5;
|
||||
// A job that fails with a transient DB error gets re-queued with backoff
|
||||
// instead of being dropped, up to this many times.
|
||||
const MAX_JOB_ATTEMPTS = 5;
|
||||
const JOB_RETRY_BASE_DELAY_MS = 1000;
|
||||
|
||||
function dedupeKey(job: RebuildJob): string {
|
||||
return `${job.type}:${job.id}`;
|
||||
@@ -106,10 +114,29 @@ class InMemoryRebuildQueue implements RebuildQueueManager {
|
||||
`Rebuild queue: completed ${job.type}:${job.id}`
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
const attempt = (job.attempt ?? 0) + 1;
|
||||
if (isTransientError(err) && attempt <= MAX_JOB_ATTEMPTS) {
|
||||
const delay =
|
||||
JOB_RETRY_BASE_DELAY_MS * Math.pow(2, attempt - 1);
|
||||
logger.warn(
|
||||
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
|
||||
err
|
||||
);
|
||||
setTimeout(() => {
|
||||
this.enqueue({ ...job, attempt }).catch(
|
||||
(enqueueErr) =>
|
||||
logger.error(
|
||||
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
|
||||
enqueueErr
|
||||
)
|
||||
);
|
||||
}, delay);
|
||||
} else {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
@@ -14,7 +14,7 @@
|
||||
import { redis } from "#private/lib/redis";
|
||||
import logger from "@server/logger";
|
||||
|
||||
export const ORG_REBUILD_CONCURRENCY_LIMIT = 5;
|
||||
export const ORG_REBUILD_CONCURRENCY_LIMIT = 10;
|
||||
|
||||
// Safety-net TTL: slightly longer than the rebuild lock TTL (120 s). If a
|
||||
// server process dies while holding a rebuild, this ensures the counter key
|
||||
|
||||
@@ -14,12 +14,16 @@
|
||||
import { redis } from "#private/lib/redis";
|
||||
import { lockManager } from "#private/lib/lock";
|
||||
import logger from "@server/logger";
|
||||
import { isTransientError } from "@server/lib/dbRetry";
|
||||
|
||||
export type RebuildJobType = "site-resource" | "client";
|
||||
|
||||
export interface RebuildJob {
|
||||
type: RebuildJobType;
|
||||
id: number;
|
||||
// Number of times this job has already been re-queued after a transient
|
||||
// failure. Absent/0 means it has not failed yet.
|
||||
attempt?: number;
|
||||
}
|
||||
|
||||
export interface RebuildJobHandlers {
|
||||
@@ -43,6 +47,11 @@ const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s
|
||||
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
|
||||
// A job that fails with a transient DB error gets re-queued with backoff
|
||||
// instead of being dropped, up to this many times.
|
||||
const MAX_JOB_ATTEMPTS = 5;
|
||||
const JOB_RETRY_BASE_DELAY_MS = 1000;
|
||||
|
||||
class RedisRebuildQueue {
|
||||
private processingStarted = false;
|
||||
|
||||
@@ -180,10 +189,33 @@ class RedisRebuildQueue {
|
||||
`Rebuild queue: completed ${job.type}:${job.id}`
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
const attempt = (job.attempt ?? 0) + 1;
|
||||
if (
|
||||
isTransientError(err) &&
|
||||
attempt <= MAX_JOB_ATTEMPTS
|
||||
) {
|
||||
const delay =
|
||||
JOB_RETRY_BASE_DELAY_MS *
|
||||
Math.pow(2, attempt - 1);
|
||||
logger.warn(
|
||||
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
|
||||
err
|
||||
);
|
||||
setTimeout(() => {
|
||||
this.enqueue({ ...job, attempt }).catch(
|
||||
(enqueueErr) =>
|
||||
logger.error(
|
||||
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
|
||||
enqueueErr
|
||||
)
|
||||
);
|
||||
}, delay);
|
||||
} else {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -894,6 +894,19 @@ class RegionalRedisManager {
|
||||
return opts;
|
||||
}
|
||||
|
||||
// Derive the read-replica hostname from the primary's service hostname.
// The regional Redis StatefulSet's "redis" service pins to pod redis-0
// (primary); the replica (redis-1) is only reachable through the per-pod
// headless service, so redis.<namespace>.svc.cluster.local maps to
// redis-1.redis-headless.<namespace>.svc.cluster.local. When the configured
// host does not follow that pattern (e.g. local dev) null is returned, and
// callers should fall back to the primary for reads.
private getReplicaHost(primaryHost: string): string | null {
    const namespace = /^redis\.([^.]+)\.svc\.cluster\.local$/.exec(
        primaryHost
    )?.[1];
    return namespace
        ? `redis-1.redis-headless.${namespace}.svc.cluster.local`
        : null;
}
|
||||
|
||||
private initializeClients(): void {
|
||||
const cfg = this.getConfig();
|
||||
const baseOpts = {
|
||||
@@ -907,35 +920,42 @@ class RegionalRedisManager {
|
||||
|
||||
try {
|
||||
this.writeClient = new Redis(baseOpts);
|
||||
// redis-1 (replica) handles reads; fall back to primary if not resolvable
|
||||
this.readClient = new Redis({
|
||||
...baseOpts,
|
||||
host: cfg.host!.replace(/^(.*?)(\.\S+)$/, (_, h, rest) => {
|
||||
// Derive replica hostname from the headless service pattern:
|
||||
// redis.redis.svc.cluster.local -> redis-1.redis-headless.redis.svc.cluster.local
|
||||
// If it doesn't look like a k8s service, just use the same host
|
||||
return h + rest;
|
||||
})
|
||||
});
|
||||
|
||||
// For simplicity use same host for both; callers can always read from primary
|
||||
// The real replica routing is handled by the StatefulSet headless service
|
||||
this.readClient = this.writeClient;
|
||||
const replicaHost = this.getReplicaHost(cfg.host!);
|
||||
this.readClient = replicaHost
|
||||
? new Redis({ ...baseOpts, host: replicaHost })
|
||||
: this.writeClient;
|
||||
|
||||
this.writeClient.on("ready", () => {
|
||||
logger.info("Regional Redis client ready");
|
||||
logger.info("Regional Redis write client ready");
|
||||
this.isHealthy = true;
|
||||
});
|
||||
this.writeClient.on("error", (err) => {
|
||||
logger.error("Regional Redis client error:", err);
|
||||
logger.error("Regional Redis write client error:", err);
|
||||
this.isHealthy = false;
|
||||
});
|
||||
this.writeClient.on("reconnecting", () => {
|
||||
logger.info("Regional Redis client reconnecting...");
|
||||
logger.info("Regional Redis write client reconnecting...");
|
||||
this.isHealthy = false;
|
||||
});
|
||||
|
||||
logger.info("Regional Redis client initialized");
|
||||
if (this.readClient !== this.writeClient) {
|
||||
this.readClient.on("ready", () => {
|
||||
logger.info("Regional Redis read client ready");
|
||||
});
|
||||
this.readClient.on("error", (err) => {
|
||||
logger.error("Regional Redis read client error:", err);
|
||||
});
|
||||
this.readClient.on("reconnecting", () => {
|
||||
logger.info("Regional Redis read client reconnecting...");
|
||||
});
|
||||
}
|
||||
|
||||
logger.info(
|
||||
replicaHost
|
||||
? `Regional Redis client initialized (reads routed to replica ${replicaHost})`
|
||||
: "Regional Redis client initialized (no replica resolvable, reads routed to primary)"
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error("Failed to initialize regional Redis client:", error);
|
||||
this.isEnabled = false;
|
||||
@@ -1041,11 +1061,14 @@ class RegionalRedisManager {
|
||||
|
||||
public async disconnect(): Promise<void> {
|
||||
try {
|
||||
if (this.readClient && this.readClient !== this.writeClient) {
|
||||
await this.readClient.quit();
|
||||
}
|
||||
this.readClient = null;
|
||||
if (this.writeClient) {
|
||||
await this.writeClient.quit();
|
||||
this.writeClient = null;
|
||||
}
|
||||
this.readClient = null;
|
||||
logger.info("Regional Redis client disconnected");
|
||||
} catch (error) {
|
||||
logger.error("Error disconnecting regional Redis client:", error);
|
||||
|
||||
@@ -29,37 +29,41 @@ export async function createExitNode(
|
||||
.where(eq(exitNodes.publicKey, publicKey));
|
||||
let exitNode: ExitNode;
|
||||
if (!exitNodeQuery) {
|
||||
const address = await getNextAvailableSubnet();
|
||||
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
|
||||
// const listenPort = await getNextAvailablePort();
|
||||
const listenPort = config.getRawConfig().gerbil.start_port;
|
||||
let subEndpoint = "";
|
||||
if (config.getRawConfig().gerbil.use_subdomain) {
|
||||
subEndpoint = await getUniqueExitNodeEndpointName();
|
||||
const { value: address, release } = await getNextAvailableSubnet();
|
||||
try {
|
||||
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
|
||||
// const listenPort = await getNextAvailablePort();
|
||||
const listenPort = config.getRawConfig().gerbil.start_port;
|
||||
let subEndpoint = "";
|
||||
if (config.getRawConfig().gerbil.use_subdomain) {
|
||||
subEndpoint = await getUniqueExitNodeEndpointName();
|
||||
}
|
||||
|
||||
const exitNodeName =
|
||||
config.getRawConfig().gerbil.exit_node_name ||
|
||||
`Exit Node ${publicKey.slice(0, 8)}`;
|
||||
|
||||
// create a new exit node
|
||||
[exitNode] = await db
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
publicKey,
|
||||
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
|
||||
address,
|
||||
listenPort,
|
||||
online: true,
|
||||
reachableAt,
|
||||
name: exitNodeName
|
||||
})
|
||||
.returning()
|
||||
.execute();
|
||||
|
||||
logger.info(
|
||||
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
|
||||
);
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
|
||||
const exitNodeName =
|
||||
config.getRawConfig().gerbil.exit_node_name ||
|
||||
`Exit Node ${publicKey.slice(0, 8)}`;
|
||||
|
||||
// create a new exit node
|
||||
[exitNode] = await db
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
publicKey,
|
||||
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
|
||||
address,
|
||||
listenPort,
|
||||
online: true,
|
||||
reachableAt,
|
||||
name: exitNodeName
|
||||
})
|
||||
.returning()
|
||||
.execute();
|
||||
|
||||
logger.info(
|
||||
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
|
||||
);
|
||||
} else {
|
||||
// update the reachable at
|
||||
[exitNode] = await db
|
||||
|
||||
@@ -114,8 +114,6 @@ export async function createRemoteExitNode(
|
||||
}
|
||||
|
||||
const secretHash = await hashPassword(secret);
|
||||
// const address = await getNextAvailableSubnet();
|
||||
const address = "100.89.140.1/24"; // FOR NOW LETS HARDCODE THESE ADDRESSES
|
||||
|
||||
const [existingRemoteExitNode] = await db
|
||||
.select()
|
||||
@@ -191,89 +189,106 @@ export async function createRemoteExitNode(
|
||||
);
|
||||
}
|
||||
|
||||
await db.transaction(async (trx) => {
|
||||
if (!existingExitNode) {
|
||||
const [res] = await trx
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
name: remoteExitNodeId,
|
||||
address,
|
||||
endpoint: "",
|
||||
publicKey: "",
|
||||
listenPort: 0,
|
||||
online: false,
|
||||
type: "remoteExitNode"
|
||||
})
|
||||
.returning();
|
||||
existingExitNode = res;
|
||||
}
|
||||
// If this remote exit node isn't already backing an exit node in
|
||||
// another org, we're about to create a brand new one. Reserve a
|
||||
// subnet for it up front so the allocation lock is held across the
|
||||
// whole insert - this guarantees exit node subnets never overlap,
|
||||
// even under concurrent creation, which matters for HA setups.
|
||||
let releaseSubnetLock: (() => Promise<void>) | null = null;
|
||||
let newExitNodeAddress: string | null = null;
|
||||
if (!existingExitNode) {
|
||||
const { value, release } = await getNextAvailableSubnet();
|
||||
newExitNodeAddress = value;
|
||||
releaseSubnetLock = release;
|
||||
}
|
||||
|
||||
if (!existingRemoteExitNode) {
|
||||
await trx.insert(remoteExitNodes).values({
|
||||
remoteExitNodeId: remoteExitNodeId,
|
||||
secretHash,
|
||||
dateCreated: moment().toISOString(),
|
||||
exitNodeId: existingExitNode.exitNodeId
|
||||
});
|
||||
} else {
|
||||
// update the existing remote exit node
|
||||
await trx
|
||||
.update(remoteExitNodes)
|
||||
.set({
|
||||
exitNodeId: existingExitNode.exitNodeId
|
||||
})
|
||||
.where(
|
||||
eq(
|
||||
remoteExitNodes.remoteExitNodeId,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (!existingExitNodeOrg) {
|
||||
await trx.insert(exitNodeOrgs).values({
|
||||
exitNodeId: existingExitNode.exitNodeId,
|
||||
orgId: orgId
|
||||
});
|
||||
}
|
||||
|
||||
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
|
||||
if (org.billingOrgId) {
|
||||
const otherBillingOrgs = await trx
|
||||
.select()
|
||||
.from(orgs)
|
||||
.where(
|
||||
and(
|
||||
eq(orgs.billingOrgId, org.billingOrgId),
|
||||
ne(orgs.orgId, orgId)
|
||||
)
|
||||
);
|
||||
|
||||
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
|
||||
|
||||
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
|
||||
.select()
|
||||
.from(exitNodeOrgs)
|
||||
.where(
|
||||
and(
|
||||
eq(
|
||||
exitNodeOrgs.exitNodeId,
|
||||
existingExitNode.exitNodeId
|
||||
),
|
||||
inArray(exitNodeOrgs.orgId, billingOrgIds)
|
||||
)
|
||||
);
|
||||
|
||||
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
|
||||
await usageService.add(
|
||||
orgId,
|
||||
LimitId.REMOTE_EXIT_NODES,
|
||||
1,
|
||||
trx
|
||||
);
|
||||
try {
|
||||
await db.transaction(async (trx) => {
|
||||
if (!existingExitNode) {
|
||||
const [res] = await trx
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
name: remoteExitNodeId,
|
||||
address: newExitNodeAddress!,
|
||||
endpoint: "",
|
||||
publicKey: "",
|
||||
listenPort: 0,
|
||||
online: false,
|
||||
type: "remoteExitNode"
|
||||
})
|
||||
.returning();
|
||||
existingExitNode = res;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if (!existingRemoteExitNode) {
|
||||
await trx.insert(remoteExitNodes).values({
|
||||
remoteExitNodeId: remoteExitNodeId,
|
||||
secretHash,
|
||||
dateCreated: moment().toISOString(),
|
||||
exitNodeId: existingExitNode.exitNodeId
|
||||
});
|
||||
} else {
|
||||
// update the existing remote exit node
|
||||
await trx
|
||||
.update(remoteExitNodes)
|
||||
.set({
|
||||
exitNodeId: existingExitNode.exitNodeId
|
||||
})
|
||||
.where(
|
||||
eq(
|
||||
remoteExitNodes.remoteExitNodeId,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
if (!existingExitNodeOrg) {
|
||||
await trx.insert(exitNodeOrgs).values({
|
||||
exitNodeId: existingExitNode.exitNodeId,
|
||||
orgId: orgId
|
||||
});
|
||||
}
|
||||
|
||||
// calculate if the node is in any other of the orgs before we count it as an add to the billing org
|
||||
if (org.billingOrgId) {
|
||||
const otherBillingOrgs = await trx
|
||||
.select()
|
||||
.from(orgs)
|
||||
.where(
|
||||
and(
|
||||
eq(orgs.billingOrgId, org.billingOrgId),
|
||||
ne(orgs.orgId, orgId)
|
||||
)
|
||||
);
|
||||
|
||||
const billingOrgIds = otherBillingOrgs.map((o) => o.orgId);
|
||||
|
||||
const orgsInBillingDomainThatTheNodeIsStillIn = await trx
|
||||
.select()
|
||||
.from(exitNodeOrgs)
|
||||
.where(
|
||||
and(
|
||||
eq(
|
||||
exitNodeOrgs.exitNodeId,
|
||||
existingExitNode.exitNodeId
|
||||
),
|
||||
inArray(exitNodeOrgs.orgId, billingOrgIds)
|
||||
)
|
||||
);
|
||||
|
||||
if (orgsInBillingDomainThatTheNodeIsStillIn.length === 0) {
|
||||
await usageService.add(
|
||||
orgId,
|
||||
LimitId.REMOTE_EXIT_NODES,
|
||||
1,
|
||||
trx
|
||||
);
|
||||
}
|
||||
}
|
||||
});
|
||||
} finally {
|
||||
await releaseSubnetLock?.();
|
||||
}
|
||||
|
||||
const token = generateSessionToken();
|
||||
await createRemoteExitNodeSession(token, remoteExitNodeId);
|
||||
|
||||
@@ -13,37 +13,41 @@ export async function createExitNode(
|
||||
const [exitNodeQuery] = await db.select().from(exitNodes).limit(1);
|
||||
let exitNode: ExitNode;
|
||||
if (!exitNodeQuery) {
|
||||
const address = await getNextAvailableSubnet();
|
||||
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
|
||||
// const listenPort = await getNextAvailablePort();
|
||||
const listenPort = config.getRawConfig().gerbil.start_port;
|
||||
let subEndpoint = "";
|
||||
if (config.getRawConfig().gerbil.use_subdomain) {
|
||||
subEndpoint = await getUniqueExitNodeEndpointName();
|
||||
const { value: address, release } = await getNextAvailableSubnet();
|
||||
try {
|
||||
// TODO: eventually we will want to get the next available port so that we can multiple exit nodes
|
||||
// const listenPort = await getNextAvailablePort();
|
||||
const listenPort = config.getRawConfig().gerbil.start_port;
|
||||
let subEndpoint = "";
|
||||
if (config.getRawConfig().gerbil.use_subdomain) {
|
||||
subEndpoint = await getUniqueExitNodeEndpointName();
|
||||
}
|
||||
|
||||
const exitNodeName =
|
||||
config.getRawConfig().gerbil.exit_node_name ||
|
||||
`Exit Node ${publicKey.slice(0, 8)}`;
|
||||
|
||||
// create a new exit node
|
||||
[exitNode] = await db
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
publicKey,
|
||||
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
|
||||
address,
|
||||
online: true,
|
||||
listenPort,
|
||||
reachableAt,
|
||||
name: exitNodeName
|
||||
})
|
||||
.returning()
|
||||
.execute();
|
||||
|
||||
logger.info(
|
||||
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
|
||||
);
|
||||
} finally {
|
||||
await release();
|
||||
}
|
||||
|
||||
const exitNodeName =
|
||||
config.getRawConfig().gerbil.exit_node_name ||
|
||||
`Exit Node ${publicKey.slice(0, 8)}`;
|
||||
|
||||
// create a new exit node
|
||||
[exitNode] = await db
|
||||
.insert(exitNodes)
|
||||
.values({
|
||||
publicKey,
|
||||
endpoint: `${subEndpoint}${subEndpoint != "" ? "." : ""}${config.getRawConfig().gerbil.base_endpoint}`,
|
||||
address,
|
||||
online: true,
|
||||
listenPort,
|
||||
reachableAt,
|
||||
name: exitNodeName
|
||||
})
|
||||
.returning()
|
||||
.execute();
|
||||
|
||||
logger.info(
|
||||
`Created new exit node ${exitNode.name} with address ${exitNode.address} and port ${exitNode.listenPort}`
|
||||
);
|
||||
} else {
|
||||
// update the existing exit node
|
||||
[exitNode] = await db
|
||||
|
||||
@@ -52,13 +52,13 @@ export async function buildClientConfigurationForNewtClient(
|
||||
clientsRes
|
||||
.filter((client) => {
|
||||
if (!client.clients.pubKey) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Client ${client.clients.clientId} has no public key, skipping`
|
||||
);
|
||||
return false;
|
||||
}
|
||||
if (!client.clients.subnet) {
|
||||
logger.warn(
|
||||
logger.debug(
|
||||
`Client ${client.clients.clientId} has no subnet, skipping`
|
||||
);
|
||||
return false;
|
||||
|
||||
@@ -19,7 +19,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
|
||||
}
|
||||
|
||||
if (!newt.siteId) {
|
||||
logger.warn("Newt has no client ID!");
|
||||
logger.warn("Newt has no site ID!");
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -34,6 +34,12 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
|
||||
.where(eq(sites.siteId, newt.siteId!))
|
||||
.returning();
|
||||
|
||||
if (!site) {
|
||||
throw new Error(
|
||||
`Could not find site ${newt.siteId} to update disconnection from disconnect message`
|
||||
);
|
||||
}
|
||||
|
||||
await fireSiteOfflineAlert(
|
||||
site.orgId,
|
||||
site.siteId,
|
||||
@@ -43,6 +49,6 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (
|
||||
);
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error("Error handling disconnecting message", { error });
|
||||
logger.error("Error handling site disconnecting message", error);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -3,6 +3,7 @@ import { sites, clients, olms } from "@server/db";
|
||||
import { and, eq, inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { fireSiteOnlineAlert } from "@server/lib/alerts";
|
||||
import { withRetry } from "@server/lib/dbRetry";
|
||||
|
||||
/**
|
||||
* Ping Accumulator
|
||||
@@ -22,8 +23,6 @@ import { fireSiteOnlineAlert } from "@server/lib/alerts";
|
||||
*/
|
||||
|
||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||
const MAX_RETRIES = 5;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
||||
@@ -266,85 +265,7 @@ export async function flushPingsToDb(): Promise<void> {
|
||||
}
|
||||
|
||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Simple retry wrapper with exponential backoff for transient errors
|
||||
* (deadlocks, connection timeouts, unexpected disconnects).
|
||||
*
|
||||
* PostgreSQL deadlocks (40P01) are always safe to retry: the database
|
||||
* guarantees exactly one winner per deadlock pair, so the loser just needs
|
||||
* to try again. MAX_RETRIES is intentionally higher than typical connection
|
||||
* retry budgets to give deadlock victims enough chances to succeed.
|
||||
*/
|
||||
async function withRetry<T>(
|
||||
operation: () => Promise<T>,
|
||||
context: string
|
||||
): Promise<T> {
|
||||
let attempt = 0;
|
||||
while (true) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error: any) {
|
||||
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
||||
attempt++;
|
||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
||||
const jitter = Math.random() * baseDelay;
|
||||
const delay = baseDelay + jitter;
|
||||
logger.warn(
|
||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
|
||||
{ code: error?.code ?? error?.cause?.code }
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
continue;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect transient errors that are safe to retry.
|
||||
*/
|
||||
function isTransientError(error: any): boolean {
|
||||
if (!error) return false;
|
||||
|
||||
const message = (error.message || "").toLowerCase();
|
||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||
const code = error.code || error.cause?.code || "";
|
||||
|
||||
// Connection timeout / terminated
|
||||
if (
|
||||
message.includes("connection timeout") ||
|
||||
message.includes("connection terminated") ||
|
||||
message.includes("timeout exceeded when trying to connect") ||
|
||||
causeMessage.includes("connection terminated unexpectedly") ||
|
||||
causeMessage.includes("connection timeout")
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// PostgreSQL deadlock detected - always safe to retry (one winner guaranteed)
|
||||
if (code === "40P01" || message.includes("deadlock")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// PostgreSQL serialization failure
|
||||
if (code === "40001") {
|
||||
return true;
|
||||
}
|
||||
|
||||
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
|
||||
if (
|
||||
code === "ECONNRESET" ||
|
||||
code === "ECONNREFUSED" ||
|
||||
code === "EPIPE" ||
|
||||
code === "ETIMEDOUT"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
// See @server/lib/dbRetry for the shared withRetry/isTransientError helpers.
|
||||
|
||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@@ -161,7 +161,7 @@ export async function buildSiteConfigurationForOlmClient(
|
||||
}
|
||||
|
||||
if (!site.subnet) {
|
||||
logger.warn(`Site ${site.siteId} has no subnet, skipping`);
|
||||
logger.debug(`Site ${site.siteId} has no subnet, skipping`);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,9 @@ import logger from "@server/logger";
|
||||
/**
|
||||
* Handles disconnecting messages from clients to show disconnected in the ui
|
||||
*/
|
||||
export const handleOlmDisconnectingMessage: MessageHandler = async (context) => {
|
||||
export const handleOlmDisconnectingMessage: MessageHandler = async (
|
||||
context
|
||||
) => {
|
||||
const { message, client: c, sendToClient } = context;
|
||||
const olm = c as Olm;
|
||||
|
||||
@@ -29,6 +31,6 @@ export const handleOlmDisconnectingMessage: MessageHandler = async (context) =>
|
||||
})
|
||||
.where(eq(clients.clientId, olm.clientId));
|
||||
} catch (error) {
|
||||
logger.error("Error handling disconnecting message", { error });
|
||||
logger.error("Error handling client disconnecting message", error);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -268,7 +268,11 @@ export async function createSite(
|
||||
|
||||
let newSite: Site | undefined;
|
||||
try {
|
||||
if (subnet && exitNodeId) {
|
||||
if (type === "wireguard" && subnet && exitNodeId) {
|
||||
// Only wireguard sites actually persist the provided subnet/exitNodeId.
|
||||
// Newt sites have their subnet/exit node chosen (under a lock) when the
|
||||
// newt connects, so validating them here is both unnecessary and racy,
|
||||
// since pickSiteDefaults does not lock the subnet it suggests.
|
||||
//make sure the subnet is in the range of the exit node if provided
|
||||
const [exitNode] = await db
|
||||
.select()
|
||||
|
||||
@@ -290,7 +290,13 @@ export default function BillingPage() {
|
||||
setHasSubscription(
|
||||
tierSub.subscription.status === "active"
|
||||
);
|
||||
setIsTrial(tierSub.subscription.expiresAt != null);
|
||||
// expiresAt is only meaningful while the trial hasn't
|
||||
// actually run out yet; a stale row with a past
|
||||
// expiresAt should no longer be treated as a live trial
|
||||
const expiresAt = tierSub.subscription.expiresAt;
|
||||
setIsTrial(
|
||||
expiresAt != null && expiresAt * 1000 > Date.now()
|
||||
);
|
||||
}
|
||||
|
||||
// Find license subscription
|
||||
|
||||
Reference in New Issue
Block a user