https://github.com/fosrl/pangolin.git
Merge branch 'dev' into batch-add-client-to-resources
@@ -1,6 +1,10 @@
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
import { cleanup as wsCleanup } from "#dynamic/routers/ws";

async function cleanup() {
    await flushBandwidthToDb();
    await flushSiteBandwidthToDb();
    await wsCleanup();

    process.exit(0);

@@ -10,4 +14,4 @@ export async function initCleanup() {
    // Handle process termination
    process.on("SIGTERM", () => cleanup());
    process.on("SIGINT", () => cleanup());
}
}

@@ -328,6 +328,14 @@ export const approvals = pgTable("approvals", {
        .notNull()
});

export const bannedEmails = pgTable("bannedEmails", {
    email: varchar("email", { length: 255 }).primaryKey(),
});

export const bannedIps = pgTable("bannedIps", {
    ip: varchar("ip", { length: 255 }).primaryKey(),
});

export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;

@@ -22,7 +22,8 @@ export const domains = pgTable("domains", {
    tries: integer("tries").notNull().default(0),
    certResolver: varchar("certResolver"),
    customCertResolver: varchar("customCertResolver"),
    preferWildcardCert: boolean("preferWildcardCert")
    preferWildcardCert: boolean("preferWildcardCert"),
    errorMessage: text("errorMessage")
});

export const dnsRecords = pgTable("dnsRecords", {

@@ -88,6 +89,7 @@ export const sites = pgTable("sites", {
    lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
    type: varchar("type").notNull(), // "newt" or "wireguard"
    online: boolean("online").notNull().default(false),
    lastPing: integer("lastPing"),
    address: varchar("address"),
    endpoint: varchar("endpoint"),
    publicKey: varchar("publicKey"),

@@ -720,6 +722,7 @@ export const clientSitesAssociationsCache = pgTable(
        .notNull(),
    siteId: integer("siteId").notNull(),
    isRelayed: boolean("isRelayed").notNull().default(false),
    isJitMode: boolean("isJitMode").notNull().default(false),
    endpoint: varchar("endpoint"),
    publicKey: varchar("publicKey") // this will act as the session's public key for hole punching so we can track when it changes
}

@@ -318,6 +318,15 @@ export const approvals = sqliteTable("approvals", {
        .notNull()
});

export const bannedEmails = sqliteTable("bannedEmails", {
    email: text("email").primaryKey()
});

export const bannedIps = sqliteTable("bannedIps", {
    ip: text("ip").primaryKey()
});

export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;

@@ -13,7 +13,8 @@ export const domains = sqliteTable("domains", {
    failed: integer("failed", { mode: "boolean" }).notNull().default(false),
    tries: integer("tries").notNull().default(0),
    certResolver: text("certResolver"),
    preferWildcardCert: integer("preferWildcardCert", { mode: "boolean" })
    preferWildcardCert: integer("preferWildcardCert", { mode: "boolean" }),
    errorMessage: text("errorMessage")
});

export const dnsRecords = sqliteTable("dnsRecords", {

@@ -89,6 +90,7 @@ export const sites = sqliteTable("sites", {
    lastBandwidthUpdate: text("lastBandwidthUpdate"),
    type: text("type").notNull(), // "newt" or "wireguard"
    online: integer("online", { mode: "boolean" }).notNull().default(false),
    lastPing: integer("lastPing"),

    // exit node stuff that is how to connect to the site when it has a wg server
    address: text("address"), // this is the address of the wireguard interface in newt

@@ -409,6 +411,9 @@ export const clientSitesAssociationsCache = sqliteTable(
        isRelayed: integer("isRelayed", { mode: "boolean" })
            .notNull()
            .default(false),
        isJitMode: integer("isJitMode", { mode: "boolean" })
            .notNull()
            .default(false),
        endpoint: text("endpoint"),
        publicKey: text("publicKey") // this will act as the session's public key for hole punching so we can track when it changes
    }

@@ -107,7 +107,7 @@ export async function applyBlueprint({
            [target],
            matchingHealthcheck ? [matchingHealthcheck] : [],
            result.proxyResource.protocol,
            result.proxyResource.proxyPort
            site.newt.version
        );
    }
}

@@ -4,8 +4,12 @@ import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/log
import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit";
import { gt, or } from "drizzle-orm";
import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils";
import { build } from "@server/build";

export function initLogCleanupInterval() {
    if (build == "saas") { // skip log cleanup for saas builds
        return null;
    }
    return setInterval(
        async () => {
            const orgsToClean = await db

server/lib/clientVersionChecks.ts (new file)
@@ -0,0 +1,20 @@
import semver from "semver";

export function canCompress(
    clientVersion: string | null | undefined,
    type: "newt" | "olm"
): boolean {
    try {
        if (!clientVersion) return false;
        // check if it is a valid semver
        if (!semver.valid(clientVersion)) return false;
        if (type === "newt") {
            return semver.gte(clientVersion, "1.10.3");
        } else if (type === "olm") {
            return semver.gte(clientVersion, "1.4.3");
        }
        return false;
    } catch {
        return false;
    }
}
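For context, a minimal sketch of how this gate behaves at the version boundaries, following the semver.gte semantics in the function above; the version strings here are illustrative inputs, not values from the commit:

// Illustrative calls; only clients at or above the cutoff get compressed frames.
canCompress("1.10.3", "newt");        // true  (>= 1.10.3)
canCompress("1.10.2", "newt");        // false (below the newt cutoff)
canCompress("1.4.3", "olm");          // true  (>= 1.4.3)
canCompress(undefined, "olm");        // false (unknown version never compresses)
canCompress("not-a-version", "newt"); // false (invalid semver)
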
@@ -85,9 +85,7 @@ export async function deleteOrgById(
            deletedNewtIds.push(deletedNewt.newtId);
            await trx
                .delete(newtSessions)
                .where(
                    eq(newtSessions.newtId, deletedNewt.newtId)
                );
                .where(eq(newtSessions.newtId, deletedNewt.newtId));
        }
    }
}

@@ -121,33 +119,38 @@ export async function deleteOrgById(
            eq(clientSitesAssociationsCache.clientId, client.clientId)
        );
    }

    await trx.delete(resources).where(eq(resources.orgId, orgId));

    const allOrgDomains = await trx
        .select()
        .from(orgDomains)
        .innerJoin(domains, eq(domains.domainId, orgDomains.domainId))
        .innerJoin(domains, eq(orgDomains.domainId, domains.domainId))
        .where(
            and(
                eq(orgDomains.orgId, orgId),
                eq(domains.configManaged, false)
            )
        );
    logger.info(`Found ${allOrgDomains.length} domains to delete`);
    const domainIdsToDelete: string[] = [];
    for (const orgDomain of allOrgDomains) {
        const domainId = orgDomain.domains.domainId;
        const orgCount = await trx
            .select({ count: sql<number>`count(*)` })
        const [orgCount] = await trx
            .select({ count: count() })
            .from(orgDomains)
            .where(eq(orgDomains.domainId, domainId));
        if (orgCount[0].count === 1) {
        logger.info(`Found ${orgCount.count} orgs using domain ${domainId}`);
        if (orgCount.count === 1) {
            domainIdsToDelete.push(domainId);
        }
    }
    logger.info(`Found ${domainIdsToDelete.length} domains to delete`);
    if (domainIdsToDelete.length > 0) {
        await trx
            .delete(domains)
            .where(inArray(domains.domainId, domainIdsToDelete));
    }
    await trx.delete(resources).where(eq(resources.orgId, orgId));

    await usageService.add(orgId, FeatureId.ORGINIZATIONS, -1, trx); // here we are decreasing the org count BEFORE deleting the org because we need to still be able to get the org to get the billing org inside of here

@@ -231,15 +234,13 @@ export function sendTerminationMessages(result: DeleteOrgByIdResult): void {
        );
    }
    for (const olmId of result.olmsToTerminate) {
        sendTerminateClient(
            0,
            OlmErrorCodes.TERMINATED_REKEYED,
            olmId
        ).catch((error) => {
            logger.error(
                "Failed to send termination message to olm:",
                error
            );
        });
        sendTerminateClient(0, OlmErrorCodes.TERMINATED_REKEYED, olmId).catch(
            (error) => {
                logger.error(
                    "Failed to send termination message to olm:",
                    error
                );
            }
        );
    }
}

@@ -477,6 +477,7 @@ async function handleMessagesForSiteClients(
        }

        if (isAdd) {
            // TODO: if we are in jit mode here should we really be sending this?
            await initPeerAddHandshake(
                // this will kick off the add peer process for the client
                client.clientId,

@@ -571,7 +572,7 @@ export async function updateClientSiteDestinations(
            destinations: [
                {
                    destinationIP: site.sites.subnet.split("/")[0],
                    destinationPort: site.sites.listenPort || 0
                    destinationPort: site.sites.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
                }
            ]
        };

@@ -579,7 +580,7 @@ export async function updateClientSiteDestinations(
        // add to the existing destinations
        destinations.destinations.push({
            destinationIP: site.sites.subnet.split("/")[0],
            destinationPort: site.sites.listenPort || 0
            destinationPort: site.sites.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
        });
    }

@@ -669,7 +670,11 @@ async function handleSubnetProxyTargetUpdates(
            `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
        );
        proxyJobs.push(
            addSubnetProxyTargets(newt.newtId, targetsToAdd)
            addSubnetProxyTargets(
                newt.newtId,
                targetsToAdd,
                newt.version
            )
        );
    }

@@ -705,7 +710,11 @@ async function handleSubnetProxyTargetUpdates(
            `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
        );
        proxyJobs.push(
            removeSubnetProxyTargets(newt.newtId, targetsToRemove)
            removeSubnetProxyTargets(
                newt.newtId,
                targetsToRemove,
                newt.version
            )
        );
    }

@@ -1080,6 +1089,7 @@ async function handleMessagesForClientSites(
            continue;
        }

        // TODO: if we are in jit mode here should we really be sending this?
        await initPeerAddHandshake(
            // this will kick off the add peer process for the client
            client.clientId,

@@ -1146,7 +1156,7 @@ async function handleMessagesForClientResources(
        // Add subnet proxy targets for each site
        for (const [siteId, resources] of addedBySite.entries()) {
            const [newt] = await trx
                .select({ newtId: newts.newtId })
                .select({ newtId: newts.newtId, version: newts.version })
                .from(newts)
                .where(eq(newts.siteId, siteId))
                .limit(1);

@@ -1168,7 +1178,13 @@ async function handleMessagesForClientResources(
            ]);

            if (targets.length > 0) {
                proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets));
                proxyJobs.push(
                    addSubnetProxyTargets(
                        newt.newtId,
                        targets,
                        newt.version
                    )
                );
            }

            try {

@@ -1217,7 +1233,7 @@ async function handleMessagesForClientResources(
        // Remove subnet proxy targets for each site
        for (const [siteId, resources] of removedBySite.entries()) {
            const [newt] = await trx
                .select({ newtId: newts.newtId })
                .select({ newtId: newts.newtId, version: newts.version })
                .from(newts)
                .where(eq(newts.siteId, siteId))
                .limit(1);

@@ -1240,7 +1256,11 @@ async function handleMessagesForClientResources(

            if (targets.length > 0) {
                proxyJobs.push(
                    removeSubnetProxyTargets(newt.newtId, targets)
                    removeSubnetProxyTargets(
                        newt.newtId,
                        targets,
                        newt.version
                    )
                );
            }

@@ -218,10 +218,11 @@ export class TraefikConfigManager {
            return true;
        }

        // Fetch if it's been more than 24 hours (for renewals)
        const dayInMs = 24 * 60 * 60 * 1000;
        const timeSinceLastFetch =
            Date.now() - this.lastCertificateFetch.getTime();

        // Fetch if it's been more than 24 hours (daily routine check)
        if (timeSinceLastFetch > dayInMs) {
            logger.info("Fetching certificates due to 24-hour renewal check");
            return true;

@@ -265,7 +266,7 @@ export class TraefikConfigManager {
            return true;
        }

        // Check if any local certificates are missing or appear to be outdated
        // Check if any local certificates are missing (needs immediate fetch)
        for (const domain of domainsNeedingCerts) {
            const localState = this.lastLocalCertificateState.get(domain);
            if (!localState || !localState.exists) {

@@ -274,17 +275,55 @@ export class TraefikConfigManager {
                );
                return true;
            }
        }

            // Check if certificate is expiring soon (within 30 days)
            if (localState.expiresAt) {
                const nowInSeconds = Math.floor(Date.now() / 1000);
                const secondsUntilExpiry = localState.expiresAt - nowInSeconds;
                const daysUntilExpiry = secondsUntilExpiry / (60 * 60 * 24);
                if (daysUntilExpiry < 30) {
                    logger.info(
                        `Fetching certificates due to upcoming expiry for ${domain} (${Math.round(daysUntilExpiry)} days remaining)`
                    );
                    return true;
        // For expiry checks, throttle to every 6 hours to avoid querying the
        // API/DB on every monitor loop. The certificate-service renews certs
        // 45 days before expiry, so checking every 6 hours is plenty frequent
        // to pick up renewed certs promptly.
        const renewalCheckIntervalMs = 6 * 60 * 60 * 1000; // 6 hours
        if (timeSinceLastFetch > renewalCheckIntervalMs) {
            // Check non-wildcard certs for expiry (within 45 days to match
            // the server-side renewal window in certificate-service)
            for (const domain of domainsNeedingCerts) {
                const localState =
                    this.lastLocalCertificateState.get(domain);
                if (localState?.expiresAt) {
                    const nowInSeconds = Math.floor(Date.now() / 1000);
                    const secondsUntilExpiry =
                        localState.expiresAt - nowInSeconds;
                    const daysUntilExpiry =
                        secondsUntilExpiry / (60 * 60 * 24);
                    if (daysUntilExpiry < 45) {
                        logger.info(
                            `Fetching certificates due to upcoming expiry for ${domain} (${Math.round(daysUntilExpiry)} days remaining)`
                        );
                        return true;
                    }
                }
            }

            // Also check wildcard certificates for expiry. These are not
            // included in domainsNeedingCerts since their subdomains are
            // filtered out, so we must check them separately.
            for (const [certDomain, state] of this
                .lastLocalCertificateState) {
                if (
                    state.exists &&
                    state.wildcard &&
                    state.expiresAt
                ) {
                    const nowInSeconds = Math.floor(Date.now() / 1000);
                    const secondsUntilExpiry =
                        state.expiresAt - nowInSeconds;
                    const daysUntilExpiry =
                        secondsUntilExpiry / (60 * 60 * 24);
                    if (daysUntilExpiry < 45) {
                        logger.info(
                            `Fetching certificates due to upcoming expiry for wildcard cert ${certDomain} (${Math.round(daysUntilExpiry)} days remaining)`
                        );
                        return true;
                    }
                }
            }
        }

@@ -361,6 +400,32 @@ export class TraefikConfigManager {
                }
            }

            // Also include wildcard cert base domains that are
            // expiring or expired so they get re-fetched even though
            // their subdomains were filtered out above.
            for (const [certDomain, state] of this
                .lastLocalCertificateState) {
                if (
                    state.exists &&
                    state.wildcard &&
                    state.expiresAt
                ) {
                    const nowInSeconds = Math.floor(
                        Date.now() / 1000
                    );
                    const secondsUntilExpiry =
                        state.expiresAt - nowInSeconds;
                    const daysUntilExpiry =
                        secondsUntilExpiry / (60 * 60 * 24);
                    if (daysUntilExpiry < 45) {
                        domainsToFetch.add(certDomain);
                        logger.info(
                            `Including expiring wildcard cert domain ${certDomain} in fetch (${Math.round(daysUntilExpiry)} days remaining)`
                        );
                    }
                }
            }

            if (domainsToFetch.size > 0) {
                // Get valid certificates for domains not covered by wildcards
                validCertificates =

@@ -13,8 +13,12 @@

import { rateLimitService } from "#private/lib/rateLimit";
import { cleanup as wsCleanup } from "#private/routers/ws";
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";

async function cleanup() {
    await flushBandwidthToDb();
    await flushSiteBandwidthToDb();
    await rateLimitService.cleanup();
    await wsCleanup();

@@ -25,4 +29,4 @@ export async function initCleanup() {
    // Handle process termination
    process.on("SIGTERM", () => cleanup());
    process.on("SIGINT", () => cleanup());
}
}

@@ -515,6 +515,6 @@ authenticated.post(
    verifyOrgAccess,
    verifyLimits,
    verifyUserHasAction(ActionsEnum.signSshKey),
    logActionAudit(ActionsEnum.signSshKey),
    // logActionAudit(ActionsEnum.signSshKey), // it is handled inside of the function below so we can include more metadata
    ssh.signSshKey
);

@@ -14,7 +14,9 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import {
    actionAuditLog,
    db,
    logsDb,
    newts,
    roles,
    roundTripMessageTracker,

@@ -29,12 +31,12 @@ import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { eq, or, and } from "drizzle-orm";
import { canUserAccessSiteResource } from "@server/auth/canUserAccessSiteResource";
import { signPublicKey, getOrgCAKeys } from "@server/lib/sshCA";
import config from "@server/lib/config";
import { sendToClient } from "#private/routers/ws";
import { ActionsEnum } from "@server/auth/actions";

const paramsSchema = z.strictObject({
    orgId: z.string().nonempty()

@@ -64,6 +66,7 @@ export type SignSshKeyResponse = {
    sshUsername: string;
    sshHost: string;
    resourceId: number;
    siteId: number;
    keyId: string;
    validPrincipals: string[];
    validAfter: string;

@@ -446,6 +449,20 @@ export async function signSshKey(
        sshHost = resource.destination;
    }

    await logsDb.insert(actionAuditLog).values({
        timestamp: Math.floor(Date.now() / 1000),
        orgId: orgId,
        actorType: "user",
        actor: req.user?.username ?? "",
        actorId: req.user?.userId ?? "",
        action: ActionsEnum.signSshKey,
        metadata: JSON.stringify({
            resourceId: resource.siteResourceId,
            resource: resource.name,
            siteId: resource.siteId,
        })
    });

    return response<SignSshKeyResponse>(res, {
        data: {
            certificate: cert.certificate,

@@ -453,6 +470,7 @@ export async function signSshKey(
            sshUsername: usernameToUse,
            sshHost: sshHost,
            resourceId: resource.siteResourceId,
            siteId: resource.siteId,
            keyId: cert.keyId,
            validPrincipals: cert.validPrincipals,
            validAfter: cert.validAfter.toISOString(),

@@ -17,10 +17,13 @@ import {
    startRemoteExitNodeOfflineChecker
} from "#private/routers/remoteExitNode";
import { MessageHandler } from "@server/routers/ws";
import { build } from "@server/build";

export const messageHandlers: Record<string, MessageHandler> = {
    "remoteExitNode/register": handleRemoteExitNodeRegisterMessage,
    "remoteExitNode/ping": handleRemoteExitNodePingMessage
};

startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
if (build != "saas") {
    startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
}

@@ -12,6 +12,7 @@
 */

import { Router, Request, Response } from "express";
import zlib from "zlib";
import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { Socket } from "net";

@@ -24,7 +25,8 @@ import {
    OlmSession,
    RemoteExitNode,
    RemoteExitNodeSession,
    remoteExitNodes
    remoteExitNodes,
    sites
} from "@server/db";
import { eq } from "drizzle-orm";
import { db } from "@server/db";

@@ -57,11 +59,13 @@ const MAX_PENDING_MESSAGES = 50; // Maximum messages to queue during connection
const processMessage = async (
    ws: AuthenticatedWebSocket,
    data: Buffer,
    isBinary: boolean,
    clientId: string,
    clientType: ClientType
): Promise<void> => {
    try {
        const message: WSMessage = JSON.parse(data.toString());
        const messageBuffer = isBinary ? zlib.gunzipSync(data) : data;
        const message: WSMessage = JSON.parse(messageBuffer.toString());

        // logger.debug(
        //     `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}`

@@ -76,7 +80,7 @@ const processMessage = async (
            clientId,
            message.type, // Pass message type for granular limiting
            100, // max requests per window
            20, // max requests per message type per window
            100, // max requests per message type per window
            60 * 1000 // window in milliseconds
        );
        if (rateLimitResult.isLimited) {

@@ -163,8 +167,16 @@ const processPendingMessages = async (
    );

    const jobs = [];
    for (const messageData of ws.pendingMessages) {
        jobs.push(processMessage(ws, messageData, clientId, clientType));
    for (const pending of ws.pendingMessages) {
        jobs.push(
            processMessage(
                ws,
                pending.data,
                pending.isBinary,
                clientId,
                clientType
            )
        );
    }

    await Promise.all(jobs);

@@ -185,6 +197,12 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
// Config version tracking map (local to this node, resets on server restart)
const clientConfigVersions: Map<string, number> = new Map();

// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
// DB for a given siteId. Resets on server restart which is fine – the first
// ping after startup will always write, re-establishing the online state.
const lastPingDbWrite: Map<number, number> = new Map();
const PING_DB_WRITE_INTERVAL = 45; // seconds

// Recovery tracking
let isRedisRecoveryInProgress = false;

@@ -325,7 +343,9 @@ const addClient = async (
    // Check Redis first if enabled
    if (redisManager.isRedisEnabled()) {
        try {
            const redisVersion = await redisManager.get(getConfigVersionKey(clientId));
            const redisVersion = await redisManager.get(
                getConfigVersionKey(clientId)
            );
            if (redisVersion !== null) {
                configVersion = parseInt(redisVersion, 10);
                // Sync to local cache

@@ -337,7 +357,10 @@ const addClient = async (
            } else {
                // Use local cache version and sync to Redis
                configVersion = clientConfigVersions.get(clientId) || 0;
                await redisManager.set(getConfigVersionKey(clientId), configVersion.toString());
                await redisManager.set(
                    getConfigVersionKey(clientId),
                    configVersion.toString()
                );
            }
        } catch (error) {
            logger.error("Failed to get/set config version in Redis:", error);

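getConfigVersionKey is referenced in these hunks but defined elsewhere in the file. A minimal sketch of what such a helper presumably looks like; the exact key prefix is an assumption, not taken from the commit:

// Hypothetical shape of the Redis key helper used above; the real prefix may differ.
const getConfigVersionKey = (clientId: string): string =>
    `ws:configVersion:${clientId}`;
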
@@ -432,7 +455,9 @@ const removeClient = async (
};

// Helper to get the current config version for a client
const getClientConfigVersion = async (clientId: string): Promise<number | undefined> => {
const getClientConfigVersion = async (
    clientId: string
): Promise<number | undefined> => {
    // Try Redis first if available
    if (redisManager.isRedisEnabled()) {
        try {

@@ -502,11 +527,26 @@ const sendToClientLocal = async (
    };

    const messageString = JSON.stringify(messageWithVersion);
    clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            client.send(messageString);
        }
    });
    if (options.compress) {
        logger.debug(
            `Message size before compression: ${messageString.length} bytes`
        );
        const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
        logger.debug(
            `Message size after compression: ${compressed.length} bytes`
        );
        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(compressed);
            }
        });
    } else {
        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(messageString);
            }
        });
    }

    return true;
};

@@ -532,11 +572,22 @@ const broadcastToAllExceptLocal = async (
            configVersion
        };

        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(JSON.stringify(messageWithVersion));
            }
        });
        if (options.compress) {
            const compressed = zlib.gzipSync(
                Buffer.from(JSON.stringify(messageWithVersion), "utf8")
            );
            clients.forEach((client) => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(compressed);
                }
            });
        } else {
            clients.forEach((client) => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify(messageWithVersion));
                }
            });
        }
    }
}
};

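Taken together with the processMessage change earlier in this file, the send and receive paths form a gzip round trip: gzipped payloads go out as binary frames, and the isBinary flag on receipt tells the handler whether to gunzip before parsing. A minimal sketch of that framing contract, with an illustrative message payload:

import zlib from "zlib";

// Sender side (sendToClientLocal / broadcastToAllExceptLocal): gzip the
// JSON payload and send it as a binary WebSocket frame.
const payload = JSON.stringify({ type: "newt/wg/targets/add", data: [] });
const frame = zlib.gzipSync(Buffer.from(payload, "utf8"));

// Receiver side (processMessage): ws reports gzipped payloads as binary
// frames, so isBinary decides whether to gunzip before JSON.parse.
const isBinary = true; // as delivered by ws.on("message", (data, isBinary) => ...)
const buffer = isBinary ? zlib.gunzipSync(frame) : frame;
const message = JSON.parse(buffer.toString()); // { type: "newt/wg/targets/add", ... }
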
@@ -762,7 +813,7 @@ const setupConnection = async (
    }

    // Set up message handler FIRST to prevent race condition
    ws.on("message", async (data) => {
    ws.on("message", async (data, isBinary) => {
        if (!ws.isFullyConnected) {
            // Queue message for later processing with limits
            ws.pendingMessages = ws.pendingMessages || [];

@@ -777,11 +828,17 @@ const setupConnection = async (
            logger.debug(
                `Queueing message from ${clientType.toUpperCase()} ID: ${clientId} (connection not fully established)`
            );
            ws.pendingMessages.push(data as Buffer);
            ws.pendingMessages.push({ data: data as Buffer, isBinary });
            return;
        }

        await processMessage(ws, data as Buffer, clientId, clientType);
        await processMessage(
            ws,
            data as Buffer,
            isBinary,
            clientId,
            clientType
        );
    });

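The pending-message queue now has to carry the isBinary flag alongside the raw frame so queued frames can still be decompressed correctly later. The AuthenticatedWebSocket type is not shown in this diff, but the change implies a shape roughly like the following sketch; every field name besides pendingMessages is an assumption:

// Assumed shape implied by the hunks above; the real interface likely has more fields.
interface PendingMessage {
    data: Buffer;      // raw frame as received
    isBinary: boolean; // needed later to decide whether to gunzip
}

interface AuthenticatedWebSocket extends WebSocket {
    isFullyConnected?: boolean;
    pendingMessages?: PendingMessage[]; // was Buffer[] before this change
}
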
    // Set up other event handlers before async operations

@@ -796,6 +853,35 @@ const setupConnection = async (
        );
    });

    // Handle WebSocket protocol-level pings from older newt clients that do
    // not send application-level "newt/ping" messages. Update the site's
    // online state and lastPing timestamp so the offline checker treats them
    // the same as modern newt clients.
    if (clientType === "newt") {
        const newtClient = client as Newt;
        ws.on("ping", async () => {
            if (!newtClient.siteId) return;
            const now = Math.floor(Date.now() / 1000);
            const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
            if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
            lastPingDbWrite.set(newtClient.siteId, now);
            try {
                await db
                    .update(sites)
                    .set({
                        online: true,
                        lastPing: now
                    })
                    .where(eq(sites.siteId, newtClient.siteId));
            } catch (error) {
                logger.error(
                    "Error updating newt site online state on WS ping",
                    { error }
                );
            }
        });
    }

    ws.on("error", (error: Error) => {
        logger.error(
            `WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`,

@@ -1,5 +1,5 @@
import { NextFunction, Request, Response } from "express";
import { db, users } from "@server/db";
import { bannedEmails, bannedIps, db, users } from "@server/db";
import HttpCode from "@server/types/HttpCode";
import { email, z } from "zod";
import { fromError } from "zod-validation-error";

@@ -65,6 +65,30 @@ export async function signup(
        skipVerificationEmail
    } = parsedBody.data;

    const [bannedEmail] = await db
        .select()
        .from(bannedEmails)
        .where(eq(bannedEmails.email, email))
        .limit(1);
    if (bannedEmail) {
        return next(
            createHttpError(HttpCode.FORBIDDEN, "Signup blocked. Do not attempt to continue to use this service.")
        );
    }

    if (req.ip) {
        const [bannedIp] = await db
            .select()
            .from(bannedIps)
            .where(eq(bannedIps.ip, req.ip))
            .limit(1);
        if (bannedIp) {
            return next(
                createHttpError(HttpCode.FORBIDDEN, "Signup blocked. Do not attempt to continue to use this service.")
            );
        }
    }

    const passwordHash = await hashPassword(password);
    const userId = generateId(15);

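The commit only shows the enforcement side of the ban lists; rows presumably reach bannedEmails and bannedIps through an admin path that is not part of this diff. A hedged sketch of what such an insert could look like with these drizzle tables; the helper name and idempotency choice are assumptions:

// Hypothetical admin-side helper; not part of this commit.
import { bannedEmails, bannedIps, db } from "@server/db";

async function banSignup(email: string, ip?: string) {
    // email/ip are primary keys, so re-banning the same value would conflict;
    // onConflictDoNothing() keeps the call idempotent.
    await db.insert(bannedEmails).values({ email }).onConflictDoNothing();
    if (ip) {
        await db.insert(bannedIps).values({ ip }).onConflictDoNothing();
    }
}
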
@@ -1,51 +1,38 @@
import { sendToClient } from "#dynamic/routers/ws";
import { db, olms, Transaction } from "@server/db";
import { canCompress } from "@server/lib/clientVersionChecks";
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
import logger from "@server/logger";
import { eq } from "drizzle-orm";

const BATCH_SIZE = 50;
const BATCH_DELAY_MS = 50;

function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

function chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
        chunks.push(array.slice(i, i + size));
    }
    return chunks;
}

export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) {
    const batches = chunkArray(targets, BATCH_SIZE);
    for (let i = 0; i < batches.length; i++) {
        if (i > 0) {
            await sleep(BATCH_DELAY_MS);
        }
        await sendToClient(newtId, {
export async function addTargets(
    newtId: string,
    targets: SubnetProxyTarget[],
    version?: string | null
) {
    await sendToClient(
        newtId,
        {
            type: `newt/wg/targets/add`,
            data: batches[i]
        }, { incrementConfigVersion: true });
    }
            data: targets
        },
        { incrementConfigVersion: true, compress: canCompress(version, "newt") }
    );
}

export async function removeTargets(
    newtId: string,
    targets: SubnetProxyTarget[]
    targets: SubnetProxyTarget[],
    version?: string | null
) {
    const batches = chunkArray(targets, BATCH_SIZE);
    for (let i = 0; i < batches.length; i++) {
        if (i > 0) {
            await sleep(BATCH_DELAY_MS);
        }
        await sendToClient(newtId, {
    await sendToClient(
        newtId,
        {
            type: `newt/wg/targets/remove`,
            data: batches[i]
        },{ incrementConfigVersion: true });
    }
            data: targets
        },
        { incrementConfigVersion: true, compress: canCompress(version, "newt") }
    );
}

export async function updateTargets(

@@ -53,26 +40,22 @@ export async function updateTargets(
    targets: {
        oldTargets: SubnetProxyTarget[];
        newTargets: SubnetProxyTarget[];
    }
    },
    version?: string | null
) {
    const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE);
    const newBatches = chunkArray(targets.newTargets, BATCH_SIZE);
    const maxBatches = Math.max(oldBatches.length, newBatches.length);

    for (let i = 0; i < maxBatches; i++) {
        if (i > 0) {
            await sleep(BATCH_DELAY_MS);
        }
        await sendToClient(newtId, {
    await sendToClient(
        newtId,
        {
            type: `newt/wg/targets/update`,
            data: {
                oldTargets: oldBatches[i] || [],
                newTargets: newBatches[i] || []
                oldTargets: targets.oldTargets,
                newTargets: targets.newTargets
            }
        }, { incrementConfigVersion: true }).catch((error) => {
            logger.warn(`Error sending message:`, error);
        });
    }
        },
        { incrementConfigVersion: true, compress: canCompress(version, "newt") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
}

export async function addPeerData(

@@ -80,7 +63,8 @@ export async function addPeerData(
    siteId: number,
    remoteSubnets: string[],
    aliases: Alias[],
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db

@@ -92,16 +76,21 @@ export async function addPeerData(
            return; // ignore this because an olm might not be associated with the client anymore
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(olmId, {
        type: `olm/wg/peer/data/add`,
        data: {
            siteId: siteId,
            remoteSubnets: remoteSubnets,
            aliases: aliases
        }
    }, { incrementConfigVersion: true }).catch((error) => {
    await sendToClient(
        olmId,
        {
            type: `olm/wg/peer/data/add`,
            data: {
                siteId: siteId,
                remoteSubnets: remoteSubnets,
                aliases: aliases
            }
        },
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
}

@@ -111,7 +100,8 @@ export async function removePeerData(
    siteId: number,
    remoteSubnets: string[],
    aliases: Alias[],
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db

@@ -123,16 +113,21 @@ export async function removePeerData(
            return;
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(olmId, {
        type: `olm/wg/peer/data/remove`,
        data: {
            siteId: siteId,
            remoteSubnets: remoteSubnets,
            aliases: aliases
        }
    }, { incrementConfigVersion: true }).catch((error) => {
    await sendToClient(
        olmId,
        {
            type: `olm/wg/peer/data/remove`,
            data: {
                siteId: siteId,
                remoteSubnets: remoteSubnets,
                aliases: aliases
            }
        },
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
}

@@ -152,7 +147,8 @@ export async function updatePeerData(
        newAliases: Alias[];
    }
        | undefined,
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db

@@ -164,16 +160,21 @@ export async function updatePeerData(
            return;
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(olmId, {
        type: `olm/wg/peer/data/update`,
        data: {
            siteId: siteId,
            ...remoteSubnets,
            ...aliases
        }
    }, { incrementConfigVersion: true }).catch((error) => {
    await sendToClient(
        olmId,
        {
            type: `olm/wg/peer/data/update`,
            data: {
                siteId: siteId,
                ...remoteSubnets,
                ...aliases
            }
        },
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
}

@@ -40,7 +40,8 @@ async function queryDomains(orgId: string, limit: number, offset: number) {
            tries: domains.tries,
            configManaged: domains.configManaged,
            certResolver: domains.certResolver,
            preferWildcardCert: domains.preferWildcardCert
            preferWildcardCert: domains.preferWildcardCert,
            errorMessage: domains.errorMessage
        })
        .from(orgDomains)
        .where(eq(orgDomains.orgId, orgId))

@@ -125,7 +125,7 @@ export async function generateRelayMappings(exitNode: ExitNode) {
        // Add site as a destination for this client
        const destination: PeerDestination = {
            destinationIP: site.subnet.split("/")[0],
            destinationPort: site.listenPort
            destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
        };

        // Check if this destination is already in the array to avoid duplicates

@@ -165,7 +165,7 @@ export async function generateRelayMappings(exitNode: ExitNode) {

        const destination: PeerDestination = {
            destinationIP: peer.subnet.split("/")[0],
            destinationPort: peer.listenPort
            destinationPort: peer.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
        };

        // Check for duplicates

@@ -1,5 +1,5 @@
import { Request, Response, NextFunction } from "express";
import { eq, and, lt, inArray, sql } from "drizzle-orm";
import { eq, sql } from "drizzle-orm";
import { sites } from "@server/db";
import { db } from "@server/db";
import logger from "@server/logger";

@@ -11,19 +11,31 @@ import { FeatureId } from "@server/lib/billing/features";
import { checkExitNodeOrg } from "#dynamic/lib/exitNodes";
import { build } from "@server/build";

// Track sites that are already offline to avoid unnecessary queries
const offlineSites = new Set<string>();

// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

interface PeerBandwidth {
    publicKey: string;
    bytesIn: number;
    bytesOut: number;
}

interface AccumulatorEntry {
    bytesIn: number;
    bytesOut: number;
    /** Present when the update came through a remote exit node. */
    exitNodeId?: number;
    /** Whether to record egress usage for billing purposes. */
    calcUsage: boolean;
}

// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds

// In-memory accumulator: publicKey -> AccumulatorEntry
let accumulator = new Map<string, AccumulatorEntry>();

/**
 * Check if an error is a deadlock error
 */

@@ -63,6 +75,220 @@ async function withDeadlockRetry<T>(
    }
}

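withDeadlockRetry itself sits outside these hunks. Based on how it is called (an async operation plus a label) and the MAX_RETRIES / BASE_DELAY_MS constants above, it is presumably something like the following sketch; the exponential backoff and the isDeadlockError test are assumptions, not code from the commit:

// Hedged reconstruction; the real implementation may differ in detail.
async function withDeadlockRetry<T>(
    operation: () => Promise<T>,
    label: string
): Promise<T> {
    for (let attempt = 1; ; attempt++) {
        try {
            return await operation();
        } catch (error) {
            // isDeadlockError presumably matches driver-specific deadlock codes.
            if (attempt >= MAX_RETRIES || !isDeadlockError(error)) {
                throw error;
            }
            const delay = BASE_DELAY_MS * 2 ** (attempt - 1); // 50ms, 100ms, ...
            logger.warn(`Deadlock on ${label}, retrying in ${delay}ms`);
            await new Promise((resolve) => setTimeout(resolve, delay));
        }
    }
}
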
/**
 * Flush all accumulated site bandwidth data to the database.
 *
 * Swaps out the accumulator before writing so that any bandwidth messages
 * received during the flush are captured in the new accumulator rather than
 * being lost or causing contention. Entries that fail to write are re-queued
 * back into the accumulator so they will be retried on the next flush.
 *
 * This function is exported so that the application's graceful-shutdown
 * cleanup handler can call it before the process exits.
 */
export async function flushSiteBandwidthToDb(): Promise<void> {
    if (accumulator.size === 0) {
        return;
    }

    // Atomically swap out the accumulator so new data keeps flowing in
    // while we write the snapshot to the database.
    const snapshot = accumulator;
    accumulator = new Map<string, AccumulatorEntry>();

    const currentTime = new Date().toISOString();

    // Sort by publicKey for consistent lock ordering across concurrent
    // writers — deadlock-prevention strategy.
    const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
        a.localeCompare(b)
    );

    logger.debug(
        `Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
    );

    // Aggregate billing usage by org, collected during the DB update loop.
    const orgUsageMap = new Map<string, number>();

    for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
        try {
            const updatedSite = await withDeadlockRetry(async () => {
                const [result] = await db
                    .update(sites)
                    .set({
                        megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
                        megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
                        lastBandwidthUpdate: currentTime,
                    })
                    .where(eq(sites.pubKey, publicKey))
                    .returning({
                        orgId: sites.orgId,
                        siteId: sites.siteId
                    });
                return result;
            }, `flush bandwidth for site ${publicKey}`);

            if (updatedSite) {
                if (exitNodeId) {
                    const notAllowed = await checkExitNodeOrg(
                        exitNodeId,
                        updatedSite.orgId
                    );
                    if (notAllowed) {
                        logger.warn(
                            `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
                        );
                        // Skip usage tracking for this site but continue
                        // processing the rest.
                        continue;
                    }
                }

                if (calcUsage) {
                    const totalBandwidth = bytesIn + bytesOut;
                    const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
                    orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
                }
            }
        } catch (error) {
            logger.error(
                `Failed to flush bandwidth for site ${publicKey}:`,
                error
            );

            // Re-queue the failed entry so it is retried on the next flush
            // rather than silently dropped.
            const existing = accumulator.get(publicKey);
            if (existing) {
                existing.bytesIn += bytesIn;
                existing.bytesOut += bytesOut;
            } else {
                accumulator.set(publicKey, {
                    bytesIn,
                    bytesOut,
                    exitNodeId,
                    calcUsage
                });
            }
        }
    }

    // Process billing usage updates outside the site-update loop to keep
    // lock scope small and concerns separated.
    if (orgUsageMap.size > 0) {
        // Sort org IDs for consistent lock ordering.
        const sortedOrgIds = [...orgUsageMap.keys()].sort();

        for (const orgId of sortedOrgIds) {
            try {
                const totalBandwidth = orgUsageMap.get(orgId)!;
                const bandwidthUsage = await usageService.add(
                    orgId,
                    FeatureId.EGRESS_DATA_MB,
                    totalBandwidth
                );
                if (bandwidthUsage) {
                    // Fire-and-forget — don't block the flush on limit checking.
                    usageService
                        .checkLimitSet(
                            orgId,
                            FeatureId.EGRESS_DATA_MB,
                            bandwidthUsage
                        )
                        .catch((error: any) => {
                            logger.error(
                                `Error checking bandwidth limits for org ${orgId}:`,
                                error
                            );
                        });
                }
            } catch (error) {
                logger.error(
                    `Error processing usage for org ${orgId}:`,
                    error
                );
                // Continue with other orgs.
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Periodic flush timer
// ---------------------------------------------------------------------------

const flushTimer = setInterval(async () => {
    try {
        await flushSiteBandwidthToDb();
    } catch (error) {
        logger.error(
            "Unexpected error during periodic site bandwidth flush:",
            error
        );
    }
}, FLUSH_INTERVAL_MS);

// Allow the process to exit normally even while the timer is pending.
// The graceful-shutdown path (see server/cleanup.ts) will call
// flushSiteBandwidthToDb() explicitly before process.exit(), so no data
// is lost.
flushTimer.unref();

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Accumulate bandwidth data reported by a gerbil or remote exit node.
 *
 * Only peers that actually transferred data (bytesIn > 0) are added to the
 * accumulator; peers with no activity are silently ignored, which means the
 * flush will only write rows that have genuinely changed.
 *
 * The function is intentionally synchronous in its fast path so that the
 * HTTP handler can respond immediately without waiting for any I/O.
 */
export async function updateSiteBandwidth(
    bandwidthData: PeerBandwidth[],
    calcUsageAndLimits: boolean,
    exitNodeId?: number
): Promise<void> {
    for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
        // Skip peers that haven't transferred any data — writing zeros to the
        // database would be a no-op anyway.
        if (bytesIn <= 0 && bytesOut <= 0) {
            continue;
        }

        const existing = accumulator.get(publicKey);
        if (existing) {
            existing.bytesIn += bytesIn;
            existing.bytesOut += bytesOut;
            // Retain the most-recent exitNodeId for this peer.
            if (exitNodeId !== undefined) {
                existing.exitNodeId = exitNodeId;
            }
            // Once calcUsage has been requested for a peer, keep it set for
            // the lifetime of this flush window.
            if (calcUsageAndLimits) {
                existing.calcUsage = true;
            }
        } else {
            accumulator.set(publicKey, {
                bytesIn,
                bytesOut,
                exitNodeId,
                calcUsage: calcUsageAndLimits
            });
        }
    }
}

// ---------------------------------------------------------------------------
// HTTP handler
// ---------------------------------------------------------------------------

export const receiveBandwidth = async (
    req: Request,
    res: Response,

@@ -75,7 +301,9 @@ export const receiveBandwidth = async (
            throw new Error("Invalid bandwidth data");
        }

        await updateSiteBandwidth(bandwidthData, build == "saas"); // we are checking the usage on saas only
        // Accumulate in memory; the periodic timer (and the shutdown hook)
        // will write to the database.
        await updateSiteBandwidth(bandwidthData, build == "saas");

        return response(res, {
            data: {},

@@ -94,201 +322,3 @@ export const receiveBandwidth = async (
        );
    }
};

export async function updateSiteBandwidth(
    bandwidthData: PeerBandwidth[],
    calcUsageAndLimits: boolean,
    exitNodeId?: number
) {
    const currentTime = new Date();
    const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago

    // Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
    // This is critical for preventing deadlocks when multiple instances update the same sites
    const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
        a.publicKey.localeCompare(b.publicKey)
    );

    // First, handle sites that are actively reporting bandwidth
    const activePeers = sortedBandwidthData.filter((peer) => peer.bytesIn > 0);

    // Aggregate usage data by organization (collected outside transaction)
    const orgUsageMap = new Map<string, number>();

    if (activePeers.length > 0) {
        // Remove any active peers from offline tracking since they're sending data
        activePeers.forEach((peer) => offlineSites.delete(peer.publicKey));

        // Update each active site individually with retry logic
        // This reduces transaction scope and allows retries per-site
        for (const peer of activePeers) {
            try {
                const updatedSite = await withDeadlockRetry(async () => {
                    const [result] = await db
                        .update(sites)
                        .set({
                            megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`,
                            megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`,
                            lastBandwidthUpdate: currentTime.toISOString(),
                            online: true
                        })
                        .where(eq(sites.pubKey, peer.publicKey))
                        .returning({
                            online: sites.online,
                            orgId: sites.orgId,
                            siteId: sites.siteId,
                            lastBandwidthUpdate: sites.lastBandwidthUpdate
                        });
                    return result;
                }, `update active site ${peer.publicKey}`);

                if (updatedSite) {
                    if (exitNodeId) {
                        const notAllowed = await checkExitNodeOrg(
                            exitNodeId,
                            updatedSite.orgId
                        );
                        if (notAllowed) {
                            logger.warn(
                                `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
                            );
                            // Skip this site but continue processing others
                            continue;
                        }
                    }

                    // Aggregate bandwidth usage for the org
                    const totalBandwidth = peer.bytesIn + peer.bytesOut;
                    const currentOrgUsage =
                        orgUsageMap.get(updatedSite.orgId) || 0;
                    orgUsageMap.set(
                        updatedSite.orgId,
                        currentOrgUsage + totalBandwidth
                    );
                }
            } catch (error) {
                logger.error(
                    `Failed to update bandwidth for site ${peer.publicKey}:`,
                    error
                );
                // Continue with other sites
            }
        }
    }

    // Process usage updates outside of site update transactions
    // This separates the concerns and reduces lock contention
    if (calcUsageAndLimits && orgUsageMap.size > 0) {
        // Sort org IDs to ensure consistent lock ordering
        const allOrgIds = [...new Set([...orgUsageMap.keys()])].sort();

        for (const orgId of allOrgIds) {
            try {
                // Process bandwidth usage for this org
                const totalBandwidth = orgUsageMap.get(orgId);
                if (totalBandwidth) {
                    const bandwidthUsage = await usageService.add(
                        orgId,
                        FeatureId.EGRESS_DATA_MB,
                        totalBandwidth
                    );
                    if (bandwidthUsage) {
                        // Fire and forget - don't block on limit checking
                        usageService
                            .checkLimitSet(
                                orgId,
                                FeatureId.EGRESS_DATA_MB,
                                bandwidthUsage
                            )
                            .catch((error: any) => {
                                logger.error(
                                    `Error checking bandwidth limits for org ${orgId}:`,
                                    error
                                );
                            });
                    }
                }
            } catch (error) {
                logger.error(`Error processing usage for org ${orgId}:`, error);
                // Continue with other orgs
            }
        }
    }

    // Handle sites that reported zero bandwidth but need online status updated
    const zeroBandwidthPeers = sortedBandwidthData.filter(
        (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey)
    );

    if (zeroBandwidthPeers.length > 0) {
        // Fetch all zero bandwidth sites in one query
        const zeroBandwidthSites = await db
            .select()
            .from(sites)
            .where(
                inArray(
                    sites.pubKey,
                    zeroBandwidthPeers.map((p) => p.publicKey)
                )
            );

        // Sort by siteId to ensure consistent lock ordering
        const sortedZeroBandwidthSites = zeroBandwidthSites.sort(
            (a, b) => a.siteId - b.siteId
        );

        for (const site of sortedZeroBandwidthSites) {
            let newOnlineStatus = site.online;

            // Check if site should go offline based on last bandwidth update WITH DATA
            if (site.lastBandwidthUpdate) {
                const lastUpdateWithData = new Date(site.lastBandwidthUpdate);
                if (lastUpdateWithData < oneMinuteAgo) {
                    newOnlineStatus = false;
                }
            } else {
                // No previous data update recorded, set to offline
                newOnlineStatus = false;
            }

            // Only update online status if it changed
            if (site.online !== newOnlineStatus) {
                try {
                    const updatedSite = await withDeadlockRetry(async () => {
                        const [result] = await db
                            .update(sites)
                            .set({
                                online: newOnlineStatus
                            })
                            .where(eq(sites.siteId, site.siteId))
                            .returning();
                        return result;
                    }, `update offline status for site ${site.siteId}`);

                    if (updatedSite && exitNodeId) {
                        const notAllowed = await checkExitNodeOrg(
                            exitNodeId,
                            updatedSite.orgId
                        );
                        if (notAllowed) {
                            logger.warn(
                                `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
                            );
                        }
                    }

                    // If site went offline, add it to our tracking set
                    if (!newOnlineStatus && site.pubKey) {
                        offlineSites.add(site.pubKey);
                    }
                } catch (error) {
                    logger.error(
                        `Failed to update offline status for site ${site.siteId}:`,
                        error
                    );
                    // Continue with other sites
                }
            }
        }
    }
}

@@ -112,7 +112,7 @@ export async function updateHolePunch(
|
||||
destinations: destinations
|
||||
});
|
||||
} catch (error) {
|
||||
// logger.error(error); // FIX THIS
|
||||
logger.error(error);
|
||||
return next(
|
||||
createHttpError(
|
||||
HttpCode.INTERNAL_SERVER_ERROR,
|
||||
@@ -262,7 +262,7 @@ export async function updateAndGenerateEndpointDestinations(
|
||||
if (site.subnet && site.listenPort) {
|
||||
destinations.push({
|
||||
destinationIP: site.subnet.split("/")[0],
|
||||
destinationPort: site.listenPort
|
||||
destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -339,10 +339,10 @@ export async function updateAndGenerateEndpointDestinations(
|
||||
handleSiteEndpointChange(newt.siteId, updatedSite.endpoint!);
|
||||
}
|
||||
|
||||
if (!updatedSite || !updatedSite.subnet) {
|
||||
logger.warn(`Site not found: ${newt.siteId}`);
|
||||
throw new Error("Site not found");
|
||||
}
|
||||
// if (!updatedSite || !updatedSite.subnet) {
|
||||
// logger.warn(`Site not found: ${newt.siteId}`);
|
||||
// throw new Error("Site not found");
|
||||
// }
|
||||
|
||||
// Find all clients that connect to this site
|
||||
// const sitesClientPairs = await db
|
||||
|
||||
@@ -1,4 +1,15 @@
import { clients, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, ExitNode, resources, Site, siteResources, targetHealthCheck, targets } from "@server/db";
import {
    clients,
    clientSiteResourcesAssociationsCache,
    clientSitesAssociationsCache,
    db,
    ExitNode,
    resources,
    Site,
    siteResources,
    targetHealthCheck,
    targets
} from "@server/db";
import logger from "@server/logger";
import { initPeerAddHandshake, updatePeer } from "../olm/peers";
import { eq, and } from "drizzle-orm";
@@ -69,40 +80,42 @@ export async function buildClientConfigurationForNewtClient(
        //     )
        // );

        // update the peer info on the olm
        // if the peer has not been added yet this will be a no-op
        await updatePeer(client.clients.clientId, {
            siteId: site.siteId,
            endpoint: site.endpoint!,
            relayEndpoint: `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`,
            publicKey: site.publicKey!,
            serverIP: site.address,
            serverPort: site.listenPort
            // remoteSubnets: generateRemoteSubnets(
            //     allSiteResources.map(
            //         ({ siteResources }) => siteResources
            //     )
            // ),
            // aliases: generateAliasConfig(
            //     allSiteResources.map(
            //         ({ siteResources }) => siteResources
            //     )
            // )
        });
        if (!client.clientSitesAssociationsCache.isJitMode) { // if we are adding sites through jit then dont add the site to the olm
            // update the peer info on the olm
            // if the peer has not been added yet this will be a no-op
            await updatePeer(client.clients.clientId, {
                siteId: site.siteId,
                endpoint: site.endpoint!,
                relayEndpoint: `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`,
                publicKey: site.publicKey!,
                serverIP: site.address,
                serverPort: site.listenPort
                // remoteSubnets: generateRemoteSubnets(
                //     allSiteResources.map(
                //         ({ siteResources }) => siteResources
                //     )
                // ),
                // aliases: generateAliasConfig(
                //     allSiteResources.map(
                //         ({ siteResources }) => siteResources
                //     )
                // )
            });

        // also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
        // if it has already been added this will be a no-op
        await initPeerAddHandshake(
            // this will kick off the add peer process for the client
            client.clients.clientId,
            {
                siteId,
                exitNode: {
                    publicKey: exitNode.publicKey,
                    endpoint: exitNode.endpoint
            // also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
            // if it has already been added this will be a no-op
            await initPeerAddHandshake(
                // this will kick off the add peer process for the client
                client.clients.clientId,
                {
                    siteId,
                    exitNode: {
                        publicKey: exitNode.publicKey,
                        endpoint: exitNode.endpoint
                    }
                }
            }
        );
            );
        }

    return {
        publicKey: client.clients.pubKey!,
@@ -230,9 +243,9 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
            !target.hcInterval ||
            !target.hcMethod
        ) {
            logger.debug(
                `Skipping adding target health check ${target.targetId} due to missing health check fields`
            );
            // logger.debug(
            //     `Skipping adding target health check ${target.targetId} due to missing health check fields`
            // );
            return null; // Skip targets with missing health check fields
        }

@@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
import { eq } from "drizzle-orm";
import { sendToExitNode } from "#dynamic/lib/exitNodes";
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
import { canCompress } from "@server/lib/clientVersionChecks";

const inputSchema = z.object({
    publicKey: z.string(),
@@ -104,11 +105,11 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
        const payload = {
            oldDestination: {
                destinationIP: existingSite.subnet?.split("/")[0],
                destinationPort: existingSite.listenPort
                destinationPort: existingSite.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
            },
            newDestination: {
                destinationIP: site.subnet?.split("/")[0],
                destinationPort: site.listenPort
                destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
            }
        };

@@ -135,6 +136,9 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
                targets
            }
        },
        options: {
            compress: canCompress(newt.version, "newt")
        },
        broadcast: false,
        excludeSender: false
    };

server/routers/newt/handleNewtDisconnectingMessage.ts (new file, 34 lines)
@@ -0,0 +1,34 @@
import { MessageHandler } from "@server/routers/ws";
import { db, Newt, sites } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";

/**
 * Handles disconnecting messages from newts so the site shows as disconnected in the UI
 */
export const handleNewtDisconnectingMessage: MessageHandler = async (context) => {
    const { message, client: c, sendToClient } = context;
    const newt = c as Newt;

    if (!newt) {
        logger.warn("Newt not found");
        return;
    }

    if (!newt.siteId) {
        logger.warn("Newt has no site ID!");
        return;
    }

    try {
        // Mark this newt's site as offline
        await db
            .update(sites)
            .set({
                online: false
            })
            .where(eq(sites.siteId, newt.siteId));
    } catch (error) {
        logger.error("Error handling disconnecting message", { error });
    }
};
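
Editor's note: the handler above assumes the newt sends a "newt/disconnecting" message before it closes its WebSocket. A minimal sketch of that courtesy message from the client side — the message shape and the `ws` usage here are illustrative assumptions, not part of this changeset:

    import WebSocket from "ws";

    // Hypothetical newt-side shutdown hook: tell the server we are going away
    // so the UI can flip the site to offline immediately instead of waiting
    // for the offline checker to time the site out.
    async function announceDisconnect(socket: WebSocket): Promise<void> {
        if (socket.readyState !== WebSocket.OPEN) return;
        socket.send(JSON.stringify({ type: "newt/disconnecting", data: {} }));
        // Give the frame a moment to flush before closing the socket.
        await new Promise((resolve) => setTimeout(resolve, 100));
        socket.close();
    }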
@@ -1,105 +1,107 @@
import { db, sites } from "@server/db";
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
import { db, newts, sites } from "@server/db";
import { hasActiveConnections, getClientConfigVersion } from "#dynamic/routers/ws";
import { MessageHandler } from "@server/routers/ws";
import { clients, Newt } from "@server/db";
import { Newt } from "@server/db";
import { eq, lt, isNull, and, or } from "drizzle-orm";
import logger from "@server/logger";
import { validateSessionToken } from "@server/auth/sessions/app";
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
import { sendTerminateClient } from "../client/terminate";
import { encodeHexLowerCase } from "@oslojs/encoding";
import { sha256 } from "@oslojs/crypto/sha2";
import { sendNewtSyncMessage } from "./sync";

// Track if the offline checker interval is running
// let offlineCheckerInterval: NodeJS.Timeout | null = null;
// const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
// const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
let offlineCheckerInterval: NodeJS.Timeout | null = null;
const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes

/**
 * Starts the background interval that checks for clients that haven't pinged recently
 * and marks them as offline
 * Starts the background interval that checks for newt sites that haven't
 * pinged recently and marks them as offline. For backward compatibility,
 * a site is only marked offline when there is no active WebSocket connection
 * either — so older newt versions that don't send pings but remain connected
 * continue to be treated as online.
 */
// export const startNewtOfflineChecker = (): void => {
//     if (offlineCheckerInterval) {
//         return; // Already running
//     }
export const startNewtOfflineChecker = (): void => {
    if (offlineCheckerInterval) {
        return; // Already running
    }

//     offlineCheckerInterval = setInterval(async () => {
//         try {
//             const twoMinutesAgo = Math.floor(
//                 (Date.now() - OFFLINE_THRESHOLD_MS) / 1000
//             );
    offlineCheckerInterval = setInterval(async () => {
        try {
            const twoMinutesAgo = Math.floor(
                (Date.now() - OFFLINE_THRESHOLD_MS) / 1000
            );

//             // TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
            // Find all online newt-type sites that haven't pinged recently
            // (or have never pinged at all). Join newts to obtain the newtId
            // needed for the WebSocket connection check.
            const staleSites = await db
                .select({
                    siteId: sites.siteId,
                    newtId: newts.newtId,
                    lastPing: sites.lastPing
                })
                .from(sites)
                .innerJoin(newts, eq(newts.siteId, sites.siteId))
                .where(
                    and(
                        eq(sites.online, true),
                        eq(sites.type, "newt"),
                        or(
                            lt(sites.lastPing, twoMinutesAgo),
                            isNull(sites.lastPing)
                        )
                    )
                );

//             // Find clients that haven't pinged in the last 2 minutes and mark them as offline
//             const offlineClients = await db
//                 .update(clients)
//                 .set({ online: false })
//                 .where(
//                     and(
//                         eq(clients.online, true),
//                         or(
//                             lt(clients.lastPing, twoMinutesAgo),
//                             isNull(clients.lastPing)
//                         )
//                     )
//                 )
//                 .returning();
            for (const staleSite of staleSites) {
                // Backward-compatibility check: if the newt still has an
                // active WebSocket connection (older clients that don't send
                // pings), keep the site online.
                const isConnected = await hasActiveConnections(staleSite.newtId);
                if (isConnected) {
                    logger.debug(
                        `Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket — keeping site ${staleSite.siteId} online`
                    );
                    continue;
                }

//             for (const offlineClient of offlineClients) {
//                 logger.info(
//                     `Kicking offline newt client ${offlineClient.clientId} due to inactivity`
//                 );
                logger.info(
                    `Marking site ${staleSite.siteId} offline: newt ${staleSite.newtId} has no recent ping and no active WebSocket connection`
                );

//                 if (!offlineClient.newtId) {
//                     logger.warn(
//                         `Offline client ${offlineClient.clientId} has no newtId, cannot disconnect`
//                     );
//                     continue;
//                 }
                await db
                    .update(sites)
                    .set({ online: false })
                    .where(eq(sites.siteId, staleSite.siteId));
            }
        } catch (error) {
            logger.error("Error in newt offline checker interval", { error });
        }
    }, OFFLINE_CHECK_INTERVAL);

//                 // Send a disconnect message to the client if connected
//                 try {
//                     await sendTerminateClient(
//                         offlineClient.clientId,
//                         offlineClient.newtId
//                     ); // terminate first
//                     // wait a moment to ensure the message is sent
//                     await new Promise((resolve) => setTimeout(resolve, 1000));
//                     await disconnectClient(offlineClient.newtId);
//                 } catch (error) {
//                     logger.error(
//                         `Error sending disconnect to offline newt ${offlineClient.clientId}`,
//                         { error }
//                     );
//                 }
//             }
//         } catch (error) {
//             logger.error("Error in offline checker interval", { error });
//         }
//     }, OFFLINE_CHECK_INTERVAL);

//     logger.debug("Started offline checker interval");
// };
    logger.debug("Started newt offline checker interval");
};

/**
 * Stops the background interval that checks for offline clients
 * Stops the background interval that checks for offline newt sites.
 */
// export const stopNewtOfflineChecker = (): void => {
//     if (offlineCheckerInterval) {
//         clearInterval(offlineCheckerInterval);
//         offlineCheckerInterval = null;
//         logger.info("Stopped offline checker interval");
//     }
// };
export const stopNewtOfflineChecker = (): void => {
    if (offlineCheckerInterval) {
        clearInterval(offlineCheckerInterval);
        offlineCheckerInterval = null;
        logger.info("Stopped newt offline checker interval");
    }
};

/**
 * Handles ping messages from clients and responds with pong
 * Handles ping messages from newt clients.
 *
 * On each ping:
 * - Marks the associated site as online.
 * - Records the current timestamp as the newt's last-ping time.
 * - Triggers a config sync if the newt is running an outdated config version.
 * - Responds with a pong message.
 */
export const handleNewtPingMessage: MessageHandler = async (context) => {
    const { message, client: c, sendToClient } = context;
    const { message, client: c } = context;
    const newt = c as Newt;

    if (!newt) {
@@ -112,15 +114,31 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
        return;
    }

    // get the version
    try {
        // Mark the site as online and record the ping timestamp.
        await db
            .update(sites)
            .set({
                online: true,
                lastPing: Math.floor(Date.now() / 1000)
            })
            .where(eq(sites.siteId, newt.siteId));
    } catch (error) {
        logger.error("Error updating online state on newt ping", { error });
    }

    // Check config version and sync if stale.
    const configVersion = await getClientConfigVersion(newt.newtId);

    if (message.configVersion && configVersion != null && configVersion != message.configVersion) {
    if (
        message.configVersion != null &&
        configVersion != null &&
        configVersion !== message.configVersion
    ) {
        logger.warn(
            `Newt ping with outdated config version: ${message.configVersion} (current: ${configVersion})`
        );

        // get the site
        const [site] = await db
            .select()
            .from(sites)
@@ -137,19 +155,6 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
        await sendNewtSyncMessage(newt, site);
    }

    // try {
    //     // Update the client's last ping timestamp
    //     await db
    //         .update(clients)
    //         .set({
    //             lastPing: Math.floor(Date.now() / 1000),
    //             online: true
    //         })
    //         .where(eq(clients.clientId, newt.clientId));
    // } catch (error) {
    //     logger.error("Error handling ping message", { error });
    // }

    return {
        message: {
            type: "pong",

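Editor's note: the offline rule above combines two signals — a ping older than the two-minute threshold (or never recorded) and no live WebSocket connection. A minimal sketch of that decision as a pure function; the names here are illustrative, not part of the changeset:

    // Returns true when a newt site should be flipped to offline.
    // lastPing is a unix timestamp in seconds, or null if never recorded.
    function shouldMarkOffline(
        lastPing: number | null,
        nowMs: number,
        hasActiveWsConnection: boolean,
        thresholdMs: number = 2 * 60 * 1000
    ): boolean {
        // An open WebSocket keeps older, non-pinging newts online.
        if (hasActiveWsConnection) return false;
        if (lastPing === null) return true;
        const cutoff = Math.floor((nowMs - thresholdMs) / 1000);
        return lastPing < cutoff;
    }

    // shouldMarkOffline(null, Date.now(), false)            -> true
    // shouldMarkOffline(Math.floor(Date.now() / 1000), Date.now(), false) -> false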
@@ -5,9 +5,7 @@ import { eq } from "drizzle-orm";
import { addPeer, deletePeer } from "../gerbil/peers";
import logger from "@server/logger";
import config from "@server/lib/config";
import {
    findNextAvailableCidr,
} from "@server/lib/ip";
import { findNextAvailableCidr } from "@server/lib/ip";
import {
    selectBestExitNode,
    verifyExitNodeOrgAccess
@@ -15,6 +13,7 @@ import {
import { fetchContainers } from "./dockerSocket";
import { lockManager } from "#dynamic/lib/lock";
import { buildTargetConfigurationForNewtClient } from "./buildConfiguration";
import { canCompress } from "@server/lib/clientVersionChecks";

export type ExitNodePingResult = {
    exitNodeId: number;
@@ -215,6 +214,9 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
                healthCheckTargets: validHealthCheckTargets
            }
        },
        options: {
            compress: canCompress(newt.version, "newt")
        },
        broadcast: false, // Send to all clients
        excludeSender: false // Include sender in broadcast
    };

@@ -10,10 +10,21 @@ interface PeerBandwidth {
    bytesOut: number;
}

interface BandwidthAccumulator {
    bytesIn: number;
    bytesOut: number;
}

// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 120_000; // 120 seconds

// In-memory accumulator: publicKey -> { bytesIn, bytesOut }
let accumulator = new Map<string, BandwidthAccumulator>();

/**
 * Check if an error is a deadlock error
 */
@@ -53,6 +64,90 @@ async function withDeadlockRetry<T>(
    }
}

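Editor's note: the body of `withDeadlockRetry` is elided by the hunk above. A minimal sketch of what such a helper typically looks like, using the `MAX_RETRIES` and `BASE_DELAY_MS` constants from this file — the `isDeadlockError` predicate (declared just above) and the exact backoff shape are assumptions, not the repository's exact implementation:

    // Run an async operation, retrying with exponential backoff when the
    // database reports a deadlock; rethrow anything else immediately.
    async function withDeadlockRetry<T>(
        operation: () => Promise<T>,
        description: string
    ): Promise<T> {
        for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                return await operation();
            } catch (error) {
                if (!isDeadlockError(error) || attempt === MAX_RETRIES) {
                    throw error;
                }
                const delayMs = BASE_DELAY_MS * 2 ** (attempt - 1); // 50ms, 100ms, 200ms
                logger.warn(
                    `Deadlock on ${description}, retry ${attempt}/${MAX_RETRIES} in ${delayMs}ms`
                );
                await new Promise((resolve) => setTimeout(resolve, delayMs));
            }
        }
        throw new Error(`unreachable: retries exhausted for ${description}`);
    }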
/**
 * Flush all accumulated bandwidth data to the database.
 *
 * Swaps out the accumulator before writing so that any bandwidth messages
 * received during the flush are captured in the new accumulator rather than
 * being lost or causing contention. Entries that fail to write are re-queued
 * back into the accumulator so they will be retried on the next flush.
 *
 * This function is exported so that the application's graceful-shutdown
 * cleanup handler can call it before the process exits.
 */
export async function flushBandwidthToDb(): Promise<void> {
    if (accumulator.size === 0) {
        return;
    }

    // Atomically swap out the accumulator so new data keeps flowing in
    // while we write the snapshot to the database.
    const snapshot = accumulator;
    accumulator = new Map<string, BandwidthAccumulator>();

    const currentTime = new Date().toISOString();

    // Sort by publicKey for consistent lock ordering across concurrent
    // writers — this is the same deadlock-prevention strategy used in the
    // original per-message implementation.
    const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
        a.localeCompare(b)
    );

    logger.debug(
        `Flushing accumulated bandwidth data for ${sortedEntries.length} client(s) to the database`
    );

    for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
        try {
            await withDeadlockRetry(async () => {
                // Use atomic SQL increment to avoid the SELECT-then-UPDATE
                // anti-pattern and the races it would introduce.
                await db
                    .update(clients)
                    .set({
                        // Note: bytesIn from peer goes to megabytesOut (data
                        // sent to client) and bytesOut from peer goes to
                        // megabytesIn (data received from client).
                        megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
                        megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
                        lastBandwidthUpdate: currentTime
                    })
                    .where(eq(clients.pubKey, publicKey));
            }, `flush bandwidth for client ${publicKey}`);
        } catch (error) {
            logger.error(
                `Failed to flush bandwidth for client ${publicKey}:`,
                error
            );

            // Re-queue the failed entry so it is retried on the next flush
            // rather than silently dropped.
            const existing = accumulator.get(publicKey);
            if (existing) {
                existing.bytesIn += bytesIn;
                existing.bytesOut += bytesOut;
            } else {
                accumulator.set(publicKey, { bytesIn, bytesOut });
            }
        }
    }
}

const flushTimer = setInterval(async () => {
    try {
        await flushBandwidthToDb();
    } catch (error) {
        logger.error("Unexpected error during periodic bandwidth flush:", error);
    }
}, FLUSH_INTERVAL_MS);

// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path (see server/cleanup.ts) will call
// flushBandwidthToDb() explicitly before process.exit(), so no data is lost.
flushTimer.unref();

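Editor's note: as a self-contained illustration of the swap-and-flush pattern above (local names only; nothing here is part of the changeset), note how swapping the map reference lets producers keep writing while the snapshot is drained:

    type Acc = { bytesIn: number; bytesOut: number };
    let acc = new Map<string, Acc>();

    function accumulate(key: string, bytesIn: number, bytesOut: number): void {
        const entry = acc.get(key);
        if (entry) {
            entry.bytesIn += bytesIn;
            entry.bytesOut += bytesOut;
        } else {
            acc.set(key, { bytesIn, bytesOut });
        }
    }

    async function flush(write: (key: string, a: Acc) => Promise<void>) {
        const snapshot = acc; // grab the current map...
        acc = new Map();      // ...and start a fresh one for new data
        // Sorted for consistent lock ordering across concurrent writers.
        for (const [key, entry] of [...snapshot].sort(([a], [b]) => a.localeCompare(b))) {
            await write(key, entry);
        }
    }

    // accumulate("peerA", 10, 5); accumulate("peerA", 2, 1);
    // -> one row update of +12 / +6 at the next flush instead of two writes.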
export const handleReceiveBandwidthMessage: MessageHandler = async (
    context
) => {
@@ -69,40 +164,21 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (
        throw new Error("Invalid bandwidth data");
    }

    // Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
    // This is critical for preventing deadlocks when multiple instances update the same clients
    const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
        a.publicKey.localeCompare(b.publicKey)
    );
    // Accumulate the incoming data in memory; the periodic timer (and the
    // shutdown hook) will take care of writing it to the database.
    for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
        // Skip peers that haven't transferred any data — writing zeros to the
        // database would be a no-op anyway.
        if (bytesIn <= 0 && bytesOut <= 0) {
            continue;
        }

    const currentTime = new Date().toISOString();

    // Update each client individually with retry logic
    // This reduces transaction scope and allows retries per-client
    for (const peer of sortedBandwidthData) {
        const { publicKey, bytesIn, bytesOut } = peer;

        try {
            await withDeadlockRetry(async () => {
                // Use atomic SQL increment to avoid SELECT then UPDATE pattern
                // This eliminates the need to read the current value first
                await db
                    .update(clients)
                    .set({
                        // Note: bytesIn from peer goes to megabytesOut (data sent to client)
                        // and bytesOut from peer goes to megabytesIn (data received from client)
                        megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
                        megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
                        lastBandwidthUpdate: currentTime
                    })
                    .where(eq(clients.pubKey, publicKey));
            }, `update client bandwidth ${publicKey}`);
        } catch (error) {
            logger.error(
                `Failed to update bandwidth for client ${publicKey}:`,
                error
            );
            // Continue with other clients even if one fails
        const existing = accumulator.get(publicKey);
        if (existing) {
            existing.bytesIn += bytesIn;
            existing.bytesOut += bytesOut;
        } else {
            accumulator.set(publicKey, { bytesIn, bytesOut });
        }
    }
};

@@ -7,3 +7,4 @@ export * from "./handleSocketMessages";
export * from "./handleNewtPingRequestMessage";
export * from "./handleApplyBlueprintMessage";
export * from "./handleNewtPingMessage";
export * from "./handleNewtDisconnectingMessage";

@@ -6,6 +6,7 @@ import {
    buildClientConfigurationForNewtClient,
    buildTargetConfigurationForNewtClient
} from "./buildConfiguration";
import { canCompress } from "@server/lib/clientVersionChecks";

export async function sendNewtSyncMessage(newt: Newt, site: Site) {
    const { tcpTargets, udpTargets, validHealthCheckTargets } =
@@ -24,18 +25,24 @@ export async function sendNewtSyncMessage(newt: Newt, site: Site) {
        exitNode
    );

    await sendToClient(newt.newtId, {
        type: "newt/sync",
        data: {
            proxyTargets: {
                udp: udpTargets,
                tcp: tcpTargets
            },
            healthCheckTargets: validHealthCheckTargets,
            peers: peers,
            clientTargets: targets
    await sendToClient(
        newt.newtId,
        {
            type: "newt/sync",
            data: {
                proxyTargets: {
                    udp: udpTargets,
                    tcp: tcpTargets
                },
                healthCheckTargets: validHealthCheckTargets,
                peers: peers,
                clientTargets: targets
            }
        },
        {
            compress: canCompress(newt.version, "newt")
        }
    }).catch((error) => {
    ).catch((error) => {
        logger.warn(`Error sending newt sync message:`, error);
    });
}

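Editor's note: `canCompress` gates compression on the connected client's reported version, so older newts and olms that cannot gunzip binary frames keep receiving plain JSON. Its real implementation is not shown in this diff; a plausible sketch under that assumption — the minimum-version table here is invented purely for illustration:

    // Hypothetical version gate: compress only for clients known to handle
    // gzipped binary WebSocket frames. Cutoff versions are placeholders.
    const MIN_COMPRESS_VERSION: Record<"newt" | "olm", string> = {
        newt: "1.5.0",
        olm: "1.3.0"
    };

    function canCompress(version: string | null | undefined, kind: "newt" | "olm"): boolean {
        if (!version) return false; // unknown client version: stay safe, no compression
        const parse = (v: string) => v.replace(/^v/, "").split(".").map(Number);
        const [a, b, c] = parse(version);
        const [x, y, z] = parse(MIN_COMPRESS_VERSION[kind]);
        if ([a, b, c].some(Number.isNaN)) return false;
        return a !== x ? a > x : b !== y ? b > y : c >= z;
    }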
@@ -2,13 +2,14 @@ import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db";
import { sendToClient } from "#dynamic/routers/ws";
import logger from "@server/logger";
import { eq, inArray } from "drizzle-orm";
import { canCompress } from "@server/lib/clientVersionChecks";

export async function addTargets(
    newtId: string,
    targets: Target[],
    healthCheckData: TargetHealthCheck[],
    protocol: string,
    port: number | null = null
    version?: string | null
) {
    //create a list of udp and tcp targets
    const payloadTargets = targets.map((target) => {
@@ -22,7 +23,7 @@ export async function addTargets(
        data: {
            targets: payloadTargets
        }
    }, { incrementConfigVersion: true });
    }, { incrementConfigVersion: true, compress: canCompress(version, "newt") });

    // Create a map for quick lookup
    const healthCheckMap = new Map<number, TargetHealthCheck>();
@@ -103,14 +104,14 @@ export async function addTargets(
        data: {
            targets: validHealthCheckTargets
        }
    }, { incrementConfigVersion: true });
    }, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
}

export async function removeTargets(
    newtId: string,
    targets: Target[],
    protocol: string,
    port: number | null = null
    version?: string | null
) {
    //create a list of udp and tcp targets
    const payloadTargets = targets.map((target) => {
@@ -135,5 +136,5 @@ export async function removeTargets(
        data: {
            ids: healthCheckTargets
        }
    }, { incrementConfigVersion: true });
    }, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
}

@@ -1,5 +1,17 @@
import { Client, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, exitNodes, siteResources, sites } from "@server/db";
import { generateAliasConfig, generateRemoteSubnets } from "@server/lib/ip";
import {
    Client,
    clientSiteResourcesAssociationsCache,
    clientSitesAssociationsCache,
    db,
    exitNodes,
    siteResources,
    sites
} from "@server/db";
import {
    Alias,
    generateAliasConfig,
    generateRemoteSubnets
} from "@server/lib/ip";
import logger from "@server/logger";
import { and, eq } from "drizzle-orm";
import { addPeer, deletePeer } from "../newt/peers";
@@ -8,9 +20,19 @@ import config from "@server/lib/config";
export async function buildSiteConfigurationForOlmClient(
    client: Client,
    publicKey: string | null,
    relay: boolean
    relay: boolean,
    jitMode: boolean = false
) {
    const siteConfigurations = [];
    const siteConfigurations: {
        siteId: number;
        name?: string
        endpoint?: string
        publicKey?: string
        serverIP?: string | null
        serverPort?: number | null
        remoteSubnets?: string[];
        aliases: Alias[];
    }[] = [];

    // Get all sites data
    const sitesData = await db
@@ -27,6 +49,40 @@ export async function buildSiteConfigurationForOlmClient(
        sites: site,
        clientSitesAssociationsCache: association
    } of sitesData) {
        const allSiteResources = await db // only get the site resources that this client has access to
            .select()
            .from(siteResources)
            .innerJoin(
                clientSiteResourcesAssociationsCache,
                eq(
                    siteResources.siteResourceId,
                    clientSiteResourcesAssociationsCache.siteResourceId
                )
            )
            .where(
                and(
                    eq(siteResources.siteId, site.siteId),
                    eq(
                        clientSiteResourcesAssociationsCache.clientId,
                        client.clientId
                    )
                )
            );

        if (jitMode) {
            // Add site configuration to the array
            siteConfigurations.push({
                siteId: site.siteId,
                // remoteSubnets: generateRemoteSubnets(
                //     allSiteResources.map(({ siteResources }) => siteResources)
                // ),
                aliases: generateAliasConfig(
                    allSiteResources.map(({ siteResources }) => siteResources)
                )
            });
            continue;
        }

        if (!site.exitNodeId) {
            logger.warn(
                `Site ${site.siteId} does not have exit node, skipping`
@@ -42,6 +98,13 @@ export async function buildSiteConfigurationForOlmClient(
            continue;
        }

        if (!site.publicKey || site.publicKey == "") { // the site is not ready to accept new peers
            logger.warn(
                `Site ${site.siteId} has no public key, skipping`
            );
            continue;
        }

        // if (site.lastHolePunch && now - site.lastHolePunch > 6 && relay) {
        //     logger.warn(
        //         `Site ${site.siteId} last hole punch is too old, skipping`
@@ -103,26 +166,6 @@ export async function buildSiteConfigurationForOlmClient(
            relayEndpoint = `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`;
        }

        const allSiteResources = await db // only get the site resources that this client has access to
            .select()
            .from(siteResources)
            .innerJoin(
                clientSiteResourcesAssociationsCache,
                eq(
                    siteResources.siteResourceId,
                    clientSiteResourcesAssociationsCache.siteResourceId
                )
            )
            .where(
                and(
                    eq(siteResources.siteId, site.siteId),
                    eq(
                        clientSiteResourcesAssociationsCache.clientId,
                        client.clientId
                    )
                )
            );

        // Add site configuration to the array
        siteConfigurations.push({
            siteId: site.siteId,

@@ -6,7 +6,7 @@ import logger from "@server/logger";
/**
 * Handles disconnecting messages from clients to show disconnected in the ui
 */
export const handleOlmDisconnecingMessage: MessageHandler = async (context) => {
export const handleOlmDisconnectingMessage: MessageHandler = async (context) => {
    const { message, client: c, sendToClient } = context;
    const olm = c as Olm;

@@ -17,6 +17,9 @@ import { getUserDeviceName } from "@server/db/names";
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
import { OlmErrorCodes, sendOlmError } from "./error";
import { handleFingerprintInsertion } from "./fingerprintingUtils";
import { Alias } from "@server/lib/ip";
import { build } from "@server/build";
import { canCompress } from "@server/lib/clientVersionChecks";

export const handleOlmRegisterMessage: MessageHandler = async (context) => {
    logger.info("Handling register olm message!");
@@ -207,6 +210,32 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
        }
    }

    // Get all sites data
    const sitesCountResult = await db
        .select({ count: count() })
        .from(sites)
        .innerJoin(
            clientSitesAssociationsCache,
            eq(sites.siteId, clientSitesAssociationsCache.siteId)
        )
        .where(eq(clientSitesAssociationsCache.clientId, client.clientId));

    // Extract the count value from the result array
    const sitesCount =
        sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;

    // Prepare an array to store site configurations
    logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`);

    let jitMode = false;
    if (sitesCount > 250 && build == "saas") {
        // THIS IS THE MAX ON THE BUSINESS TIER
        // we have too many sites
        // If we have too many sites we need to drop into fully JIT mode by not sending any of the sites
        logger.info("Too many sites (%d), dropping into JIT mode", sitesCount);
        jitMode = true;
    }

    logger.debug(
        `Olm client ID: ${client.clientId}, Public Key: ${publicKey}, Relay: ${relay}`
    );
@@ -233,28 +262,12 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
        await db
            .update(clientSitesAssociationsCache)
            .set({
                isRelayed: relay == true
                isRelayed: relay == true,
                isJitMode: jitMode
            })
            .where(eq(clientSitesAssociationsCache.clientId, client.clientId));
    }

    // Get all sites data
    const sitesCountResult = await db
        .select({ count: count() })
        .from(sites)
        .innerJoin(
            clientSitesAssociationsCache,
            eq(sites.siteId, clientSitesAssociationsCache.siteId)
        )
        .where(eq(clientSitesAssociationsCache.clientId, client.clientId));

    // Extract the count value from the result array
    const sitesCount =
        sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;

    // Prepare an array to store site configurations
    logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`);

    // this prevents us from accepting a register from an olm that has not hole punched yet.
    // the olm will pump the register so we can keep checking
    // TODO: I still think there is a better way to do this rather than locking it out here but ???
@@ -265,19 +278,14 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
        return;
    }

    // NOTE: its important that the client here is the old client and the public key is the new key
    // NOTE: its important that the client here is the old client and the public key is the new key
    const siteConfigurations = await buildSiteConfigurationForOlmClient(
        client,
        publicKey,
        relay
        relay,
        jitMode
    );

    // REMOVED THIS SO IT CREATES THE INTERFACE AND JUST WAITS FOR THE SITES
    // if (siteConfigurations.length === 0) {
    //     logger.warn("No valid site configurations found");
    //     return;
    // }

    // Return connect message with all site configurations
    return {
        message: {
@@ -288,6 +296,9 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
                utilitySubnet: org.utilitySubnet
            }
        },
        options: {
            compress: canCompress(olm.version, "olm")
        },
        broadcast: false,
        excludeSender: false
    };

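Editor's note: the register path above drops into JIT mode once a client's site count crosses the business-tier cap, deferring per-site peer configuration until the client actually asks for a site. The gate itself reduces to a small predicate — a sketch for clarity, with the 250 cap taken from the diff:

    // JIT mode: send only lightweight alias/resource info at register time and
    // add WireGuard peers on demand, instead of pushing every site up front.
    function shouldUseJitMode(sitesCount: number, build: string): boolean {
        const BUSINESS_TIER_MAX_SITES = 250; // max on the business tier per the comment above
        return build === "saas" && sitesCount > BUSINESS_TIER_MAX_SITES;
    }

    // shouldUseJitMode(251, "saas") -> true
    // shouldUseJitMode(251, "oss")  -> false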
@@ -18,7 +18,7 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
    }

    if (!olm.clientId) {
        logger.warn("Olm has no site!"); // TODO: Maybe we create the site here?
        logger.warn("Olm has no client!");
        return;
    }

@@ -41,7 +41,7 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
        return;
    }

    const { siteId } = message.data;
    const { siteId, chainId } = message.data;

    // Get the site
    const [site] = await db
@@ -90,7 +90,8 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
            data: {
                siteId: siteId,
                relayEndpoint: exitNode.endpoint,
                relayPort: config.getRawConfig().gerbil.clients_start_port
                relayPort: config.getRawConfig().gerbil.clients_start_port,
                chainId
            }
        },
        broadcast: false,

server/routers/olm/handleOlmServerInitAddPeerHandshake.ts (new file, 241 lines)
@@ -0,0 +1,241 @@
import {
    clientSiteResourcesAssociationsCache,
    clientSitesAssociationsCache,
    db,
    exitNodes,
    Site,
    siteResources
} from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { clients, Olm, sites } from "@server/db";
import { and, eq, or } from "drizzle-orm";
import logger from "@server/logger";
import { initPeerAddHandshake } from "./peers";

export const handleOlmServerInitAddPeerHandshake: MessageHandler = async (
    context
) => {
    logger.info("Handling olm server init add peer handshake message!");
    const { message, client: c, sendToClient } = context;
    const olm = c as Olm;

    if (!olm) {
        logger.warn("Olm not found");
        return;
    }

    if (!olm.clientId) {
        logger.warn("Olm has no client!"); // TODO: Maybe we create the site here?
        return;
    }

    const clientId = olm.clientId;

    const [client] = await db
        .select()
        .from(clients)
        .where(eq(clients.clientId, clientId))
        .limit(1);

    if (!client) {
        logger.warn("Client not found");
        return;
    }

    const { siteId, resourceId, chainId } = message.data;

    let site: Site | null = null;
    if (siteId) {
        // get the site
        const [siteRes] = await db
            .select()
            .from(sites)
            .where(eq(sites.siteId, siteId))
            .limit(1);
        if (siteRes) {
            site = siteRes;
        }
    }

    if (resourceId && !site) {
        const resources = await db
            .select()
            .from(siteResources)
            .where(
                and(
                    or(
                        eq(siteResources.niceId, resourceId),
                        eq(siteResources.alias, resourceId)
                    ),
                    eq(siteResources.orgId, client.orgId)
                )
            );

        if (!resources || resources.length === 0) {
            logger.error(`handleOlmServerPeerAddMessage: Resource not found`);
            // cancel the request from the olm side to not keep doing this
            await sendToClient(
                olm.olmId,
                {
                    type: "olm/wg/peer/chain/cancel",
                    data: {
                        chainId
                    }
                },
                { incrementConfigVersion: false }
            ).catch((error) => {
                logger.warn(`Error sending message:`, error);
            });
            return;
        }

        if (resources.length > 1) {
            // error but this should not happen because the nice id cant contain a dot and the alias has to have a dot and both have to be unique within the org so there should never be multiple matches
            logger.error(
                `handleOlmServerPeerAddMessage: Multiple resources found matching the criteria`
            );
            return;
        }

        const resource = resources[0];

        const currentResourceAssociationCaches = await db
            .select()
            .from(clientSiteResourcesAssociationsCache)
            .where(
                and(
                    eq(
                        clientSiteResourcesAssociationsCache.siteResourceId,
                        resource.siteResourceId
                    ),
                    eq(
                        clientSiteResourcesAssociationsCache.clientId,
                        client.clientId
                    )
                )
            );

        if (currentResourceAssociationCaches.length === 0) {
            logger.error(
                `handleOlmServerPeerAddMessage: Client ${client.clientId} does not have access to resource ${resource.siteResourceId}`
            );
            // cancel the request from the olm side to not keep doing this
            await sendToClient(
                olm.olmId,
                {
                    type: "olm/wg/peer/chain/cancel",
                    data: {
                        chainId
                    }
                },
                { incrementConfigVersion: false }
            ).catch((error) => {
                logger.warn(`Error sending message:`, error);
            });
            return;
        }

        const siteIdFromResource = resource.siteId;

        // get the site
        const [siteRes] = await db
            .select()
            .from(sites)
            .where(eq(sites.siteId, siteIdFromResource));
        if (!siteRes) {
            logger.error(
                `handleOlmServerPeerAddMessage: Site with ID ${siteIdFromResource} not found`
            );
            return;
        }

        site = siteRes;
    }

    if (!site) {
        logger.error(`handleOlmServerPeerAddMessage: Site not found`);
        return;
    }

    // check if the client can access this site using the cache
    const currentSiteAssociationCaches = await db
        .select()
        .from(clientSitesAssociationsCache)
        .where(
            and(
                eq(clientSitesAssociationsCache.clientId, client.clientId),
                eq(clientSitesAssociationsCache.siteId, site.siteId)
            )
        );

    if (currentSiteAssociationCaches.length === 0) {
        logger.error(
            `handleOlmServerPeerAddMessage: Client ${client.clientId} does not have access to site ${site.siteId}`
        );
        // cancel the request from the olm side to not keep doing this
        await sendToClient(
            olm.olmId,
            {
                type: "olm/wg/peer/chain/cancel",
                data: {
                    chainId
                }
            },
            { incrementConfigVersion: false }
        ).catch((error) => {
            logger.warn(`Error sending message:`, error);
        });
        return;
    }

    if (!site.exitNodeId) {
        logger.error(
            `handleOlmServerPeerAddMessage: Site with ID ${site.siteId} has no exit node`
        );
        // cancel the request from the olm side to not keep doing this
        await sendToClient(
            olm.olmId,
            {
                type: "olm/wg/peer/chain/cancel",
                data: {
                    chainId
                }
            },
            { incrementConfigVersion: false }
        ).catch((error) => {
            logger.warn(`Error sending message:`, error);
        });
        return;
    }

    // get the exit node for the site
    const [exitNode] = await db
        .select()
        .from(exitNodes)
        .where(eq(exitNodes.exitNodeId, site.exitNodeId));

    if (!exitNode) {
        logger.error(
            `handleOlmServerPeerAddMessage: Site with ID ${site.siteId} has no exit node`
        );
        return;
    }

    // also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
    // if it has already been added this will be a no-op
    await initPeerAddHandshake(
        // this will kick off the add peer process for the client
        client.clientId,
        {
            siteId: site.siteId,
            exitNode: {
                publicKey: exitNode.publicKey,
                endpoint: exitNode.endpoint
            }
        },
        olm.olmId,
        chainId
    );

    return;
};
@@ -54,7 +54,7 @@ export const handleOlmServerPeerAddMessage: MessageHandler = async (
        return;
    }

    const { siteId } = message.data;
    const { siteId, chainId } = message.data;

    // get the site
    const [site] = await db
@@ -179,7 +179,8 @@ export const handleOlmServerPeerAddMessage: MessageHandler = async (
                ),
                aliases: generateAliasConfig(
                    allSiteResources.map(({ siteResources }) => siteResources)
                )
                ),
                chainId: chainId,
            }
        },
        broadcast: false,

@@ -17,7 +17,7 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
    }

    if (!olm.clientId) {
        logger.warn("Olm has no site!"); // TODO: Maybe we create the site here?
        logger.warn("Olm has no client!");
        return;
    }

@@ -40,7 +40,7 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
        return;
    }

    const { siteId } = message.data;
    const { siteId, chainId } = message.data;

    // Get the site
    const [site] = await db
@@ -87,7 +87,8 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
            type: "olm/wg/peer/unrelay",
            data: {
                siteId: siteId,
                endpoint: site.endpoint
                endpoint: site.endpoint,
                chainId
            }
        },
        broadcast: false,

@@ -11,3 +11,4 @@ export * from "./handleOlmServerPeerAddMessage";
export * from "./handleOlmUnRelayMessage";
export * from "./recoverOlmWithFingerprint";
export * from "./handleOlmDisconnectingMessage";
export * from "./handleOlmServerInitAddPeerHandshake";

@@ -1,8 +1,9 @@
import { sendToClient } from "#dynamic/routers/ws";
import { db, olms } from "@server/db";
import { clientSitesAssociationsCache, db, olms } from "@server/db";
import { canCompress } from "@server/lib/clientVersionChecks";
import config from "@server/lib/config";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import { and, eq } from "drizzle-orm";
import { Alias } from "yaml";

export async function addPeer(
@@ -18,7 +19,8 @@ export async function addPeer(
        remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access
        aliases: Alias[];
    },
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db
@@ -30,6 +32,7 @@ export async function addPeer(
            return; // ignore this because an olm might not be associated with the client anymore
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(
@@ -48,7 +51,7 @@ export async function addPeer(
                aliases: peer.aliases
            }
        },
        { incrementConfigVersion: true }
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
@@ -60,7 +63,8 @@ export async function deletePeer(
    clientId: number,
    siteId: number,
    publicKey: string,
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db
@@ -72,6 +76,7 @@ export async function deletePeer(
            return;
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(
@@ -83,7 +88,7 @@ export async function deletePeer(
                siteId: siteId
            }
        },
        { incrementConfigVersion: true }
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
@@ -103,7 +108,8 @@ export async function updatePeer(
        remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that
        aliases?: Alias[] | null;
    },
    olmId?: string
    olmId?: string,
    version?: string | null
) {
    if (!olmId) {
        const [olm] = await db
@@ -115,6 +121,7 @@ export async function updatePeer(
            return;
        }
        olmId = olm.olmId;
        version = olm.version;
    }

    await sendToClient(
@@ -132,7 +139,7 @@ export async function updatePeer(
                aliases: peer.aliases
            }
        },
        { incrementConfigVersion: true }
        { incrementConfigVersion: true, compress: canCompress(version, "olm") }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
@@ -149,7 +156,8 @@ export async function initPeerAddHandshake(
            endpoint: string;
        };
    },
    olmId?: string
    olmId?: string,
    chainId?: string
) {
    if (!olmId) {
        const [olm] = await db
@@ -173,7 +181,8 @@ export async function initPeerAddHandshake(
                    publicKey: peer.exitNode.publicKey,
                    relayPort: config.getRawConfig().gerbil.clients_start_port,
                    endpoint: peer.exitNode.endpoint
                }
            },
            chainId
        }
    },
    { incrementConfigVersion: true }
@@ -181,6 +190,17 @@ export async function initPeerAddHandshake(
        logger.warn(`Error sending message:`, error);
    });

    // update the clientSiteAssociationsCache to make the isJitMode flag false so that JIT mode is disabled for this site if it restarts or something after the connection
    await db
        .update(clientSitesAssociationsCache)
        .set({ isJitMode: false })
        .where(
            and(
                eq(clientSitesAssociationsCache.clientId, clientId),
                eq(clientSitesAssociationsCache.siteId, peer.siteId)
            )
        );

    logger.info(
        `Initiated peer add handshake for site ${peer.siteId} to olm ${olmId}`
    );

@@ -1,9 +1,17 @@
import { Client, db, exitNodes, Olm, sites, clientSitesAssociationsCache } from "@server/db";
import {
    Client,
    db,
    exitNodes,
    Olm,
    sites,
    clientSitesAssociationsCache
} from "@server/db";
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
import { sendToClient } from "#dynamic/routers/ws";
import logger from "@server/logger";
import { eq, inArray } from "drizzle-orm";
import config from "@server/lib/config";
import { canCompress } from "@server/lib/clientVersionChecks";

export async function sendOlmSyncMessage(olm: Olm, client: Client) {
    // NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT
@@ -17,10 +25,7 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
    const clientSites = await db
        .select()
        .from(clientSitesAssociationsCache)
        .innerJoin(
            sites,
            eq(sites.siteId, clientSitesAssociationsCache.siteId)
        )
        .innerJoin(sites, eq(sites.siteId, clientSitesAssociationsCache.siteId))
        .where(eq(clientSitesAssociationsCache.clientId, client.clientId));

    // Extract unique exit node IDs
@@ -68,13 +73,20 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {

    logger.debug("sendOlmSyncMessage: sending sync message");

    await sendToClient(olm.olmId, {
        type: "olm/sync",
        data: {
            sites: siteConfigurations,
            exitNodes: exitNodesData
    await sendToClient(
        olm.olmId,
        {
            type: "olm/sync",
            data: {
                sites: siteConfigurations,
                exitNodes: exitNodesData
            }
        },

        {
            compress: canCompress(olm.version, "olm")
        }
    }).catch((error) => {
    ).catch((error) => {
        logger.warn(`Error sending olm sync message:`, error);
    });
}

@@ -223,6 +223,20 @@ async function createHttpResource(
        );
    }

    // Prevent creating resource with same domain as dashboard
    const dashboardUrl = config.getRawConfig().app.dashboard_url;
    if (dashboardUrl) {
        const dashboardHost = new URL(dashboardUrl).hostname;
        if (fullDomain === dashboardHost) {
            return next(
                createHttpError(
                    HttpCode.CONFLICT,
                    "Resource domain cannot be the same as the dashboard domain"
                )
            );
        }
    }

    if (build != "oss") {
        const existingLoginPages = await db
            .select()

@@ -353,6 +353,20 @@ async function updateHttpResource(
        );
    }

    // Prevent updating resource with same domain as dashboard
    const dashboardUrl = config.getRawConfig().app.dashboard_url;
    if (dashboardUrl) {
        const dashboardHost = new URL(dashboardUrl).hostname;
        if (fullDomain === dashboardHost) {
            return next(
                createHttpError(
                    HttpCode.CONFLICT,
                    "Resource domain cannot be the same as the dashboard domain"
                )
            );
        }
    }

    if (build != "oss") {
        const existingLoginPages = await db
            .select()

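Editor's note: both guards compare only hostnames, so scheme, port, and path in the configured dashboard URL are ignored. A quick illustration of why `new URL(...).hostname` is the right normalization (the values are examples):

    // WHATWG URL strips scheme, port, and path when you read .hostname.
    const dashboardHost = new URL("https://app.example.com:8443/dashboard").hostname;
    // dashboardHost === "app.example.com"

    // So a resource at "app.example.com" conflicts, while a sibling
    // subdomain like "api.example.com" does not.
    console.log(dashboardHost === "app.example.com"); // true
    console.log(dashboardHost === "api.example.com"); // false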
@@ -620,7 +620,7 @@ export async function handleMessagingForUpdatedSiteResource(
        await updateTargets(newt.newtId, {
            oldTargets: oldTargets,
            newTargets: newTargets
        });
        }, newt.version);
    }

    const olmJobs: Promise<void>[] = [];

@@ -264,7 +264,7 @@ export async function createTarget(
            newTarget,
            healthCheck,
            resource.protocol,
            resource.proxyPort
            newt.version
        );
    }
}

@@ -262,7 +262,7 @@ export async function updateTarget(
            [updatedTarget],
            [updatedHc],
            resource.protocol,
            resource.proxyPort
            newt.version
        );
    }
}

@@ -1,3 +1,4 @@
import { build } from "@server/build";
import {
    handleNewtRegisterMessage,
    handleReceiveBandwidthMessage,
@@ -6,7 +7,9 @@ import {
    handleDockerContainersMessage,
    handleNewtPingRequestMessage,
    handleApplyBlueprintMessage,
    handleNewtPingMessage
    handleNewtPingMessage,
    startNewtOfflineChecker,
    handleNewtDisconnectingMessage
} from "../newt";
import {
    handleOlmRegisterMessage,
@@ -15,7 +18,8 @@ import {
    startOlmOfflineChecker,
    handleOlmServerPeerAddMessage,
    handleOlmUnRelayMessage,
    handleOlmDisconnecingMessage
    handleOlmDisconnectingMessage,
    handleOlmServerInitAddPeerHandshake
} from "../olm";
import { handleHealthcheckStatusMessage } from "../target";
import { handleRoundTripMessage } from "./handleRoundTripMessage";
@@ -23,11 +27,13 @@ import { MessageHandler } from "./types";

export const messageHandlers: Record<string, MessageHandler> = {
    "olm/wg/server/peer/add": handleOlmServerPeerAddMessage,
    "olm/wg/server/peer/init": handleOlmServerInitAddPeerHandshake,
    "olm/wg/register": handleOlmRegisterMessage,
    "olm/wg/relay": handleOlmRelayMessage,
    "olm/wg/unrelay": handleOlmUnRelayMessage,
    "olm/ping": handleOlmPingMessage,
    "olm/disconnecting": handleOlmDisconnecingMessage,
    "olm/disconnecting": handleOlmDisconnectingMessage,
    "newt/disconnecting": handleNewtDisconnectingMessage,
    "newt/ping": handleNewtPingMessage,
    "newt/wg/register": handleNewtRegisterMessage,
    "newt/wg/get-config": handleGetConfigMessage,
@@ -40,4 +46,7 @@ export const messageHandlers: Record<string, MessageHandler> = {
    "ws/round-trip/complete": handleRoundTripMessage
};

startOlmOfflineChecker(); // this is to handle the offline check for olms
if (build != "saas") {
    startOlmOfflineChecker(); // this is to handle the offline check for olms
    startNewtOfflineChecker(); // this is to handle the offline check for newts
}

@@ -24,7 +24,7 @@ export interface AuthenticatedWebSocket extends WebSocket {
    clientType?: ClientType;
    connectionId?: string;
    isFullyConnected?: boolean;
    pendingMessages?: Buffer[];
    pendingMessages?: { data: Buffer; isBinary: boolean }[];
    configVersion?: number;
}

@@ -73,6 +73,7 @@ export type MessageHandler = (
// Options for sending messages with config version tracking
export interface SendMessageOptions {
    incrementConfigVersion?: boolean;
    compress?: boolean;
}

// Redis message type for cross-node communication

@@ -1,8 +1,9 @@
import { Router, Request, Response } from "express";
import zlib from "zlib";
import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { Socket } from "net";
import { Newt, newts, NewtSession, olms, Olm, OlmSession } from "@server/db";
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
import { eq } from "drizzle-orm";
import { db } from "@server/db";
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
@@ -116,11 +117,20 @@ const sendToClientLocal = async (
    };

    const messageString = JSON.stringify(messageWithVersion);
    clients.forEach((client) => {
        if (client.readyState === WebSocket.OPEN) {
            client.send(messageString);
        }
    });
    if (options.compress) {
        const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(compressed);
            }
        });
    } else {
        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(messageString);
            }
        });
    }
    return true;
};

@@ -147,11 +157,22 @@ const broadcastToAllExceptLocal = async (
            ...message,
            configVersion
        };
        clients.forEach((client) => {
            if (client.readyState === WebSocket.OPEN) {
                client.send(JSON.stringify(messageWithVersion));
            }
        });
        if (options.compress) {
            const compressed = zlib.gzipSync(
                Buffer.from(JSON.stringify(messageWithVersion), "utf8")
            );
            clients.forEach((client) => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(compressed);
                }
            });
        } else {
            clients.forEach((client) => {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(JSON.stringify(messageWithVersion));
                }
            });
        }
    }
    });
};
@@ -286,9 +307,12 @@ const setupConnection = async (
        clientType === "newt" ? (client as Newt).newtId : (client as Olm).olmId;
    await addClient(clientType, clientId, ws);

    ws.on("message", async (data) => {
    ws.on("message", async (data, isBinary) => {
        try {
            const message: WSMessage = JSON.parse(data.toString());
            const messageBuffer = isBinary
                ? zlib.gunzipSync(data as Buffer)
                : (data as Buffer);
            const message: WSMessage = JSON.parse(messageBuffer.toString());

            if (!message.type || typeof message.type !== "string") {
                throw new Error(
@@ -356,6 +380,31 @@ const setupConnection = async (
        );
    });

    // Handle WebSocket protocol-level pings from older newt clients that do
    // not send application-level "newt/ping" messages. Update the site's
    // online state and lastPing timestamp so the offline checker treats them
    // the same as modern newt clients.
    if (clientType === "newt") {
        const newtClient = client as Newt;
        ws.on("ping", async () => {
            if (!newtClient.siteId) return;
            try {
                await db
                    .update(sites)
                    .set({
                        online: true,
                        lastPing: Math.floor(Date.now() / 1000)
                    })
                    .where(eq(sites.siteId, newtClient.siteId));
            } catch (error) {
                logger.error(
                    "Error updating newt site online state on WS ping",
                    { error }
                );
            }
        });
    }

    ws.on("error", (error: Error) => {
        logger.error(
            `WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`,

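Editor's note: the send and receive paths above must agree — gzipped payloads go out as binary frames, plain JSON as text, and the receiver branches on `isBinary`. A self-contained round-trip check of that wire format (illustrative only; the message content is made up):

    import zlib from "zlib";

    const message = { type: "newt/sync", data: { hello: "world" } };

    // Sender side: stringify, then gzip into a binary frame.
    const wire = zlib.gzipSync(Buffer.from(JSON.stringify(message), "utf8"));

    // Receiver side: binary frames are gunzipped before JSON.parse;
    // text frames would be parsed directly.
    const isBinary = true;
    const buf = isBinary ? zlib.gunzipSync(wire) : wire;
    const decoded = JSON.parse(buf.toString());

    console.log(decoded.type); // "newt/sync"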