Merge branch 'hybrid' into dev

This commit is contained in:
Owen
2025-08-18 15:29:23 -07:00
54 changed files with 3959 additions and 1319 deletions

View File

@@ -4,6 +4,9 @@ import { resourceSessions, ResourceSession } from "@server/db";
import { db } from "@server/db";
import { eq, and } from "drizzle-orm";
import config from "@server/lib/config";
import axios from "axios";
import logger from "@server/logger";
import { tokenManager } from "@server/lib/tokenManager";
export const SESSION_COOKIE_NAME =
config.getRawConfig().server.session_cookie_name;
@@ -62,6 +65,29 @@ export async function validateResourceSessionToken(
token: string,
resourceId: number
): Promise<ResourceSessionValidationResult> {
if (config.isHybridMode()) {
try {
const response = await axios.post(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/resource/${resourceId}/session/validate`, {
token: token
}, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error validating resource session token in hybrid mode:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error validating resource session token in hybrid mode:", error);
}
return { resourceSession: null };
}
}
const sessionId = encodeHexLowerCase(
sha256(new TextEncoder().encode(token))
);

View File

@@ -124,7 +124,10 @@ export const exitNodes = pgTable("exitNodes", {
publicKey: varchar("publicKey").notNull(),
listenPort: integer("listenPort").notNull(),
reachableAt: varchar("reachableAt"),
maxConnections: integer("maxConnections")
maxConnections: integer("maxConnections"),
online: boolean("online").notNull().default(false),
lastPing: integer("lastPing"),
type: text("type").default("gerbil") // gerbil, remoteExitNode
});
export const siteResources = pgTable("siteResources", { // this is for the clients

View File

@@ -0,0 +1,277 @@
import { db } from "@server/db";
import {
Resource,
ResourcePassword,
ResourcePincode,
ResourceRule,
resourcePassword,
resourcePincode,
resourceRules,
resources,
roleResources,
sessions,
userOrgs,
userResources,
users
} from "@server/db";
import { and, eq } from "drizzle-orm";
import axios from "axios";
import config from "@server/lib/config";
import logger from "@server/logger";
import { tokenManager } from "@server/lib/tokenManager";
export type ResourceWithAuth = {
resource: Resource | null;
pincode: ResourcePincode | null;
password: ResourcePassword | null;
};
export type UserSessionWithUser = {
session: any;
user: any;
};
/**
* Get resource by domain with pincode and password information
*/
export async function getResourceByDomain(
domain: string
): Promise<ResourceWithAuth | null> {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/resource/domain/${domain}`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return null;
}
}
const [result] = await db
.select()
.from(resources)
.leftJoin(
resourcePincode,
eq(resourcePincode.resourceId, resources.resourceId)
)
.leftJoin(
resourcePassword,
eq(resourcePassword.resourceId, resources.resourceId)
)
.where(eq(resources.fullDomain, domain))
.limit(1);
if (!result) {
return null;
}
return {
resource: result.resources,
pincode: result.resourcePincode,
password: result.resourcePassword
};
}
/**
* Get user session with user information
*/
export async function getUserSessionWithUser(
userSessionId: string
): Promise<UserSessionWithUser | null> {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/session/${userSessionId}`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return null;
}
}
const [res] = await db
.select()
.from(sessions)
.leftJoin(users, eq(users.userId, sessions.userId))
.where(eq(sessions.sessionId, userSessionId));
if (!res) {
return null;
}
return {
session: res.session,
user: res.user
};
}
/**
* Get user organization role
*/
export async function getUserOrgRole(userId: string, orgId: string) {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/user/${userId}/org/${orgId}/role`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return null;
}
}
const userOrgRole = await db
.select()
.from(userOrgs)
.where(
and(
eq(userOrgs.userId, userId),
eq(userOrgs.orgId, orgId)
)
)
.limit(1);
return userOrgRole.length > 0 ? userOrgRole[0] : null;
}
/**
* Check if role has access to resource
*/
export async function getRoleResourceAccess(resourceId: number, roleId: number) {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/role/${roleId}/resource/${resourceId}/access`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return null;
}
}
const roleResourceAccess = await db
.select()
.from(roleResources)
.where(
and(
eq(roleResources.resourceId, resourceId),
eq(roleResources.roleId, roleId)
)
)
.limit(1);
return roleResourceAccess.length > 0 ? roleResourceAccess[0] : null;
}
/**
* Check if user has direct access to resource
*/
export async function getUserResourceAccess(userId: string, resourceId: number) {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/user/${userId}/resource/${resourceId}/access`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return null;
}
}
const userResourceAccess = await db
.select()
.from(userResources)
.where(
and(
eq(userResources.userId, userId),
eq(userResources.resourceId, resourceId)
)
)
.limit(1);
return userResourceAccess.length > 0 ? userResourceAccess[0] : null;
}
/**
* Get resource rules for a given resource
*/
export async function getResourceRules(resourceId: number): Promise<ResourceRule[]> {
if (config.isHybridMode()) {
try {
const response = await axios.get(`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/resource/${resourceId}/rules`, await tokenManager.getAuthHeader());
return response.data.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error fetching config in verify session:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching config in verify session:", error);
}
return [];
}
}
const rules = await db
.select()
.from(resourceRules)
.where(eq(resourceRules.resourceId, resourceId));
return rules;
}

View File

@@ -136,7 +136,10 @@ export const exitNodes = sqliteTable("exitNodes", {
publicKey: text("publicKey").notNull(),
listenPort: integer("listenPort").notNull(),
reachableAt: text("reachableAt"), // this is the internal address of the gerbil http server for command control
maxConnections: integer("maxConnections")
maxConnections: integer("maxConnections"),
online: integer("online", { mode: "boolean" }).notNull().default(false),
lastPing: integer("lastPing"),
type: text("type").default("gerbil") // gerbil, remoteExitNode
});
export const siteResources = sqliteTable("siteResources", { // this is for the clients

147
server/hybridServer.ts Normal file
View File

@@ -0,0 +1,147 @@
import logger from "@server/logger";
import config from "@server/lib/config";
import { createWebSocketClient } from "./routers/ws/client";
import { addPeer, deletePeer } from "./routers/gerbil/peers";
import { db, exitNodes } from "./db";
import { TraefikConfigManager } from "./lib/traefikConfig";
import { tokenManager } from "./lib/tokenManager";
import { APP_VERSION } from "./lib/consts";
import axios from "axios";
export async function createHybridClientServer() {
logger.info("Starting hybrid client server...");
// Start the token manager
await tokenManager.start();
const token = await tokenManager.getToken();
const monitor = new TraefikConfigManager();
await monitor.start();
// Create client
const client = createWebSocketClient(
token,
config.getRawConfig().hybrid!.endpoint!,
{
reconnectInterval: 5000,
pingInterval: 30000,
pingTimeout: 10000
}
);
// Register message handlers
client.registerHandler("remoteExitNode/peers/add", async (message) => {
const { publicKey, allowedIps } = message.data;
// TODO: we are getting the exit node twice here
// NOTE: there should only be one gerbil registered so...
const [exitNode] = await db.select().from(exitNodes).limit(1);
await addPeer(exitNode.exitNodeId, {
publicKey: publicKey,
allowedIps: allowedIps || []
});
});
client.registerHandler("remoteExitNode/peers/remove", async (message) => {
const { publicKey } = message.data;
// TODO: we are getting the exit node twice here
// NOTE: there should only be one gerbil registered so...
const [exitNode] = await db.select().from(exitNodes).limit(1);
await deletePeer(exitNode.exitNodeId, publicKey);
});
// /update-proxy-mapping
client.registerHandler("remoteExitNode/update-proxy-mapping", async (message) => {
try {
const [exitNode] = await db.select().from(exitNodes).limit(1);
if (!exitNode) {
logger.error("No exit node found for proxy mapping update");
return;
}
const response = await axios.post(`${exitNode.endpoint}/update-proxy-mapping`, message.data);
logger.info(`Successfully updated proxy mapping: ${response.status}`);
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error updating proxy mapping:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error updating proxy mapping:", error);
}
}
});
// /update-destinations
client.registerHandler("remoteExitNode/update-destinations", async (message) => {
try {
const [exitNode] = await db.select().from(exitNodes).limit(1);
if (!exitNode) {
logger.error("No exit node found for destinations update");
return;
}
const response = await axios.post(`${exitNode.endpoint}/update-destinations`, message.data);
logger.info(`Successfully updated destinations: ${response.status}`);
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error updating destinations:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error updating destinations:", error);
}
}
});
client.registerHandler("remoteExitNode/traefik/reload", async (message) => {
await monitor.HandleTraefikConfig();
});
// Listen to connection events
client.on("connect", () => {
logger.info("Connected to WebSocket server");
client.sendMessage("remoteExitNode/register", {
remoteExitNodeVersion: APP_VERSION
});
});
client.on("disconnect", () => {
logger.info("Disconnected from WebSocket server");
});
client.on("message", (message) => {
logger.info(
`Received message: ${message.type} ${JSON.stringify(message.data)}`
);
});
// Connect to the server
try {
await client.connect();
logger.info("Connection initiated");
} catch (error) {
logger.error("Failed to connect:", error);
}
client.sendMessageInterval(
"remoteExitNode/ping",
{ timestamp: Date.now() / 1000 },
60000
); // send every minute
}

View File

@@ -7,9 +7,11 @@ import { createNextServer } from "./nextServer";
import { createInternalServer } from "./internalServer";
import { ApiKey, ApiKeyOrg, Session, User, UserOrg } from "@server/db";
import { createIntegrationApiServer } from "./integrationApiServer";
import { createHybridClientServer } from "./hybridServer";
import config from "@server/lib/config";
import { setHostMeta } from "@server/lib/hostMeta";
import { initTelemetryClient } from "./lib/telemetry.js";
import { TraefikConfigManager } from "./lib/traefikConfig.js";
async function startServers() {
await setHostMeta();
@@ -22,7 +24,18 @@ async function startServers() {
// Start all servers
const apiServer = createApiServer();
const internalServer = createInternalServer();
const nextServer = await createNextServer();
let hybridClientServer;
let nextServer;
if (config.isHybridMode()) {
hybridClientServer = await createHybridClientServer();
} else {
nextServer = await createNextServer();
if (config.getRawConfig().traefik.file_mode) {
const monitor = new TraefikConfigManager();
await monitor.start();
}
}
let integrationServer;
if (config.getRawConfig().flags?.enable_integration_api) {
@@ -33,7 +46,8 @@ async function startServers() {
apiServer,
nextServer,
internalServer,
integrationServer
integrationServer,
hybridClientServer
};
}

View File

@@ -103,9 +103,7 @@ export class Config {
private async checkKeyStatus() {
const licenseStatus = await license.check();
if (
!licenseStatus.isHostLicensed
) {
if (!licenseStatus.isHostLicensed) {
this.checkSupporterKey();
}
}
@@ -147,6 +145,10 @@ export class Config {
return false;
}
public isHybridMode() {
return typeof this.rawConfig?.hybrid === "object";
}
public async checkSupporterKey() {
const [key] = await db.select().from(supporterKey).limit(1);

View File

@@ -0,0 +1,86 @@
import axios from "axios";
import logger from "@server/logger";
import { ExitNode } from "@server/db";
interface ExitNodeRequest {
remoteType: string;
localPath: string;
method?: "POST" | "DELETE" | "GET" | "PUT";
data?: any;
queryParams?: Record<string, string>;
}
/**
* Sends a request to an exit node, handling both remote and local exit nodes
* @param exitNode The exit node to send the request to
* @param request The request configuration
* @returns Promise<any> Response data for local nodes, undefined for remote nodes
*/
export async function sendToExitNode(
exitNode: ExitNode,
request: ExitNodeRequest
): Promise<any> {
if (!exitNode.reachableAt) {
throw new Error(
`Exit node with ID ${exitNode.exitNodeId} is not reachable`
);
}
// Handle local exit node with HTTP API
const method = request.method || "POST";
let url = `${exitNode.reachableAt}${request.localPath}`;
// Add query parameters if provided
if (request.queryParams) {
const params = new URLSearchParams(request.queryParams);
url += `?${params.toString()}`;
}
try {
let response;
switch (method) {
case "POST":
response = await axios.post(url, request.data, {
headers: {
"Content-Type": "application/json"
}
});
break;
case "DELETE":
response = await axios.delete(url);
break;
case "GET":
response = await axios.get(url);
break;
case "PUT":
response = await axios.put(url, request.data, {
headers: {
"Content-Type": "application/json"
}
});
break;
default:
throw new Error(`Unsupported HTTP method: ${method}`);
}
logger.info(`Exit node request successful:`, {
method,
url,
status: response.data.status
});
return response.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error(
`Error making ${method} request (can Pangolin see Gerbil HTTP API?) for exit node at ${exitNode.reachableAt} (status: ${error.response?.status}): ${error.message}`
);
} else {
logger.error(
`Error making ${method} request for exit node at ${exitNode.reachableAt}: ${error}`
);
}
throw error;
}
}

View File

@@ -0,0 +1,59 @@
import { db, exitNodes } from "@server/db";
import logger from "@server/logger";
import { ExitNodePingResult } from "@server/routers/newt";
import { eq } from "drizzle-orm";
export async function verifyExitNodeOrgAccess(
exitNodeId: number,
orgId: string
) {
const [exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, exitNodeId));
// For any other type, deny access
return { hasAccess: true, exitNode };
}
export async function listExitNodes(orgId: string, filterOnline = false) {
// TODO: pick which nodes to send and ping better than just all of them that are not remote
const allExitNodes = await db
.select({
exitNodeId: exitNodes.exitNodeId,
name: exitNodes.name,
address: exitNodes.address,
endpoint: exitNodes.endpoint,
publicKey: exitNodes.publicKey,
listenPort: exitNodes.listenPort,
reachableAt: exitNodes.reachableAt,
maxConnections: exitNodes.maxConnections,
online: exitNodes.online,
lastPing: exitNodes.lastPing,
type: exitNodes.type
})
.from(exitNodes);
// Filter the nodes. If there are NO remoteExitNodes then do nothing. If there are then remove all of the non-remoteExitNodes
if (allExitNodes.length === 0) {
logger.warn("No exit nodes found!");
return [];
}
return allExitNodes;
}
export function selectBestExitNode(
pingResults: ExitNodePingResult[]
): ExitNodePingResult | null {
if (!pingResults || pingResults.length === 0) {
logger.warn("No ping results provided");
return null;
}
return pingResults[0];
}
export async function checkExitNodeOrg(exitNodeId: number, orgId: string) {
return false;
}

View File

@@ -0,0 +1,2 @@
export * from "./exitNodes";
export * from "./shared";

View File

@@ -0,0 +1,30 @@
import { db, exitNodes } from "@server/db";
import config from "@server/lib/config";
import { findNextAvailableCidr } from "@server/lib/ip";
export async function getNextAvailableSubnet(): Promise<string> {
// Get all existing subnets from routes table
const existingAddresses = await db
.select({
address: exitNodes.address
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
}
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return subnet;
}

View File

@@ -1 +1,2 @@
export * from "./response";
export { tokenManager, TokenManager } from "./tokenManager";

View File

@@ -32,6 +32,15 @@ export const configSchema = z
.optional()
.default({})
}),
hybrid: z
.object({
name: z.string().optional(),
id: z.string().optional(),
secret: z.string().optional(),
endpoint: z.string().optional(),
redirect_endpoint: z.string().optional()
})
.optional(),
domains: z
.record(
z.string(),
@@ -135,7 +144,20 @@ export const configSchema = z
https_entrypoint: z.string().optional().default("websecure"),
additional_middlewares: z.array(z.string()).optional(),
cert_resolver: z.string().optional().default("letsencrypt"),
prefer_wildcard_cert: z.boolean().optional().default(false)
prefer_wildcard_cert: z.boolean().optional().default(false),
certificates_path: z.string().default("/var/certificates"),
monitor_interval: z.number().default(5000),
dynamic_cert_config_path: z
.string()
.optional()
.default("/var/dynamic/cert_config.yml"),
dynamic_router_config_path: z
.string()
.optional()
.default("/var/dynamic/router_config.yml"),
static_domains: z.array(z.string()).optional().default([]),
site_types: z.array(z.string()).optional().default(["newt", "wireguard", "local"]),
file_mode: z.boolean().optional().default(false)
})
.optional()
.default({}),

View File

@@ -0,0 +1,78 @@
import axios from "axios";
import { tokenManager } from "../tokenManager";
import logger from "@server/logger";
import config from "../config";
/**
* Get valid certificates for the specified domains
*/
export async function getValidCertificatesForDomainsHybrid(domains: Set<string>): Promise<
Array<{
id: number;
domain: string;
certFile: string | null;
keyFile: string | null;
expiresAt: Date | null;
updatedAt?: Date | null;
}>
> {
if (domains.size === 0) {
return [];
}
const domainArray = Array.from(domains);
try {
const response = await axios.get(
`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/certificates/domains`,
{
params: {
domains: domainArray
},
headers: (await tokenManager.getAuthHeader()).headers
}
);
if (response.status !== 200) {
logger.error(
`Failed to fetch certificates for domains: ${response.status} ${response.statusText}`,
{ responseData: response.data, domains: domainArray }
);
return [];
}
// logger.debug(
// `Successfully retrieved ${response.data.data?.length || 0} certificates for ${domainArray.length} domains`
// );
return response.data.data;
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error getting certificates:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error getting certificates:", error);
}
return [];
}
}
export async function getValidCertificatesForDomains(domains: Set<string>): Promise<
Array<{
id: number;
domain: string;
certFile: string | null;
keyFile: string | null;
expiresAt: Date | null;
updatedAt?: Date | null;
}>
> {
return []; // stub
}

View File

@@ -0,0 +1 @@
export * from "./certificates";

73
server/lib/remoteProxy.ts Normal file
View File

@@ -0,0 +1,73 @@
import { Request, Response, NextFunction } from "express";
import { Router } from "express";
import axios from "axios";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import config from "@server/lib/config";
import { tokenManager } from "./tokenManager";
/**
* Proxy function that forwards requests to the remote cloud server
*/
export const proxyToRemote = async (
req: Request,
res: Response,
next: NextFunction,
endpoint: string
): Promise<any> => {
try {
const remoteUrl = `${config.getRawConfig().hybrid?.endpoint?.replace(/\/$/, '')}/api/v1/${endpoint}`;
logger.debug(`Proxying request to remote server: ${remoteUrl}`);
// Forward the request to the remote server
const response = await axios({
method: req.method as any,
url: remoteUrl,
data: req.body,
headers: {
'Content-Type': 'application/json',
...(await tokenManager.getAuthHeader()).headers
},
params: req.query,
timeout: 30000, // 30 second timeout
validateStatus: () => true // Don't throw on non-2xx status codes
});
logger.debug(`Proxy response: ${JSON.stringify(response.data)}`);
// Forward the response status and data
return res.status(response.status).json(response.data);
} catch (error) {
logger.error("Error proxying request to remote server:", error);
if (axios.isAxiosError(error)) {
if (error.code === 'ECONNREFUSED' || error.code === 'ENOTFOUND') {
return next(
createHttpError(
HttpCode.SERVICE_UNAVAILABLE,
"Remote server is unavailable"
)
);
}
if (error.code === 'ECONNABORTED') {
return next(
createHttpError(
HttpCode.REQUEST_TIMEOUT,
"Request to remote server timed out"
)
);
}
}
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
"Error communicating with remote server"
)
);
}
}

274
server/lib/tokenManager.ts Normal file
View File

@@ -0,0 +1,274 @@
import axios from "axios";
import config from "@server/lib/config";
import logger from "@server/logger";
export interface TokenResponse {
success: boolean;
message?: string;
data: {
token: string;
};
}
/**
* Token Manager - Handles automatic token refresh for hybrid server authentication
*
* Usage throughout the application:
* ```typescript
* import { tokenManager } from "@server/lib/tokenManager";
*
* // Get the current valid token
* const token = await tokenManager.getToken();
*
* // Force refresh if needed
* await tokenManager.refreshToken();
* ```
*
* The token manager automatically refreshes tokens every 24 hours by default
* and is started once in the privateHybridServer.ts file.
*/
export class TokenManager {
private token: string | null = null;
private refreshInterval: NodeJS.Timeout | null = null;
private isRefreshing: boolean = false;
private refreshIntervalMs: number;
private retryInterval: NodeJS.Timeout | null = null;
private retryIntervalMs: number;
private tokenAvailablePromise: Promise<void> | null = null;
private tokenAvailableResolve: (() => void) | null = null;
constructor(refreshIntervalMs: number = 24 * 60 * 60 * 1000, retryIntervalMs: number = 5000) {
// Default to 24 hours for refresh, 5 seconds for retry
this.refreshIntervalMs = refreshIntervalMs;
this.retryIntervalMs = retryIntervalMs;
this.setupTokenAvailablePromise();
}
/**
* Set up promise that resolves when token becomes available
*/
private setupTokenAvailablePromise(): void {
this.tokenAvailablePromise = new Promise((resolve) => {
this.tokenAvailableResolve = resolve;
});
}
/**
* Resolve the token available promise
*/
private resolveTokenAvailable(): void {
if (this.tokenAvailableResolve) {
this.tokenAvailableResolve();
this.tokenAvailableResolve = null;
}
}
/**
* Start the token manager - gets initial token and sets up refresh interval
* If initial token fetch fails, keeps retrying every few seconds until successful
*/
async start(): Promise<void> {
logger.info("Starting token manager...");
try {
await this.refreshToken();
this.setupRefreshInterval();
this.resolveTokenAvailable();
logger.info("Token manager started successfully");
} catch (error) {
logger.warn(`Failed to get initial token, will retry in ${this.retryIntervalMs / 1000} seconds:`, error);
this.setupRetryInterval();
}
}
/**
* Set up retry interval for initial token acquisition
*/
private setupRetryInterval(): void {
if (this.retryInterval) {
clearInterval(this.retryInterval);
}
this.retryInterval = setInterval(async () => {
try {
logger.debug("Retrying initial token acquisition");
await this.refreshToken();
this.setupRefreshInterval();
this.clearRetryInterval();
this.resolveTokenAvailable();
logger.info("Token manager started successfully after retry");
} catch (error) {
logger.debug("Token acquisition retry failed, will try again");
}
}, this.retryIntervalMs);
}
/**
* Clear retry interval
*/
private clearRetryInterval(): void {
if (this.retryInterval) {
clearInterval(this.retryInterval);
this.retryInterval = null;
}
}
/**
* Stop the token manager and clear all intervals
*/
stop(): void {
if (this.refreshInterval) {
clearInterval(this.refreshInterval);
this.refreshInterval = null;
}
this.clearRetryInterval();
logger.info("Token manager stopped");
}
/**
* Get the current valid token
*/
// TODO: WE SHOULD NOT BE GETTING A TOKEN EVERY TIME WE REQUEST IT
async getToken(): Promise<string> {
// If we don't have a token yet, wait for it to become available
if (!this.token && this.tokenAvailablePromise) {
await this.tokenAvailablePromise;
}
if (!this.token) {
if (this.isRefreshing) {
// Wait for current refresh to complete
await this.waitForRefresh();
} else {
throw new Error("No valid token available");
}
}
if (!this.token) {
throw new Error("No valid token available");
}
return this.token;
}
async getAuthHeader() {
return {
headers: {
Authorization: `Bearer ${await this.getToken()}`,
"X-CSRF-Token": "x-csrf-protection",
}
};
}
/**
* Force refresh the token
*/
async refreshToken(): Promise<void> {
if (this.isRefreshing) {
await this.waitForRefresh();
return;
}
this.isRefreshing = true;
try {
const hybridConfig = config.getRawConfig().hybrid;
if (
!hybridConfig?.id ||
!hybridConfig?.secret ||
!hybridConfig?.endpoint
) {
throw new Error("Hybrid configuration is not defined");
}
const tokenEndpoint = `${hybridConfig.endpoint}/api/v1/auth/remoteExitNode/get-token`;
const tokenData = {
remoteExitNodeId: hybridConfig.id,
secret: hybridConfig.secret
};
logger.debug("Requesting new token from server");
const response = await axios.post<TokenResponse>(
tokenEndpoint,
tokenData,
{
headers: {
"Content-Type": "application/json",
"X-CSRF-Token": "x-csrf-protection"
},
timeout: 10000 // 10 second timeout
}
);
if (!response.data.success) {
throw new Error(
`Failed to get token: ${response.data.message}`
);
}
if (!response.data.data.token) {
throw new Error("Received empty token from server");
}
this.token = response.data.data.token;
logger.debug("Token refreshed successfully");
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error("Error updating proxy mapping:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error updating proxy mapping:", error);
}
throw new Error("Failed to refresh token");
} finally {
this.isRefreshing = false;
}
}
/**
* Set up automatic token refresh interval
*/
private setupRefreshInterval(): void {
if (this.refreshInterval) {
clearInterval(this.refreshInterval);
}
this.refreshInterval = setInterval(async () => {
try {
logger.debug("Auto-refreshing token");
await this.refreshToken();
} catch (error) {
logger.error("Failed to auto-refresh token:", error);
}
}, this.refreshIntervalMs);
}
/**
* Wait for current refresh operation to complete
*/
private async waitForRefresh(): Promise<void> {
return new Promise((resolve) => {
const checkInterval = setInterval(() => {
if (!this.isRefreshing) {
clearInterval(checkInterval);
resolve();
}
}, 100);
});
}
}
// Export a singleton instance for use throughout the application
export const tokenManager = new TokenManager();

907
server/lib/traefikConfig.ts Normal file
View File

@@ -0,0 +1,907 @@
import * as fs from "fs";
import * as path from "path";
import config from "@server/lib/config";
import logger from "@server/logger";
import * as yaml from "js-yaml";
import axios from "axios";
import { db, exitNodes } from "@server/db";
import { eq } from "drizzle-orm";
import { tokenManager } from "./tokenManager";
import {
getCurrentExitNodeId,
getTraefikConfig
} from "@server/routers/traefik";
import {
getValidCertificatesForDomains,
getValidCertificatesForDomainsHybrid
} from "./remoteCertificates";
export class TraefikConfigManager {
private intervalId: NodeJS.Timeout | null = null;
private isRunning = false;
private activeDomains = new Set<string>();
private timeoutId: NodeJS.Timeout | null = null;
private lastCertificateFetch: Date | null = null;
private lastKnownDomains = new Set<string>();
private lastLocalCertificateState = new Map<
string,
{
exists: boolean;
lastModified: Date | null;
expiresAt: Date | null;
}
>();
constructor() {}
/**
* Start monitoring certificates
*/
private scheduleNextExecution(): void {
const intervalMs = config.getRawConfig().traefik.monitor_interval;
const now = Date.now();
const nextExecution = Math.ceil(now / intervalMs) * intervalMs;
const delay = nextExecution - now;
this.timeoutId = setTimeout(async () => {
try {
await this.HandleTraefikConfig();
} catch (error) {
logger.error("Error during certificate monitoring:", error);
}
if (this.isRunning) {
this.scheduleNextExecution(); // Schedule the next execution
}
}, delay);
}
async start(): Promise<void> {
if (this.isRunning) {
logger.info("Certificate monitor is already running");
return;
}
this.isRunning = true;
logger.info(`Starting certificate monitor for exit node`);
// Ensure certificates directory exists
await this.ensureDirectoryExists(
config.getRawConfig().traefik.certificates_path
);
// Initialize local certificate state
this.lastLocalCertificateState = await this.scanLocalCertificateState();
logger.info(
`Found ${this.lastLocalCertificateState.size} existing certificate directories`
);
// Run initial check
await this.HandleTraefikConfig();
// Start synchronized scheduling
this.scheduleNextExecution();
logger.info(
`Certificate monitor started with synchronized ${
config.getRawConfig().traefik.monitor_interval
}ms interval`
);
}
/**
* Stop monitoring certificates
*/
stop(): void {
if (!this.isRunning) {
logger.info("Certificate monitor is not running");
return;
}
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
this.isRunning = false;
logger.info("Certificate monitor stopped");
}
/**
* Scan local certificate directories to build current state
*/
private async scanLocalCertificateState(): Promise<
Map<
string,
{
exists: boolean;
lastModified: Date | null;
expiresAt: Date | null;
}
>
> {
const state = new Map();
const certsPath = config.getRawConfig().traefik.certificates_path;
try {
if (!fs.existsSync(certsPath)) {
return state;
}
const certDirs = fs.readdirSync(certsPath, { withFileTypes: true });
for (const dirent of certDirs) {
if (!dirent.isDirectory()) continue;
const domain = dirent.name;
const domainDir = path.join(certsPath, domain);
const certPath = path.join(domainDir, "cert.pem");
const keyPath = path.join(domainDir, "key.pem");
const lastUpdatePath = path.join(domainDir, ".last_update");
const certExists = await this.fileExists(certPath);
const keyExists = await this.fileExists(keyPath);
const lastUpdateExists = await this.fileExists(lastUpdatePath);
let lastModified: Date | null = null;
let expiresAt: Date | null = null;
if (lastUpdateExists) {
try {
const lastUpdateStr = fs
.readFileSync(lastUpdatePath, "utf8")
.trim();
lastModified = new Date(lastUpdateStr);
} catch {
// If we can't read the last update, fall back to file stats
try {
const stats = fs.statSync(certPath);
lastModified = stats.mtime;
} catch {
lastModified = null;
}
}
}
state.set(domain, {
exists: certExists && keyExists,
lastModified,
expiresAt
});
}
} catch (error) {
logger.error("Error scanning local certificate state:", error);
}
return state;
}
/**
* Check if we need to fetch certificates from remote
*/
private shouldFetchCertificates(currentDomains: Set<string>): boolean {
// Always fetch on first run
if (!this.lastCertificateFetch) {
return true;
}
// Fetch if it's been more than 24 hours (for renewals)
const dayInMs = 24 * 60 * 60 * 1000;
const timeSinceLastFetch =
Date.now() - this.lastCertificateFetch.getTime();
if (timeSinceLastFetch > dayInMs) {
logger.info("Fetching certificates due to 24-hour renewal check");
return true;
}
// Fetch if domains have changed
if (
this.lastKnownDomains.size !== currentDomains.size ||
!Array.from(this.lastKnownDomains).every((domain) =>
currentDomains.has(domain)
)
) {
logger.info("Fetching certificates due to domain changes");
return true;
}
// Check if any local certificates are missing or appear to be outdated
for (const domain of currentDomains) {
const localState = this.lastLocalCertificateState.get(domain);
if (!localState || !localState.exists) {
logger.info(
`Fetching certificates due to missing local cert for ${domain}`
);
return true;
}
// Check if certificate is expiring soon (within 30 days)
if (localState.expiresAt) {
const daysUntilExpiry =
(localState.expiresAt.getTime() - Date.now()) /
(1000 * 60 * 60 * 24);
if (daysUntilExpiry < 30) {
logger.info(
`Fetching certificates due to upcoming expiry for ${domain} (${Math.round(daysUntilExpiry)} days remaining)`
);
return true;
}
}
}
return false;
}
/**
* Main monitoring logic
*/
lastActiveDomains: Set<string> = new Set();
public async HandleTraefikConfig(): Promise<void> {
try {
// Get all active domains for this exit node via HTTP call
const getTraefikConfig = await this.getTraefikConfig();
if (!getTraefikConfig) {
logger.error(
"Failed to fetch active domains from traefik config"
);
return;
}
const { domains, traefikConfig } = getTraefikConfig;
// Add static domains from config
// const staticDomains = [config.getRawConfig().app.dashboard_url];
// staticDomains.forEach((domain) => domains.add(domain));
// Log if domains changed
if (
this.lastActiveDomains.size !== domains.size ||
!Array.from(this.lastActiveDomains).every((domain) =>
domains.has(domain)
)
) {
logger.info(
`Active domains changed for exit node: ${Array.from(domains).join(", ")}`
);
this.lastActiveDomains = new Set(domains);
}
// Scan current local certificate state
this.lastLocalCertificateState =
await this.scanLocalCertificateState();
// Only fetch certificates if needed (domain changes, missing certs, or daily renewal check)
let validCertificates: Array<{
id: number;
domain: string;
certFile: string | null;
keyFile: string | null;
expiresAt: Date | null;
updatedAt?: Date | null;
}> = [];
if (this.shouldFetchCertificates(domains)) {
// Get valid certificates for active domains
if (config.isHybridMode()) {
validCertificates =
await getValidCertificatesForDomainsHybrid(domains);
} else {
validCertificates =
await getValidCertificatesForDomains(domains);
}
this.lastCertificateFetch = new Date();
this.lastKnownDomains = new Set(domains);
logger.info(
`Fetched ${validCertificates.length} certificates from remote`
);
// Download and decrypt new certificates
await this.processValidCertificates(validCertificates);
} else {
const timeSinceLastFetch = this.lastCertificateFetch
? Math.round(
(Date.now() - this.lastCertificateFetch.getTime()) /
(1000 * 60)
)
: 0;
// logger.debug(
// `Skipping certificate fetch - no changes detected and within 24-hour window (last fetch: ${timeSinceLastFetch} minutes ago)`
// );
// Still need to ensure config is up to date with existing certificates
await this.updateDynamicConfigFromLocalCerts(domains);
}
// Clean up certificates for domains no longer in use
await this.cleanupUnusedCertificates(domains);
// wait 1 second for traefik to pick up the new certificates
await new Promise((resolve) => setTimeout(resolve, 500));
// Write traefik config as YAML to a second dynamic config file if changed
await this.writeTraefikDynamicConfig(traefikConfig);
// Send domains to SNI proxy
try {
let exitNode;
if (config.getRawConfig().gerbil.exit_node_name) {
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name!;
[exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.name, exitNodeName))
.limit(1);
} else {
[exitNode] = await db.select().from(exitNodes).limit(1);
}
if (exitNode) {
try {
await axios.post(
`${exitNode.reachableAt}/update-local-snis`,
{ fullDomains: Array.from(domains) },
{ headers: { "Content-Type": "application/json" } }
);
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error updating local SNI:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error updating local SNI:", error);
}
}
} else {
logger.error(
"No exit node found. Has gerbil registered yet?"
);
}
} catch (err) {
logger.error("Failed to post domains to SNI proxy:", err);
}
// Update active domains tracking
this.activeDomains = domains;
} catch (error) {
logger.error("Error in traefik config monitoring cycle:", error);
}
}
/**
* Get all domains currently in use from traefik config API
*/
private async getTraefikConfig(): Promise<{
domains: Set<string>;
traefikConfig: any;
} | null> {
let traefikConfig;
try {
if (config.isHybridMode()) {
const resp = await axios.get(
`${config.getRawConfig().hybrid?.endpoint}/api/v1/hybrid/traefik-config`,
await tokenManager.getAuthHeader()
);
if (resp.status !== 200) {
logger.error(
`Failed to fetch traefik config: ${resp.status} ${resp.statusText}`,
{ responseData: resp.data }
);
return null;
}
traefikConfig = resp.data.data;
} else {
const currentExitNode = await getCurrentExitNodeId();
traefikConfig = await getTraefikConfig(
currentExitNode,
config.getRawConfig().traefik.site_types
);
}
const domains = new Set<string>();
if (traefikConfig?.http?.routers) {
for (const router of Object.values<any>(
traefikConfig.http.routers
)) {
if (router.rule && typeof router.rule === "string") {
// Match Host(`domain`)
const match = router.rule.match(/Host\(`([^`]+)`\)/);
if (match && match[1]) {
domains.add(match[1]);
}
}
}
}
// logger.debug(
// `Successfully retrieved traefik config: ${JSON.stringify(traefikConfig)}`
// );
const badgerMiddlewareName = "badger";
if (traefikConfig?.http?.middlewares) {
traefikConfig.http.middlewares[badgerMiddlewareName] = {
plugin: {
[badgerMiddlewareName]: {
apiBaseUrl: new URL(
"/api/v1",
`http://${
config.getRawConfig().server
.internal_hostname
}:${config.getRawConfig().server.internal_port}`
).href,
userSessionCookieName:
config.getRawConfig().server
.session_cookie_name,
// deprecated
accessTokenQueryParam:
config.getRawConfig().server
.resource_access_token_param,
resourceSessionRequestParam:
config.getRawConfig().server
.resource_session_request_param
}
}
};
}
return { domains, traefikConfig };
} catch (error) {
// pull data out of the axios error to log
if (axios.isAxiosError(error)) {
logger.error("Error fetching traefik config:", {
message: error.message,
code: error.code,
status: error.response?.status,
statusText: error.response?.statusText,
url: error.config?.url,
method: error.config?.method
});
} else {
logger.error("Error fetching traefik config:", error);
}
return null;
}
}
/**
* Write traefik config as YAML to a second dynamic config file if changed
*/
private async writeTraefikDynamicConfig(traefikConfig: any): Promise<void> {
const traefikDynamicConfigPath =
config.getRawConfig().traefik.dynamic_router_config_path;
let shouldWrite = false;
let oldJson = "";
if (fs.existsSync(traefikDynamicConfigPath)) {
try {
const oldContent = fs.readFileSync(
traefikDynamicConfigPath,
"utf8"
);
// Try to parse as YAML then JSON.stringify for comparison
const oldObj = yaml.load(oldContent);
oldJson = JSON.stringify(oldObj);
} catch {
oldJson = "";
}
}
const newJson = JSON.stringify(traefikConfig);
if (oldJson !== newJson) {
shouldWrite = true;
}
if (shouldWrite) {
try {
fs.writeFileSync(
traefikDynamicConfigPath,
yaml.dump(traefikConfig, { noRefs: true }),
"utf8"
);
logger.info("Traefik dynamic config updated");
} catch (err) {
logger.error("Failed to write traefik dynamic config:", err);
}
}
}
/**
* Update dynamic config from existing local certificates without fetching from remote
*/
private async updateDynamicConfigFromLocalCerts(
domains: Set<string>
): Promise<void> {
const dynamicConfigPath =
config.getRawConfig().traefik.dynamic_cert_config_path;
// Load existing dynamic config if it exists, otherwise initialize
let dynamicConfig: any = { tls: { certificates: [] } };
if (fs.existsSync(dynamicConfigPath)) {
try {
const fileContent = fs.readFileSync(dynamicConfigPath, "utf8");
dynamicConfig = yaml.load(fileContent) || dynamicConfig;
if (!dynamicConfig.tls)
dynamicConfig.tls = { certificates: [] };
if (!Array.isArray(dynamicConfig.tls.certificates)) {
dynamicConfig.tls.certificates = [];
}
} catch (err) {
logger.error("Failed to load existing dynamic config:", err);
}
}
// Keep a copy of the original config for comparison
const originalConfigYaml = yaml.dump(dynamicConfig, { noRefs: true });
// Clear existing certificates and rebuild from local state
dynamicConfig.tls.certificates = [];
for (const domain of domains) {
const localState = this.lastLocalCertificateState.get(domain);
if (localState && localState.exists) {
const domainDir = path.join(
config.getRawConfig().traefik.certificates_path,
domain
);
const certPath = path.join(domainDir, "cert.pem");
const keyPath = path.join(domainDir, "key.pem");
const certEntry = {
certFile: certPath,
keyFile: keyPath
};
dynamicConfig.tls.certificates.push(certEntry);
}
}
// Only write the config if it has changed
const newConfigYaml = yaml.dump(dynamicConfig, { noRefs: true });
if (newConfigYaml !== originalConfigYaml) {
fs.writeFileSync(dynamicConfigPath, newConfigYaml, "utf8");
logger.info("Dynamic cert config updated from local certificates");
}
}
/**
* Process valid certificates - download and decrypt them
*/
private async processValidCertificates(
validCertificates: Array<{
id: number;
domain: string;
certFile: string | null;
keyFile: string | null;
expiresAt: Date | null;
updatedAt?: Date | null;
}>
): Promise<void> {
const dynamicConfigPath =
config.getRawConfig().traefik.dynamic_cert_config_path;
// Load existing dynamic config if it exists, otherwise initialize
let dynamicConfig: any = { tls: { certificates: [] } };
if (fs.existsSync(dynamicConfigPath)) {
try {
const fileContent = fs.readFileSync(dynamicConfigPath, "utf8");
dynamicConfig = yaml.load(fileContent) || dynamicConfig;
if (!dynamicConfig.tls)
dynamicConfig.tls = { certificates: [] };
if (!Array.isArray(dynamicConfig.tls.certificates)) {
dynamicConfig.tls.certificates = [];
}
} catch (err) {
logger.error("Failed to load existing dynamic config:", err);
}
}
// Keep a copy of the original config for comparison
const originalConfigYaml = yaml.dump(dynamicConfig, { noRefs: true });
for (const cert of validCertificates) {
try {
if (!cert.certFile || !cert.keyFile) {
logger.warn(
`Certificate for domain ${cert.domain} is missing cert or key file`
);
continue;
}
const domainDir = path.join(
config.getRawConfig().traefik.certificates_path,
cert.domain
);
await this.ensureDirectoryExists(domainDir);
const certPath = path.join(domainDir, "cert.pem");
const keyPath = path.join(domainDir, "key.pem");
const lastUpdatePath = path.join(domainDir, ".last_update");
// Check if we need to update the certificate
const shouldUpdate = await this.shouldUpdateCertificate(
cert,
certPath,
keyPath,
lastUpdatePath
);
if (shouldUpdate) {
logger.info(
`Processing certificate for domain: ${cert.domain}`
);
fs.writeFileSync(certPath, cert.certFile, "utf8");
fs.writeFileSync(keyPath, cert.keyFile, "utf8");
// Set appropriate permissions (readable by owner only for key file)
fs.chmodSync(certPath, 0o644);
fs.chmodSync(keyPath, 0o600);
// Write/update .last_update file with current timestamp
fs.writeFileSync(
lastUpdatePath,
new Date().toISOString(),
"utf8"
);
logger.info(
`Certificate updated for domain: ${cert.domain}`
);
// Update local state tracking
this.lastLocalCertificateState.set(cert.domain, {
exists: true,
lastModified: new Date(),
expiresAt: cert.expiresAt
});
}
// Always ensure the config entry exists and is up to date
const certEntry = {
certFile: certPath,
keyFile: keyPath
};
// Remove any existing entry for this cert/key path
dynamicConfig.tls.certificates =
dynamicConfig.tls.certificates.filter(
(entry: any) =>
entry.certFile !== certEntry.certFile ||
entry.keyFile !== certEntry.keyFile
);
dynamicConfig.tls.certificates.push(certEntry);
} catch (error) {
logger.error(
`Error processing certificate for domain ${cert.domain}:`,
error
);
}
}
// Only write the config if it has changed
const newConfigYaml = yaml.dump(dynamicConfig, { noRefs: true });
if (newConfigYaml !== originalConfigYaml) {
fs.writeFileSync(dynamicConfigPath, newConfigYaml, "utf8");
logger.info("Dynamic cert config updated");
}
}
/**
* Check if certificate should be updated
*/
private async shouldUpdateCertificate(
cert: {
id: number;
domain: string;
expiresAt: Date | null;
updatedAt?: Date | null;
},
certPath: string,
keyPath: string,
lastUpdatePath: string
): Promise<boolean> {
try {
// If files don't exist, we need to create them
const certExists = await this.fileExists(certPath);
const keyExists = await this.fileExists(keyPath);
const lastUpdateExists = await this.fileExists(lastUpdatePath);
if (!certExists || !keyExists || !lastUpdateExists) {
return true;
}
// Read last update time from .last_update file
let lastUpdateTime: Date | null = null;
try {
const lastUpdateStr = fs
.readFileSync(lastUpdatePath, "utf8")
.trim();
lastUpdateTime = new Date(lastUpdateStr);
} catch {
lastUpdateTime = null;
}
// Use updatedAt from cert, fallback to expiresAt if not present
const dbUpdateTime = cert.updatedAt ?? cert.expiresAt;
if (!dbUpdateTime) {
// If no update time in DB, always update
return true;
}
// If DB updatedAt is newer than last update file, update
if (!lastUpdateTime || dbUpdateTime > lastUpdateTime) {
return true;
}
return false;
} catch (error) {
logger.error(
`Error checking certificate update status for ${cert.domain}:`,
error
);
return true; // When in doubt, update
}
}
/**
* Clean up certificates for domains no longer in use
*/
private async cleanupUnusedCertificates(
currentActiveDomains: Set<string>
): Promise<void> {
try {
const certsPath = config.getRawConfig().traefik.certificates_path;
const dynamicConfigPath =
config.getRawConfig().traefik.dynamic_cert_config_path;
// Load existing dynamic config if it exists
let dynamicConfig: any = { tls: { certificates: [] } };
if (fs.existsSync(dynamicConfigPath)) {
try {
const fileContent = fs.readFileSync(
dynamicConfigPath,
"utf8"
);
dynamicConfig = yaml.load(fileContent) || dynamicConfig;
if (!dynamicConfig.tls)
dynamicConfig.tls = { certificates: [] };
if (!Array.isArray(dynamicConfig.tls.certificates)) {
dynamicConfig.tls.certificates = [];
}
} catch (err) {
logger.error(
"Failed to load existing dynamic config:",
err
);
}
}
const certDirs = fs.readdirSync(certsPath, {
withFileTypes: true
});
let configChanged = false;
for (const dirent of certDirs) {
if (!dirent.isDirectory()) continue;
const dirName = dirent.name;
// Only delete if NO current domain is exactly the same or ends with `.${dirName}`
const shouldDelete = !Array.from(currentActiveDomains).some(
(domain) =>
domain === dirName || domain.endsWith(`.${dirName}`)
);
if (shouldDelete) {
const domainDir = path.join(certsPath, dirName);
logger.info(
`Cleaning up unused certificate directory: ${dirName}`
);
fs.rmSync(domainDir, { recursive: true, force: true });
// Remove from local state tracking
this.lastLocalCertificateState.delete(dirName);
// Remove from dynamic config
const certFilePath = path.join(
domainDir,
"cert.pem"
);
const keyFilePath = path.join(
domainDir,
"key.pem"
);
const before = dynamicConfig.tls.certificates.length;
dynamicConfig.tls.certificates =
dynamicConfig.tls.certificates.filter(
(entry: any) =>
entry.certFile !== certFilePath &&
entry.keyFile !== keyFilePath
);
if (dynamicConfig.tls.certificates.length !== before) {
configChanged = true;
}
}
}
if (configChanged) {
try {
fs.writeFileSync(
dynamicConfigPath,
yaml.dump(dynamicConfig, { noRefs: true }),
"utf8"
);
logger.info("Dynamic config updated after cleanup");
} catch (err) {
logger.error(
"Failed to update dynamic config after cleanup:",
err
);
}
}
} catch (error) {
logger.error("Error during certificate cleanup:", error);
}
}
/**
* Ensure directory exists
*/
private async ensureDirectoryExists(dirPath: string): Promise<void> {
try {
fs.mkdirSync(dirPath, { recursive: true });
} catch (error) {
logger.error(`Error creating directory ${dirPath}:`, error);
throw error;
}
}
/**
* Check if file exists
*/
private async fileExists(filePath: string): Promise<boolean> {
try {
fs.accessSync(filePath);
return true;
} catch {
return false;
}
}
/**
* Force a certificate refresh regardless of cache state
*/
public async forceCertificateRefresh(): Promise<void> {
logger.info("Forcing certificate refresh");
this.lastCertificateFetch = null;
this.lastKnownDomains = new Set();
await this.HandleTraefikConfig();
}
/**
* Get current status
*/
getStatus(): {
isRunning: boolean;
activeDomains: string[];
monitorInterval: number;
lastCertificateFetch: Date | null;
localCertificateCount: number;
} {
return {
isRunning: this.isRunning,
activeDomains: Array.from(this.activeDomains),
monitorInterval:
config.getRawConfig().traefik.monitor_interval || 5000,
lastCertificateFetch: this.lastCertificateFetch,
localCertificateCount: this.lastLocalCertificateState.size
};
}
}

View File

@@ -6,20 +6,21 @@ import {
} from "@server/auth/sessions/resource";
import { verifyResourceAccessToken } from "@server/auth/verifyResourceAccessToken";
import { db } from "@server/db";
import {
getResourceByDomain,
getUserSessionWithUser,
getUserOrgRole,
getRoleResourceAccess,
getUserResourceAccess,
getResourceRules
} from "@server/db/queries/verifySessionQueries";
import {
Resource,
ResourceAccessToken,
ResourcePassword,
resourcePassword,
ResourcePincode,
resourcePincode,
ResourceRule,
resourceRules,
resources,
roleResources,
sessions,
userOrgs,
userResources,
users
} from "@server/db";
import config from "@server/lib/config";
@@ -27,7 +28,6 @@ import { isIpInCidr } from "@server/lib/ip";
import { response } from "@server/lib/response";
import logger from "@server/logger";
import HttpCode from "@server/types/HttpCode";
import { and, eq } from "drizzle-orm";
import { NextFunction, Request, Response } from "express";
import createHttpError from "http-errors";
import NodeCache from "node-cache";
@@ -137,38 +137,21 @@ export async function verifyResourceSession(
| undefined = cache.get(resourceCacheKey);
if (!resourceData) {
const [result] = await db
.select()
.from(resources)
.leftJoin(
resourcePincode,
eq(resourcePincode.resourceId, resources.resourceId)
)
.leftJoin(
resourcePassword,
eq(resourcePassword.resourceId, resources.resourceId)
)
.where(eq(resources.fullDomain, cleanHost))
.limit(1);
const result = await getResourceByDomain(cleanHost);
if (!result) {
logger.debug("Resource not found", cleanHost);
logger.debug(`Resource not found ${cleanHost}`);
return notAllowed(res);
}
resourceData = {
resource: result.resources,
pincode: result.resourcePincode,
password: result.resourcePassword
};
resourceData = result;
cache.set(resourceCacheKey, resourceData);
}
const { resource, pincode, password } = resourceData;
if (!resource) {
logger.debug("Resource not found", cleanHost);
logger.debug(`Resource not found ${cleanHost}`);
return notAllowed(res);
}
@@ -208,7 +191,13 @@ export async function verifyResourceSession(
return allowed(res);
}
const redirectUrl = `${config.getRawConfig().app.dashboard_url}/auth/resource/${encodeURIComponent(
let endpoint: string;
if (config.isHybridMode()) {
endpoint = config.getRawConfig().hybrid?.redirect_endpoint || config.getRawConfig().hybrid?.endpoint || "";
} else {
endpoint = config.getRawConfig().app.dashboard_url;
}
const redirectUrl = `${endpoint}/auth/resource/${encodeURIComponent(
resource.resourceId
)}?redirect=${encodeURIComponent(originalRequestURL)}`;
@@ -529,14 +518,13 @@ async function isUserAllowedToAccessResource(
userSessionId: string,
resource: Resource
): Promise<BasicUserData | null> {
const [res] = await db
.select()
.from(sessions)
.leftJoin(users, eq(users.userId, sessions.userId))
.where(eq(sessions.sessionId, userSessionId));
const result = await getUserSessionWithUser(userSessionId);
const user = res.user;
const session = res.session;
if (!result) {
return null;
}
const { user, session } = result;
if (!user || !session) {
return null;
@@ -549,33 +537,18 @@ async function isUserAllowedToAccessResource(
return null;
}
const userOrgRole = await db
.select()
.from(userOrgs)
.where(
and(
eq(userOrgs.userId, user.userId),
eq(userOrgs.orgId, resource.orgId)
)
)
.limit(1);
const userOrgRole = await getUserOrgRole(user.userId, resource.orgId);
if (userOrgRole.length === 0) {
if (!userOrgRole) {
return null;
}
const roleResourceAccess = await db
.select()
.from(roleResources)
.where(
and(
eq(roleResources.resourceId, resource.resourceId),
eq(roleResources.roleId, userOrgRole[0].roleId)
)
)
.limit(1);
const roleResourceAccess = await getRoleResourceAccess(
resource.resourceId,
userOrgRole.roleId
);
if (roleResourceAccess.length > 0) {
if (roleResourceAccess) {
return {
username: user.username,
email: user.email,
@@ -583,18 +556,12 @@ async function isUserAllowedToAccessResource(
};
}
const userResourceAccess = await db
.select()
.from(userResources)
.where(
and(
eq(userResources.userId, user.userId),
eq(userResources.resourceId, resource.resourceId)
)
)
.limit(1);
const userResourceAccess = await getUserResourceAccess(
user.userId,
resource.resourceId
);
if (userResourceAccess.length > 0) {
if (userResourceAccess) {
return {
username: user.username,
email: user.email,
@@ -615,11 +582,7 @@ async function checkRules(
let rules: ResourceRule[] | undefined = cache.get(ruleCacheKey);
if (!rules) {
rules = await db
.select()
.from(resourceRules)
.where(eq(resourceRules.resourceId, resourceId));
rules = await getResourceRules(resourceId);
cache.set(ruleCacheKey, rules);
}

View File

@@ -24,6 +24,7 @@ import { hashPassword } from "@server/auth/password";
import { isValidCIDR, isValidIP } from "@server/lib/validators";
import { isIpInCidr } from "@server/lib/ip";
import { OpenAPITags, registry } from "@server/openApi";
import { listExitNodes } from "@server/lib/exitNodes";
const createClientParamsSchema = z
.object({
@@ -177,20 +178,9 @@ export async function createClient(
await db.transaction(async (trx) => {
// TODO: more intelligent way to pick the exit node
// make sure there is an exit node by counting the exit nodes table
const nodes = await db.select().from(exitNodes);
if (nodes.length === 0) {
return next(
createHttpError(
HttpCode.NOT_FOUND,
"No exit nodes available"
)
);
}
// get the first exit node
const exitNode = nodes[0];
const exitNodesList = await listExitNodes(orgId);
const randomExitNode =
exitNodesList[Math.floor(Math.random() * exitNodesList.length)];
const adminRole = await trx
.select()
@@ -208,7 +198,7 @@ export async function createClient(
const [newClient] = await trx
.insert(clients)
.values({
exitNodeId: exitNode.exitNodeId,
exitNodeId: randomExitNode.exitNodeId,
orgId,
name,
subnet: updatedSubnet,

View File

@@ -17,7 +17,7 @@ import {
addPeer as olmAddPeer,
deletePeer as olmDeletePeer
} from "../olm/peers";
import axios from "axios";
import { sendToExitNode } from "../../lib/exitNodeComms";
const updateClientParamsSchema = z
.object({
@@ -141,13 +141,15 @@ export async function updateClient(
const isRelayed = true;
// get the clientsite
const [clientSite] = await db
const [clientSite] = await db
.select()
.from(clientSites)
.where(and(
eq(clientSites.clientId, client.clientId),
eq(clientSites.siteId, siteId)
))
.where(
and(
eq(clientSites.clientId, client.clientId),
eq(clientSites.siteId, siteId)
)
)
.limit(1);
if (!clientSite || !clientSite.endpoint) {
@@ -158,7 +160,7 @@ export async function updateClient(
const site = await newtAddPeer(siteId, {
publicKey: client.pubKey,
allowedIps: [`${client.subnet.split("/")[0]}/32`], // we want to only allow from that client
endpoint: isRelayed ? "" : clientSite.endpoint
endpoint: isRelayed ? "" : clientSite.endpoint
});
if (!site) {
@@ -270,114 +272,102 @@ export async function updateClient(
}
}
// get all sites for this client and join with exit nodes with site.exitNodeId
const sitesData = await db
.select()
.from(sites)
.innerJoin(
clientSites,
eq(sites.siteId, clientSites.siteId)
)
.leftJoin(
exitNodes,
eq(sites.exitNodeId, exitNodes.exitNodeId)
)
.where(eq(clientSites.clientId, client.clientId));
// get all sites for this client and join with exit nodes with site.exitNodeId
const sitesData = await db
.select()
.from(sites)
.innerJoin(clientSites, eq(sites.siteId, clientSites.siteId))
.leftJoin(exitNodes, eq(sites.exitNodeId, exitNodes.exitNodeId))
.where(eq(clientSites.clientId, client.clientId));
let exitNodeDestinations: {
reachableAt: string;
sourceIp: string;
sourcePort: number;
destinations: PeerDestination[];
}[] = [];
let exitNodeDestinations: {
reachableAt: string;
exitNodeId: number;
type: string;
sourceIp: string;
sourcePort: number;
destinations: PeerDestination[];
}[] = [];
for (const site of sitesData) {
if (!site.sites.subnet) {
logger.warn(
`Site ${site.sites.siteId} has no subnet, skipping`
);
continue;
}
if (!site.clientSites.endpoint) {
logger.warn(
`Site ${site.sites.siteId} has no endpoint, skipping`
);
continue;
}
// find the destinations in the array
let destinations = exitNodeDestinations.find(
(d) => d.reachableAt === site.exitNodes?.reachableAt
for (const site of sitesData) {
if (!site.sites.subnet) {
logger.warn(
`Site ${site.sites.siteId} has no subnet, skipping`
);
if (!destinations) {
destinations = {
reachableAt: site.exitNodes?.reachableAt || "",
sourceIp: site.clientSites.endpoint.split(":")[0] || "",
sourcePort: parseInt(site.clientSites.endpoint.split(":")[1]) || 0,
destinations: [
{
destinationIP:
site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
}
]
};
} else {
// add to the existing destinations
destinations.destinations.push({
destinationIP: site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
});
}
// update it in the array
exitNodeDestinations = exitNodeDestinations.filter(
(d) => d.reachableAt !== site.exitNodes?.reachableAt
);
exitNodeDestinations.push(destinations);
continue;
}
for (const destination of exitNodeDestinations) {
try {
logger.info(
`Updating destinations for exit node at ${destination.reachableAt}`
);
const payload = {
sourceIp: destination.sourceIp,
sourcePort: destination.sourcePort,
destinations: destination.destinations
};
logger.info(
`Payload for update-destinations: ${JSON.stringify(payload, null, 2)}`
);
const response = await axios.post(
`${destination.reachableAt}/update-destinations`,
payload,
if (!site.clientSites.endpoint) {
logger.warn(
`Site ${site.sites.siteId} has no endpoint, skipping`
);
continue;
}
// find the destinations in the array
let destinations = exitNodeDestinations.find(
(d) => d.reachableAt === site.exitNodes?.reachableAt
);
if (!destinations) {
destinations = {
reachableAt: site.exitNodes?.reachableAt || "",
exitNodeId: site.exitNodes?.exitNodeId || 0,
type: site.exitNodes?.type || "",
sourceIp: site.clientSites.endpoint.split(":")[0] || "",
sourcePort:
parseInt(site.clientSites.endpoint.split(":")[1]) ||
0,
destinations: [
{
headers: {
"Content-Type": "application/json"
}
destinationIP: site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
}
);
logger.info("Destinations updated:", {
peer: response.data.status
});
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error(
`Error updating destinations (can Pangolin see Gerbil HTTP API?) for exit node at ${destination.reachableAt} (status: ${error.response?.status}): ${JSON.stringify(error.response?.data, null, 2)}`
);
} else {
logger.error(
`Error updating destinations for exit node at ${destination.reachableAt}: ${error}`
);
}
}
]
};
} else {
// add to the existing destinations
destinations.destinations.push({
destinationIP: site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
});
}
// update it in the array
exitNodeDestinations = exitNodeDestinations.filter(
(d) => d.reachableAt !== site.exitNodes?.reachableAt
);
exitNodeDestinations.push(destinations);
}
for (const destination of exitNodeDestinations) {
logger.info(
`Updating destinations for exit node at ${destination.reachableAt}`
);
const payload = {
sourceIp: destination.sourceIp,
sourcePort: destination.sourcePort,
destinations: destination.destinations
};
logger.info(
`Payload for update-destinations: ${JSON.stringify(payload, null, 2)}`
);
// Create an ExitNode-like object for sendToExitNode
const exitNodeForComm = {
exitNodeId: destination.exitNodeId,
type: destination.type,
reachableAt: destination.reachableAt
} as any; // Using 'as any' since we know sendToExitNode will handle this correctly
await sendToExitNode(exitNodeForComm, {
remoteType: "remoteExitNode/update-destinations",
localPath: "/update-destinations",
method: "POST",
data: payload
});
}
// Fetch the updated client
const [updatedClient] = await trx
.select()

View File

@@ -872,7 +872,7 @@ authRouter.post(
rateLimit({
windowMs: 15 * 60 * 1000,
max: 900,
keyGenerator: (req) => `newtGetToken:${req.body.newtId || req.ip}`,
keyGenerator: (req) => `olmGetToken:${req.body.newtId || req.ip}`,
handler: (req, res, next) => {
const message = `You can only request an Olm token ${900} times every ${15} minutes. Please try again later.`;
return next(createHttpError(HttpCode.TOO_MANY_REQUESTS, message));
@@ -951,7 +951,8 @@ authRouter.post(
rateLimit({
windowMs: 15 * 60 * 1000,
max: 15,
keyGenerator: (req) => `requestEmailVerificationCode:${req.body.email || req.ip}`,
keyGenerator: (req) =>
`requestEmailVerificationCode:${req.body.email || req.ip}`,
handler: (req, res, next) => {
const message = `You can only request an email verification code ${15} times every ${15} minutes. Please try again later.`;
return next(createHttpError(HttpCode.TOO_MANY_REQUESTS, message));
@@ -972,7 +973,8 @@ authRouter.post(
rateLimit({
windowMs: 15 * 60 * 1000,
max: 15,
keyGenerator: (req) => `requestPasswordReset:${req.body.email || req.ip}`,
keyGenerator: (req) =>
`requestPasswordReset:${req.body.email || req.ip}`,
handler: (req, res, next) => {
const message = `You can only request a password reset ${15} times every ${15} minutes. Please try again later.`;
return next(createHttpError(HttpCode.TOO_MANY_REQUESTS, message));
@@ -1066,7 +1068,8 @@ authRouter.post(
rateLimit({
windowMs: 15 * 60 * 1000, // 15 minutes
max: 5, // Allow 5 security key registrations per 15 minutes
keyGenerator: (req) => `securityKeyRegister:${req.user?.userId || req.ip}`,
keyGenerator: (req) =>
`securityKeyRegister:${req.user?.userId || req.ip}`,
handler: (req, res, next) => {
const message = `You can only register a security key ${5} times every ${15} minutes. Please try again later.`;
return next(createHttpError(HttpCode.TOO_MANY_REQUESTS, message));

View File

@@ -1,6 +1,15 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { clients, exitNodes, newts, olms, Site, sites, clientSites } from "@server/db";
import {
clients,
exitNodes,
newts,
olms,
Site,
sites,
clientSites,
ExitNode
} from "@server/db";
import { db } from "@server/db";
import { eq } from "drizzle-orm";
import HttpCode from "@server/types/HttpCode";
@@ -10,7 +19,7 @@ import { fromError } from "zod-validation-error";
// Define Zod schema for request validation
const getAllRelaysSchema = z.object({
publicKey: z.string().optional(),
publicKey: z.string().optional()
});
// Type for peer destination
@@ -44,103 +53,27 @@ export async function getAllRelays(
const { publicKey } = parsedParams.data;
if (!publicKey) {
return next(createHttpError(HttpCode.BAD_REQUEST, 'publicKey is required'));
return next(
createHttpError(HttpCode.BAD_REQUEST, "publicKey is required")
);
}
// Fetch exit node
const [exitNode] = await db.select().from(exitNodes).where(eq(exitNodes.publicKey, publicKey));
const [exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.publicKey, publicKey));
if (!exitNode) {
return next(createHttpError(HttpCode.NOT_FOUND, "Exit node not found"));
return next(
createHttpError(HttpCode.NOT_FOUND, "Exit node not found")
);
}
// Fetch sites for this exit node
const sitesRes = await db.select().from(sites).where(eq(sites.exitNodeId, exitNode.exitNodeId));
const mappings = await generateRelayMappings(exitNode);
if (sitesRes.length === 0) {
return res.status(HttpCode.OK).send({
mappings: {}
});
}
// Initialize mappings object for multi-peer support
const mappings: { [key: string]: ProxyMapping } = {};
// Process each site
for (const site of sitesRes) {
if (!site.endpoint || !site.subnet || !site.listenPort) {
continue;
}
// Find all clients associated with this site through clientSites
const clientSitesRes = await db
.select()
.from(clientSites)
.where(eq(clientSites.siteId, site.siteId));
for (const clientSite of clientSitesRes) {
if (!clientSite.endpoint) {
continue;
}
// Add this site as a destination for the client
if (!mappings[clientSite.endpoint]) {
mappings[clientSite.endpoint] = { destinations: [] };
}
// Add site as a destination for this client
const destination: PeerDestination = {
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
};
// Check if this destination is already in the array to avoid duplicates
const isDuplicate = mappings[clientSite.endpoint].destinations.some(
dest => dest.destinationIP === destination.destinationIP &&
dest.destinationPort === destination.destinationPort
);
if (!isDuplicate) {
mappings[clientSite.endpoint].destinations.push(destination);
}
}
// Also handle site-to-site communication (all sites in the same org)
if (site.orgId) {
const orgSites = await db
.select()
.from(sites)
.where(eq(sites.orgId, site.orgId));
for (const peer of orgSites) {
// Skip self
if (peer.siteId === site.siteId || !peer.endpoint || !peer.subnet || !peer.listenPort) {
continue;
}
// Add peer site as a destination for this site
if (!mappings[site.endpoint]) {
mappings[site.endpoint] = { destinations: [] };
}
const destination: PeerDestination = {
destinationIP: peer.subnet.split("/")[0],
destinationPort: peer.listenPort
};
// Check for duplicates
const isDuplicate = mappings[site.endpoint].destinations.some(
dest => dest.destinationIP === destination.destinationIP &&
dest.destinationPort === destination.destinationPort
);
if (!isDuplicate) {
mappings[site.endpoint].destinations.push(destination);
}
}
}
}
logger.debug(`Returning mappings for ${Object.keys(mappings).length} endpoints`);
logger.debug(
`Returning mappings for ${Object.keys(mappings).length} endpoints`
);
return res.status(HttpCode.OK).send({ mappings });
} catch (error) {
logger.error(error);
@@ -151,4 +84,103 @@ export async function getAllRelays(
)
);
}
}
}
export async function generateRelayMappings(exitNode: ExitNode) {
// Fetch sites for this exit node
const sitesRes = await db
.select()
.from(sites)
.where(eq(sites.exitNodeId, exitNode.exitNodeId));
if (sitesRes.length === 0) {
return {};
}
// Initialize mappings object for multi-peer support
const mappings: { [key: string]: ProxyMapping } = {};
// Process each site
for (const site of sitesRes) {
if (!site.endpoint || !site.subnet || !site.listenPort) {
continue;
}
// Find all clients associated with this site through clientSites
const clientSitesRes = await db
.select()
.from(clientSites)
.where(eq(clientSites.siteId, site.siteId));
for (const clientSite of clientSitesRes) {
if (!clientSite.endpoint) {
continue;
}
// Add this site as a destination for the client
if (!mappings[clientSite.endpoint]) {
mappings[clientSite.endpoint] = { destinations: [] };
}
// Add site as a destination for this client
const destination: PeerDestination = {
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
};
// Check if this destination is already in the array to avoid duplicates
const isDuplicate = mappings[clientSite.endpoint].destinations.some(
(dest) =>
dest.destinationIP === destination.destinationIP &&
dest.destinationPort === destination.destinationPort
);
if (!isDuplicate) {
mappings[clientSite.endpoint].destinations.push(destination);
}
}
// Also handle site-to-site communication (all sites in the same org)
if (site.orgId) {
const orgSites = await db
.select()
.from(sites)
.where(eq(sites.orgId, site.orgId));
for (const peer of orgSites) {
// Skip self
if (
peer.siteId === site.siteId ||
!peer.endpoint ||
!peer.subnet ||
!peer.listenPort
) {
continue;
}
// Add peer site as a destination for this site
if (!mappings[site.endpoint]) {
mappings[site.endpoint] = { destinations: [] };
}
const destination: PeerDestination = {
destinationIP: peer.subnet.split("/")[0],
destinationPort: peer.listenPort
};
// Check for duplicates
const isDuplicate = mappings[site.endpoint].destinations.some(
(dest) =>
dest.destinationIP === destination.destinationIP &&
dest.destinationPort === destination.destinationPort
);
if (!isDuplicate) {
mappings[site.endpoint].destinations.push(destination);
}
}
}
}
return mappings;
}

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { sites, resources, targets, exitNodes } from "@server/db";
import { sites, resources, targets, exitNodes, ExitNode } from "@server/db";
import { db } from "@server/db";
import { eq, isNotNull, and } from "drizzle-orm";
import HttpCode from "@server/types/HttpCode";
@@ -11,6 +11,8 @@ import { getUniqueExitNodeEndpointName } from "../../db/names";
import { findNextAvailableCidr } from "@server/lib/ip";
import { fromError } from "zod-validation-error";
import { getAllowedIps } from "../target/helpers";
import { proxyToRemote } from "@server/lib/remoteProxy";
import { getNextAvailableSubnet } from "@server/lib/exitNodes";
// Define Zod schema for request validation
const getConfigSchema = z.object({
publicKey: z.string(),
@@ -101,42 +103,17 @@ export async function getConfig(
);
}
const sitesRes = await db
.select()
.from(sites)
.where(
and(
eq(sites.exitNodeId, exitNode[0].exitNodeId),
isNotNull(sites.pubKey),
isNotNull(sites.subnet)
)
);
// STOP HERE IN HYBRID MODE
if (config.isHybridMode()) {
req.body = {
...req.body,
endpoint: exitNode[0].endpoint,
listenPort: exitNode[0].listenPort
}
return proxyToRemote(req, res, next, "hybrid/gerbil/get-config");
}
const peers = await Promise.all(
sitesRes.map(async (site) => {
if (site.type === "wireguard") {
return {
publicKey: site.pubKey,
allowedIps: await getAllowedIps(site.siteId)
};
} else if (site.type === "newt") {
return {
publicKey: site.pubKey,
allowedIps: [site.subnet!]
};
}
return {
publicKey: null,
allowedIps: []
};
})
);
const configResponse: GetConfigResponse = {
listenPort: exitNode[0].listenPort || 51820,
ipAddress: exitNode[0].address,
peers
};
const configResponse = await generateGerbilConfig(exitNode[0]);
logger.debug("Sending config: ", configResponse);
@@ -152,31 +129,45 @@ export async function getConfig(
}
}
async function getNextAvailableSubnet(): Promise<string> {
// Get all existing subnets from routes table
const existingAddresses = await db
.select({
address: exitNodes.address
export async function generateGerbilConfig(exitNode: ExitNode) {
const sitesRes = await db
.select()
.from(sites)
.where(
and(
eq(sites.exitNodeId, exitNode.exitNodeId),
isNotNull(sites.pubKey),
isNotNull(sites.subnet)
)
);
const peers = await Promise.all(
sitesRes.map(async (site) => {
if (site.type === "wireguard") {
return {
publicKey: site.pubKey,
allowedIps: await getAllowedIps(site.siteId)
};
} else if (site.type === "newt") {
return {
publicKey: site.pubKey,
allowedIps: [site.subnet!]
};
}
return {
publicKey: null,
allowedIps: []
};
})
.from(exitNodes);
const addresses = existingAddresses.map((a) => a.address);
let subnet = findNextAvailableCidr(
addresses,
config.getRawConfig().gerbil.block_size,
config.getRawConfig().gerbil.subnet_group
);
if (!subnet) {
throw new Error("No available subnets remaining in space");
}
// replace the last octet with 1
subnet =
subnet.split(".").slice(0, 3).join(".") +
".1" +
"/" +
subnet.split("/")[1];
return subnet;
const configResponse: GetConfigResponse = {
listenPort: exitNode.listenPort || 51820,
ipAddress: exitNode.address,
peers
};
return configResponse;
}
async function getNextAvailablePort(): Promise<number> {

View File

@@ -0,0 +1,46 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
// Define Zod schema for request validation
const getResolvedHostnameSchema = z.object({
hostname: z.string(),
publicKey: z.string()
});
export async function getResolvedHostname(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
// Validate request parameters
const parsedParams = getResolvedHostnameSchema.safeParse(
req.body
);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
// return the endpoints
return res.status(HttpCode.OK).send({
endpoints: [] // ALWAYS ROUTE LOCALLY
});
} catch (error) {
logger.error(error);
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
"An error occurred..."
)
);
}
}

View File

@@ -1,4 +1,5 @@
export * from "./getConfig";
export * from "./receiveBandwidth";
export * from "./updateHolePunch";
export * from "./getAllRelays";
export * from "./getAllRelays";
export * from "./getResolvedHostname";

View File

@@ -1,8 +1,8 @@
import axios from "axios";
import logger from "@server/logger";
import { db } from "@server/db";
import { exitNodes } from "@server/db";
import { eq } from "drizzle-orm";
import { sendToExitNode } from "../../lib/exitNodeComms";
export async function addPeer(
exitNodeId: number,
@@ -22,34 +22,13 @@ export async function addPeer(
if (!exitNode) {
throw new Error(`Exit node with ID ${exitNodeId} not found`);
}
if (!exitNode.reachableAt) {
throw new Error(`Exit node with ID ${exitNodeId} is not reachable`);
}
try {
const response = await axios.post(
`${exitNode.reachableAt}/peer`,
peer,
{
headers: {
"Content-Type": "application/json"
}
}
);
logger.info("Peer added successfully:", { peer: response.data.status });
return response.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error(
`Error adding peer (can Pangolin see Gerbil HTTP API?) for exit node at ${exitNode.reachableAt} (status: ${error.response?.status}): ${error.message}`
);
} else {
logger.error(
`Error adding peer for exit node at ${exitNode.reachableAt}: ${error}`
);
}
}
return await sendToExitNode(exitNode, {
remoteType: "remoteExitNode/peers/add",
localPath: "/peer",
method: "POST",
data: peer
});
}
export async function deletePeer(exitNodeId: number, publicKey: string) {
@@ -64,24 +43,16 @@ export async function deletePeer(exitNodeId: number, publicKey: string) {
if (!exitNode) {
throw new Error(`Exit node with ID ${exitNodeId} not found`);
}
if (!exitNode.reachableAt) {
throw new Error(`Exit node with ID ${exitNodeId} is not reachable`);
}
try {
const response = await axios.delete(
`${exitNode.reachableAt}/peer?public_key=${encodeURIComponent(publicKey)}`
);
logger.info("Peer deleted successfully:", response.data.status);
return response.data;
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error(
`Error deleting peer (can Pangolin see Gerbil HTTP API?) for exit node at ${exitNode.reachableAt} (status: ${error.response?.status}): ${error.message}`
);
} else {
logger.error(
`Error deleting peer for exit node at ${exitNode.reachableAt}: ${error}`
);
return await sendToExitNode(exitNode, {
remoteType: "remoteExitNode/peers/remove",
localPath: "/peer",
method: "DELETE",
data: {
publicKey: publicKey
},
queryParams: {
public_key: publicKey
}
}
});
}

View File

@@ -6,6 +6,7 @@ import logger from "@server/logger";
import createHttpError from "http-errors";
import HttpCode from "@server/types/HttpCode";
import response from "@server/lib/response";
import { checkExitNodeOrg } from "@server/lib/exitNodes";
// Track sites that are already offline to avoid unnecessary queries
const offlineSites = new Set<string>();
@@ -28,103 +29,7 @@ export const receiveBandwidth = async (
throw new Error("Invalid bandwidth data");
}
const currentTime = new Date();
const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago
// logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`);
await db.transaction(async (trx) => {
// First, handle sites that are actively reporting bandwidth
const activePeers = bandwidthData.filter(peer => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages
if (activePeers.length > 0) {
// Remove any active peers from offline tracking since they're sending data
activePeers.forEach(peer => offlineSites.delete(peer.publicKey));
// Aggregate usage data by organization
const orgUsageMap = new Map<string, number>();
const orgUptimeMap = new Map<string, number>();
// Update all active sites with bandwidth data and get the site data in one operation
const updatedSites = [];
for (const peer of activePeers) {
const updatedSite = await trx
.update(sites)
.set({
megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`,
megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`,
lastBandwidthUpdate: currentTime.toISOString(),
online: true
})
.where(eq(sites.pubKey, peer.publicKey))
.returning({
online: sites.online,
orgId: sites.orgId,
siteId: sites.siteId,
lastBandwidthUpdate: sites.lastBandwidthUpdate,
});
if (updatedSite.length > 0) {
updatedSites.push({ ...updatedSite[0], peer });
}
}
// Calculate org usage aggregations using the updated site data
for (const { peer, ...site } of updatedSites) {
// Aggregate bandwidth usage for the org
const totalBandwidth = peer.bytesIn + peer.bytesOut;
const currentOrgUsage = orgUsageMap.get(site.orgId) || 0;
orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth);
// Add 10 seconds of uptime for each active site
const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0;
orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and jut add 10 seconds
}
}
// Handle sites that reported zero bandwidth but need online status updated
const zeroBandwidthPeers = bandwidthData.filter(peer =>
peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages
);
if (zeroBandwidthPeers.length > 0) {
const zeroBandwidthSites = await trx
.select()
.from(sites)
.where(inArray(sites.pubKey, zeroBandwidthPeers.map(p => p.publicKey)));
for (const site of zeroBandwidthSites) {
let newOnlineStatus = site.online;
// Check if site should go offline based on last bandwidth update WITH DATA
if (site.lastBandwidthUpdate) {
const lastUpdateWithData = new Date(site.lastBandwidthUpdate);
if (lastUpdateWithData < oneMinuteAgo) {
newOnlineStatus = false;
}
} else {
// No previous data update recorded, set to offline
newOnlineStatus = false;
}
// Always update lastBandwidthUpdate to show this instance is receiving reports
// Only update online status if it changed
if (site.online !== newOnlineStatus) {
await trx
.update(sites)
.set({
online: newOnlineStatus
})
.where(eq(sites.siteId, site.siteId));
// If site went offline, add it to our tracking set
if (!newOnlineStatus && site.pubKey) {
offlineSites.add(site.pubKey);
}
}
}
}
});
await updateSiteBandwidth(bandwidthData);
return response(res, {
data: {},
@@ -142,4 +47,142 @@ export const receiveBandwidth = async (
)
);
}
};
};
export async function updateSiteBandwidth(
bandwidthData: PeerBandwidth[],
exitNodeId?: number
) {
const currentTime = new Date();
const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago
// logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`);
await db.transaction(async (trx) => {
// First, handle sites that are actively reporting bandwidth
const activePeers = bandwidthData.filter((peer) => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages
if (activePeers.length > 0) {
// Remove any active peers from offline tracking since they're sending data
activePeers.forEach((peer) => offlineSites.delete(peer.publicKey));
// Aggregate usage data by organization
const orgUsageMap = new Map<string, number>();
const orgUptimeMap = new Map<string, number>();
// Update all active sites with bandwidth data and get the site data in one operation
const updatedSites = [];
for (const peer of activePeers) {
const [updatedSite] = await trx
.update(sites)
.set({
megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`,
megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`,
lastBandwidthUpdate: currentTime.toISOString(),
online: true
})
.where(eq(sites.pubKey, peer.publicKey))
.returning({
online: sites.online,
orgId: sites.orgId,
siteId: sites.siteId,
lastBandwidthUpdate: sites.lastBandwidthUpdate
});
if (exitNodeId) {
if (await checkExitNodeOrg(exitNodeId, updatedSite.orgId)) {
// not allowed
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
// THIS SHOULD TRIGGER THE TRANSACTION TO FAIL?
throw new Error("Exit node not allowed");
}
}
if (updatedSite) {
updatedSites.push({ ...updatedSite, peer });
}
}
// Calculate org usage aggregations using the updated site data
for (const { peer, ...site } of updatedSites) {
// Aggregate bandwidth usage for the org
const totalBandwidth = peer.bytesIn + peer.bytesOut;
const currentOrgUsage = orgUsageMap.get(site.orgId) || 0;
orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth);
// Add 10 seconds of uptime for each active site
const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0;
orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and jut add 10 seconds
}
}
// Handle sites that reported zero bandwidth but need online status updated
const zeroBandwidthPeers = bandwidthData.filter(
(peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages
);
if (zeroBandwidthPeers.length > 0) {
const zeroBandwidthSites = await trx
.select()
.from(sites)
.where(
inArray(
sites.pubKey,
zeroBandwidthPeers.map((p) => p.publicKey)
)
);
for (const site of zeroBandwidthSites) {
let newOnlineStatus = site.online;
// Check if site should go offline based on last bandwidth update WITH DATA
if (site.lastBandwidthUpdate) {
const lastUpdateWithData = new Date(
site.lastBandwidthUpdate
);
if (lastUpdateWithData < oneMinuteAgo) {
newOnlineStatus = false;
}
} else {
// No previous data update recorded, set to offline
newOnlineStatus = false;
}
// Always update lastBandwidthUpdate to show this instance is receiving reports
// Only update online status if it changed
if (site.online !== newOnlineStatus) {
const [updatedSite] = await trx
.update(sites)
.set({
online: newOnlineStatus
})
.where(eq(sites.siteId, site.siteId))
.returning();
if (exitNodeId) {
if (
await checkExitNodeOrg(
exitNodeId,
updatedSite.orgId
)
) {
// not allowed
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
// THIS SHOULD TRIGGER THE TRANSACTION TO FAIL?
throw new Error("Exit node not allowed");
}
}
// If site went offline, add it to our tracking set
if (!newOnlineStatus && site.pubKey) {
offlineSites.add(site.pubKey);
}
}
}
}
});
}

View File

@@ -19,6 +19,7 @@ import { fromError } from "zod-validation-error";
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
import axios from "axios";
import { checkExitNodeOrg } from "@server/lib/exitNodes";
// Define Zod schema for request validation
const updateHolePunchSchema = z.object({
@@ -66,228 +67,34 @@ export async function updateHolePunch(
publicKey
} = parsedParams.data;
let currentSiteId: number | undefined;
let destinations: PeerDestination[] = [];
if (olmId) {
logger.debug(
`Got hole punch with ip: ${ip}, port: ${port} for olmId: ${olmId}${publicKey ? ` with exit node publicKey: ${publicKey}` : ""}`
);
const { session, olm: olmSession } =
await validateOlmSessionToken(token);
if (!session || !olmSession) {
return next(
createHttpError(HttpCode.UNAUTHORIZED, "Unauthorized")
);
}
if (olmId !== olmSession.olmId) {
logger.warn(
`Olm ID mismatch: ${olmId} !== ${olmSession.olmId}`
);
return next(
createHttpError(HttpCode.UNAUTHORIZED, "Unauthorized")
);
}
const [olm] = await db
let exitNode: ExitNode | undefined;
if (publicKey) {
// Get the exit node by public key
[exitNode] = await db
.select()
.from(olms)
.where(eq(olms.olmId, olmId));
if (!olm || !olm.clientId) {
logger.warn(`Olm not found: ${olmId}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "Olm not found")
);
}
const [client] = await db
.update(clients)
.set({
lastHolePunch: timestamp
})
.where(eq(clients.clientId, olm.clientId))
.returning();
let exitNode: ExitNode | undefined;
if (publicKey) {
// Get the exit node by public key
[exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.publicKey, publicKey));
} else {
// FOR BACKWARDS COMPATIBILITY IF GERBIL IS STILL =<1.1.0
[exitNode] = await db.select().from(exitNodes).limit(1);
}
if (!exitNode) {
logger.warn(`Exit node not found for publicKey: ${publicKey}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "Exit node not found")
);
}
// Get sites that are on this specific exit node and connected to this client
const sitesOnExitNode = await db
.select({ siteId: sites.siteId, subnet: sites.subnet, listenPort: sites.listenPort })
.from(sites)
.innerJoin(clientSites, eq(sites.siteId, clientSites.siteId))
.where(
and(
eq(sites.exitNodeId, exitNode.exitNodeId),
eq(clientSites.clientId, olm.clientId)
)
);
// Update clientSites for each site on this exit node
for (const site of sitesOnExitNode) {
logger.debug(
`Updating site ${site.siteId} on exit node with publicKey: ${publicKey}`
);
await db
.update(clientSites)
.set({
endpoint: `${ip}:${port}`
})
.where(
and(
eq(clientSites.clientId, olm.clientId),
eq(clientSites.siteId, site.siteId)
)
);
}
logger.debug(
`Updated ${sitesOnExitNode.length} sites on exit node with publicKey: ${publicKey}`
);
if (!client) {
logger.warn(`Client not found for olm: ${olmId}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "Client not found")
);
}
// Create a list of the destinations from the sites
for (const site of sitesOnExitNode) {
if (site.subnet && site.listenPort) {
destinations.push({
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
});
}
}
} else if (newtId) {
logger.debug(
`Got hole punch with ip: ${ip}, port: ${port} for newtId: ${newtId}`
);
const { session, newt: newtSession } =
await validateNewtSessionToken(token);
if (!session || !newtSession) {
return next(
createHttpError(HttpCode.UNAUTHORIZED, "Unauthorized")
);
}
if (newtId !== newtSession.newtId) {
logger.warn(
`Newt ID mismatch: ${newtId} !== ${newtSession.newtId}`
);
return next(
createHttpError(HttpCode.UNAUTHORIZED, "Unauthorized")
);
}
const [newt] = await db
.select()
.from(newts)
.where(eq(newts.newtId, newtId));
if (!newt || !newt.siteId) {
logger.warn(`Newt not found: ${newtId}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "New not found")
);
}
currentSiteId = newt.siteId;
// Update the current site with the new endpoint
const [updatedSite] = await db
.update(sites)
.set({
endpoint: `${ip}:${port}`,
lastHolePunch: timestamp
})
.where(eq(sites.siteId, newt.siteId))
.returning();
if (!updatedSite || !updatedSite.subnet) {
logger.warn(`Site not found: ${newt.siteId}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "Site not found")
);
}
// Find all clients that connect to this site
// const sitesClientPairs = await db
// .select()
// .from(clientSites)
// .where(eq(clientSites.siteId, newt.siteId));
// THE NEWT IS NOT SENDING RAW WG TO THE GERBIL SO IDK IF WE REALLY NEED THIS - REMOVING
// Get client details for each client
// for (const pair of sitesClientPairs) {
// const [client] = await db
// .select()
// .from(clients)
// .where(eq(clients.clientId, pair.clientId));
// if (client && client.endpoint) {
// const [host, portStr] = client.endpoint.split(':');
// if (host && portStr) {
// destinations.push({
// destinationIP: host,
// destinationPort: parseInt(portStr, 10)
// });
// }
// }
// }
// If this is a newt/site, also add other sites in the same org
// if (updatedSite.orgId) {
// const orgSites = await db
// .select()
// .from(sites)
// .where(eq(sites.orgId, updatedSite.orgId));
// for (const site of orgSites) {
// // Don't add the current site to the destinations
// if (site.siteId !== currentSiteId && site.subnet && site.endpoint && site.listenPort) {
// const [host, portStr] = site.endpoint.split(':');
// if (host && portStr) {
// destinations.push({
// destinationIP: host,
// destinationPort: site.listenPort
// });
// }
// }
// }
// }
.from(exitNodes)
.where(eq(exitNodes.publicKey, publicKey));
} else {
// FOR BACKWARDS COMPATIBILITY IF GERBIL IS STILL =<1.1.0
[exitNode] = await db.select().from(exitNodes).limit(1);
}
// if (destinations.length === 0) {
// logger.warn(
// `No peer destinations found for olmId: ${olmId} or newtId: ${newtId}`
// );
// return next(createHttpError(HttpCode.NOT_FOUND, "No peer destinations found"));
// }
if (!exitNode) {
logger.warn(`Exit node not found for publicKey: ${publicKey}`);
return next(
createHttpError(HttpCode.NOT_FOUND, "Exit node not found")
);
}
const destinations = await updateAndGenerateEndpointDestinations(
olmId,
newtId,
ip,
port,
timestamp,
token,
exitNode
);
logger.debug(
`Returning ${destinations.length} peer destinations for olmId: ${olmId} or newtId: ${newtId}: ${JSON.stringify(destinations, null, 2)}`
@@ -307,3 +114,215 @@ export async function updateHolePunch(
);
}
}
export async function updateAndGenerateEndpointDestinations(
olmId: string | undefined,
newtId: string | undefined,
ip: string,
port: number,
timestamp: number,
token: string,
exitNode: ExitNode
) {
let currentSiteId: number | undefined;
let destinations: PeerDestination[] = [];
if (olmId) {
logger.debug(
`Got hole punch with ip: ${ip}, port: ${port} for olmId: ${olmId}`
);
const { session, olm: olmSession } =
await validateOlmSessionToken(token);
if (!session || !olmSession) {
throw new Error("Unauthorized");
}
if (olmId !== olmSession.olmId) {
logger.warn(`Olm ID mismatch: ${olmId} !== ${olmSession.olmId}`);
throw new Error("Unauthorized");
}
const [olm] = await db.select().from(olms).where(eq(olms.olmId, olmId));
if (!olm || !olm.clientId) {
logger.warn(`Olm not found: ${olmId}`);
throw new Error("Olm not found");
}
const [client] = await db
.update(clients)
.set({
lastHolePunch: timestamp
})
.where(eq(clients.clientId, olm.clientId))
.returning();
if (await checkExitNodeOrg(exitNode.exitNodeId, client.orgId)) {
// not allowed
logger.warn(
`Exit node ${exitNode.exitNodeId} is not allowed for org ${client.orgId}`
);
throw new Error("Exit node not allowed");
}
// Get sites that are on this specific exit node and connected to this client
const sitesOnExitNode = await db
.select({
siteId: sites.siteId,
subnet: sites.subnet,
listenPort: sites.listenPort
})
.from(sites)
.innerJoin(clientSites, eq(sites.siteId, clientSites.siteId))
.where(
and(
eq(sites.exitNodeId, exitNode.exitNodeId),
eq(clientSites.clientId, olm.clientId)
)
);
// Update clientSites for each site on this exit node
for (const site of sitesOnExitNode) {
logger.debug(
`Updating site ${site.siteId} on exit node ${exitNode.exitNodeId}`
);
await db
.update(clientSites)
.set({
endpoint: `${ip}:${port}`
})
.where(
and(
eq(clientSites.clientId, olm.clientId),
eq(clientSites.siteId, site.siteId)
)
);
}
logger.debug(
`Updated ${sitesOnExitNode.length} sites on exit node ${exitNode.exitNodeId}`
);
if (!client) {
logger.warn(`Client not found for olm: ${olmId}`);
throw new Error("Client not found");
}
// Create a list of the destinations from the sites
for (const site of sitesOnExitNode) {
if (site.subnet && site.listenPort) {
destinations.push({
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
});
}
}
} else if (newtId) {
logger.debug(
`Got hole punch with ip: ${ip}, port: ${port} for newtId: ${newtId}`
);
const { session, newt: newtSession } =
await validateNewtSessionToken(token);
if (!session || !newtSession) {
throw new Error("Unauthorized");
}
if (newtId !== newtSession.newtId) {
logger.warn(
`Newt ID mismatch: ${newtId} !== ${newtSession.newtId}`
);
throw new Error("Unauthorized");
}
const [newt] = await db
.select()
.from(newts)
.where(eq(newts.newtId, newtId));
if (!newt || !newt.siteId) {
logger.warn(`Newt not found: ${newtId}`);
throw new Error("Newt not found");
}
const [site] = await db
.select()
.from(sites)
.where(eq(sites.siteId, newt.siteId))
.limit(1);
if (await checkExitNodeOrg(exitNode.exitNodeId, site.orgId)) {
// not allowed
logger.warn(
`Exit node ${exitNode.exitNodeId} is not allowed for org ${site.orgId}`
);
throw new Error("Exit node not allowed");
}
currentSiteId = newt.siteId;
// Update the current site with the new endpoint
const [updatedSite] = await db
.update(sites)
.set({
endpoint: `${ip}:${port}`,
lastHolePunch: timestamp
})
.where(eq(sites.siteId, newt.siteId))
.returning();
if (!updatedSite || !updatedSite.subnet) {
logger.warn(`Site not found: ${newt.siteId}`);
throw new Error("Site not found");
}
// Find all clients that connect to this site
// const sitesClientPairs = await db
// .select()
// .from(clientSites)
// .where(eq(clientSites.siteId, newt.siteId));
// THE NEWT IS NOT SENDING RAW WG TO THE GERBIL SO IDK IF WE REALLY NEED THIS - REMOVING
// Get client details for each client
// for (const pair of sitesClientPairs) {
// const [client] = await db
// .select()
// .from(clients)
// .where(eq(clients.clientId, pair.clientId));
// if (client && client.endpoint) {
// const [host, portStr] = client.endpoint.split(':');
// if (host && portStr) {
// destinations.push({
// destinationIP: host,
// destinationPort: parseInt(portStr, 10)
// });
// }
// }
// }
// If this is a newt/site, also add other sites in the same org
// if (updatedSite.orgId) {
// const orgSites = await db
// .select()
// .from(sites)
// .where(eq(sites.orgId, updatedSite.orgId));
// for (const site of orgSites) {
// // Don't add the current site to the destinations
// if (site.siteId !== currentSiteId && site.subnet && site.endpoint && site.listenPort) {
// const [host, portStr] = site.endpoint.split(':');
// if (host && portStr) {
// destinations.push({
// destinationIP: host,
// destinationPort: site.listenPort
// });
// }
// }
// }
// }
}
return destinations;
}

View File

@@ -7,6 +7,8 @@ import * as auth from "@server/routers/auth";
import * as supporterKey from "@server/routers/supporterKey";
import * as license from "@server/routers/license";
import * as idp from "@server/routers/idp";
import { proxyToRemote } from "@server/lib/remoteProxy";
import config from "@server/lib/config";
import HttpCode from "@server/types/HttpCode";
import {
verifyResourceAccess,
@@ -49,16 +51,51 @@ internalRouter.get("/idp/:idpId", idp.getIdp);
const gerbilRouter = Router();
internalRouter.use("/gerbil", gerbilRouter);
if (config.isHybridMode()) {
// Use proxy router to forward requests to remote cloud server
// Proxy endpoints for each gerbil route
gerbilRouter.post("/receive-bandwidth", (req, res, next) =>
proxyToRemote(req, res, next, "hybrid/gerbil/receive-bandwidth")
);
gerbilRouter.post("/update-hole-punch", (req, res, next) =>
proxyToRemote(req, res, next, "hybrid/gerbil/update-hole-punch")
);
gerbilRouter.post("/get-all-relays", (req, res, next) =>
proxyToRemote(req, res, next, "hybrid/gerbil/get-all-relays")
);
gerbilRouter.post("/get-resolved-hostname", (req, res, next) =>
proxyToRemote(req, res, next, `hybrid/gerbil/get-resolved-hostname`)
);
// GET CONFIG IS HANDLED IN THE ORIGINAL HANDLER
// SO IT CAN REGISTER THE LOCAL EXIT NODE
} else {
// Use local gerbil endpoints
gerbilRouter.post("/receive-bandwidth", gerbil.receiveBandwidth);
gerbilRouter.post("/update-hole-punch", gerbil.updateHolePunch);
gerbilRouter.post("/get-all-relays", gerbil.getAllRelays);
gerbilRouter.post("/get-resolved-hostname", gerbil.getResolvedHostname);
}
// WE HANDLE THE PROXY INSIDE OF THIS FUNCTION
// SO IT REGISTERS THE EXIT NODE LOCALLY AS WELL
gerbilRouter.post("/get-config", gerbil.getConfig);
gerbilRouter.post("/receive-bandwidth", gerbil.receiveBandwidth);
gerbilRouter.post("/update-hole-punch", gerbil.updateHolePunch);
gerbilRouter.post("/get-all-relays", gerbil.getAllRelays);
// Badger routes
const badgerRouter = Router();
internalRouter.use("/badger", badgerRouter);
badgerRouter.post("/verify-session", badger.verifyResourceSession);
badgerRouter.post("/exchange-session", badger.exchangeSession);
if (config.isHybridMode()) {
badgerRouter.post("/exchange-session", (req, res, next) =>
proxyToRemote(req, res, next, "hybrid/badger/exchange-session")
);
} else {
badgerRouter.post("/exchange-session", badger.exchangeSession);
}
export default internalRouter;

View File

@@ -13,7 +13,7 @@ import {
import { clients, clientSites, Newt, sites } from "@server/db";
import { eq, and, inArray } from "drizzle-orm";
import { updatePeer } from "../olm/peers";
import axios from "axios";
import { sendToExitNode } from "../../lib/exitNodeComms";
const inputSchema = z.object({
publicKey: z.string(),
@@ -102,41 +102,28 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, site.exitNodeId))
.limit(1);
if (exitNode.reachableAt && existingSite.subnet && existingSite.listenPort) {
try {
const response = await axios.post(
`${exitNode.reachableAt}/update-proxy-mapping`,
{
oldDestination: {
destinationIP: existingSite.subnet?.split("/")[0],
destinationPort: existingSite.listenPort
},
newDestination: {
destinationIP: site.subnet?.split("/")[0],
destinationPort: site.listenPort
}
},
{
headers: {
"Content-Type": "application/json"
}
}
);
logger.info("Destinations updated:", {
peer: response.data.status
});
} catch (error) {
if (axios.isAxiosError(error)) {
logger.error(
`Error updating proxy mapping (can Pangolin see Gerbil HTTP API?) for exit node at ${exitNode.reachableAt} (status: ${error.response?.status}): ${error.message}`
);
} else {
logger.error(
`Error updating proxy mapping for exit node at ${exitNode.reachableAt}: ${error}`
);
if (
exitNode.reachableAt &&
existingSite.subnet &&
existingSite.listenPort
) {
const payload = {
oldDestination: {
destinationIP: existingSite.subnet?.split("/")[0],
destinationPort: existingSite.listenPort
},
newDestination: {
destinationIP: site.subnet?.split("/")[0],
destinationPort: site.listenPort
}
}
};
await sendToExitNode(exitNode, {
remoteType: "remoteExitNode/update-proxy-mapping",
localPath: "/update-proxy-mapping",
method: "POST",
data: payload
});
}
}

View File

@@ -4,6 +4,7 @@ import { exitNodes, Newt } from "@server/db";
import logger from "@server/logger";
import config from "@server/lib/config";
import { ne, eq, or, and, count } from "drizzle-orm";
import { listExitNodes } from "@server/lib/exitNodes";
export const handleNewtPingRequestMessage: MessageHandler = async (context) => {
const { message, client, sendToClient } = context;
@@ -16,12 +17,19 @@ export const handleNewtPingRequestMessage: MessageHandler = async (context) => {
return;
}
// TODO: pick which nodes to send and ping better than just all of them
let exitNodesList = await db
.select()
.from(exitNodes);
// Get the newt's orgId through the site relationship
if (!newt.siteId) {
logger.warn("Newt siteId not found");
return;
}
exitNodesList = exitNodesList.filter((node) => node.maxConnections !== 0);
const [site] = await db
.select({ orgId: sites.orgId })
.from(sites)
.where(eq(sites.siteId, newt.siteId))
.limit(1);
const exitNodesList = await listExitNodes(site.orgId, true); // filter for only the online ones
let lastExitNodeId = null;
if (newt.siteId) {
@@ -54,9 +62,9 @@ export const handleNewtPingRequestMessage: MessageHandler = async (context) => {
)
);
if (currentConnections.count >= maxConnections) {
return null;
}
if (currentConnections.count >= maxConnections) {
return null;
}
weight =
(maxConnections - currentConnections.count) /

View File

@@ -9,6 +9,7 @@ import {
findNextAvailableCidr,
getNextAvailableClientSubnet
} from "@server/lib/ip";
import { selectBestExitNode, verifyExitNodeOrgAccess } from "@server/lib/exitNodes";
export type ExitNodePingResult = {
exitNodeId: number;
@@ -24,7 +25,7 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
const { message, client, sendToClient } = context;
const newt = client as Newt;
logger.info("Handling register newt message!");
logger.debug("Handling register newt message!");
if (!newt) {
logger.warn("Newt not found");
@@ -64,16 +65,6 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
exitNodeId = bestPingResult.exitNodeId;
}
if (newtVersion) {
// update the newt version in the database
await db
.update(newts)
.set({
version: newtVersion as string
})
.where(eq(newts.newtId, newt.newtId));
}
const [oldSite] = await db
.select()
.from(sites)
@@ -91,6 +82,18 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
// This effectively moves the exit node to the new one
exitNodeIdToQuery = exitNodeId; // Use the provided exitNodeId if it differs from the site's exitNodeId
const { exitNode, hasAccess } = await verifyExitNodeOrgAccess(exitNodeIdToQuery, oldSite.orgId);
if (!exitNode) {
logger.warn("Exit node not found");
return;
}
if (!hasAccess) {
logger.warn("Not authorized to use this exit node");
return;
}
const sitesQuery = await db
.select({
subnet: sites.subnet
@@ -98,12 +101,6 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
.from(sites)
.where(eq(sites.exitNodeId, exitNodeId));
const [exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, exitNodeIdToQuery))
.limit(1);
const blockSize = config.getRawConfig().gerbil.site_block_size;
const subnets = sitesQuery
.map((site) => site.subnet)
@@ -162,6 +159,16 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
allowedIps: [siteSubnet]
});
if (newtVersion && newtVersion !== newt.version) {
// update the newt version in the database
await db
.update(newts)
.set({
version: newtVersion as string
})
.where(eq(newts.newtId, newt.newtId));
}
// Get all enabled targets with their resource protocol information
const allTargets = await db
.select({
@@ -217,15 +224,4 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
broadcast: false, // Send to all clients
excludeSender: false // Include sender in broadcast
};
};
function selectBestExitNode(
pingResults: ExitNodePingResult[]
): ExitNodePingResult | null {
if (!pingResults || pingResults.length === 0) {
logger.warn("No ping results provided");
return null;
}
return pingResults[0];
}
};

View File

@@ -1,7 +1,7 @@
import { db } from "@server/db";
import { MessageHandler } from "../ws";
import { clients, Olm } from "@server/db";
import { eq, lt, isNull } from "drizzle-orm";
import { eq, lt, isNull, and, or } from "drizzle-orm";
import logger from "@server/logger";
// Track if the offline checker interval is running
@@ -13,22 +13,27 @@ const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
* Starts the background interval that checks for clients that haven't pinged recently
* and marks them as offline
*/
export const startOfflineChecker = (): void => {
export const startOlmOfflineChecker = (): void => {
if (offlineCheckerInterval) {
return; // Already running
}
offlineCheckerInterval = setInterval(async () => {
try {
const twoMinutesAgo = new Date(Date.now() - OFFLINE_THRESHOLD_MS);
const twoMinutesAgo = Math.floor((Date.now() - OFFLINE_THRESHOLD_MS) / 1000);
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
await db
.update(clients)
.set({ online: false })
.where(
eq(clients.online, true) &&
(lt(clients.lastPing, twoMinutesAgo.getTime() / 1000) || isNull(clients.lastPing))
and(
eq(clients.online, true),
or(
lt(clients.lastPing, twoMinutesAgo),
isNull(clients.lastPing)
)
)
);
} catch (error) {
@@ -42,7 +47,7 @@ export const startOfflineChecker = (): void => {
/**
* Stops the background interval that checks for offline clients
*/
export const stopOfflineChecker = (): void => {
export const stopOlmOfflineChecker = (): void => {
if (offlineCheckerInterval) {
clearInterval(offlineCheckerInterval);
offlineCheckerInterval = null;
@@ -72,7 +77,7 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
await db
.update(clients)
.set({
lastPing: new Date().getTime() / 1000,
lastPing: Math.floor(Date.now() / 1000),
online: true,
})
.where(eq(clients.clientId, olm.clientId));

View File

@@ -4,6 +4,7 @@ import { clients, clientSites, exitNodes, Olm, olms, sites } from "@server/db";
import { and, eq, inArray } from "drizzle-orm";
import { addPeer, deletePeer } from "../newt/peers";
import logger from "@server/logger";
import { listExitNodes } from "@server/lib/exitNodes";
export const handleOlmRegisterMessage: MessageHandler = async (context) => {
logger.info("Handling register olm message!");
@@ -48,7 +49,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
// TODO: FOR NOW WE ARE JUST HOLEPUNCHING ALL EXIT NODES BUT IN THE FUTURE WE SHOULD HANDLE THIS BETTER
// Get the exit node
const allExitNodes = await db.select().from(exitNodes);
const allExitNodes = await listExitNodes(client.orgId, true); // FILTER THE ONLINE ONES
const exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
return {

View File

@@ -1,6 +1,6 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { clients, db } from "@server/db";
import { clients, db, exitNodes } from "@server/db";
import { roles, userSites, sites, roleSites, Site, orgs } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
@@ -17,6 +17,7 @@ import { hashPassword } from "@server/auth/password";
import { isValidIP } from "@server/lib/validators";
import { isIpInCidr } from "@server/lib/ip";
import config from "@server/lib/config";
import { verifyExitNodeOrgAccess } from "@server/lib/exitNodes";
const createSiteParamsSchema = z
.object({
@@ -217,6 +218,32 @@ export async function createSite(
);
}
const { exitNode, hasAccess } =
await verifyExitNodeOrgAccess(
exitNodeId,
orgId
);
if (!exitNode) {
logger.warn("Exit node not found");
return next(
createHttpError(
HttpCode.NOT_FOUND,
"Exit node not found"
)
);
}
if (!hasAccess) {
logger.warn("Not authorized to use this exit node");
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Not authorized to use this exit node"
)
);
}
[newSite] = await trx
.insert(sites)
.values({

View File

@@ -6,12 +6,16 @@ import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { findNextAvailableCidr, getNextAvailableClientSubnet } from "@server/lib/ip";
import {
findNextAvailableCidr,
getNextAvailableClientSubnet
} from "@server/lib/ip";
import { generateId } from "@server/auth/sessions/app";
import config from "@server/lib/config";
import { OpenAPITags, registry } from "@server/openApi";
import { fromError } from "zod-validation-error";
import { z } from "zod";
import { listExitNodes } from "@server/lib/exitNodes";
export type PickSiteDefaultsResponse = {
exitNodeId: number;
@@ -65,16 +69,10 @@ export async function pickSiteDefaults(
const { orgId } = parsedParams.data;
// TODO: more intelligent way to pick the exit node
// make sure there is an exit node by counting the exit nodes table
const nodes = await db.select().from(exitNodes);
if (nodes.length === 0) {
return next(
createHttpError(HttpCode.NOT_FOUND, "No exit nodes available")
);
}
const exitNodesList = await listExitNodes(orgId);
// get the first exit node
const exitNode = nodes[0];
const randomExitNode =
exitNodesList[Math.floor(Math.random() * exitNodesList.length)];
// TODO: this probably can be optimized...
// list all of the sites on that exit node
@@ -83,13 +81,15 @@ export async function pickSiteDefaults(
subnet: sites.subnet
})
.from(sites)
.where(eq(sites.exitNodeId, exitNode.exitNodeId));
.where(eq(sites.exitNodeId, randomExitNode.exitNodeId));
// TODO: we need to lock this subnet for some time so someone else does not take it
const subnets = sitesQuery.map((site) => site.subnet).filter((subnet) => subnet !== null);
const subnets = sitesQuery
.map((site) => site.subnet)
.filter((subnet) => subnet !== null);
// exclude the exit node address by replacing after the / with a site block size
subnets.push(
exitNode.address.replace(
randomExitNode.address.replace(
/\/\d+$/,
`/${config.getRawConfig().gerbil.site_block_size}`
)
@@ -97,7 +97,7 @@ export async function pickSiteDefaults(
const newSubnet = findNextAvailableCidr(
subnets,
config.getRawConfig().gerbil.site_block_size,
exitNode.address
randomExitNode.address
);
if (!newSubnet) {
return next(
@@ -125,12 +125,12 @@ export async function pickSiteDefaults(
return response<PickSiteDefaultsResponse>(res, {
data: {
exitNodeId: exitNode.exitNodeId,
address: exitNode.address,
publicKey: exitNode.publicKey,
name: exitNode.name,
listenPort: exitNode.listenPort,
endpoint: exitNode.endpoint,
exitNodeId: randomExitNode.exitNodeId,
address: randomExitNode.address,
publicKey: randomExitNode.publicKey,
name: randomExitNode.name,
listenPort: randomExitNode.listenPort,
endpoint: randomExitNode.endpoint,
// subnet: `${newSubnet.split("/")[0]}/${config.getRawConfig().gerbil.block_size}`, // we want the block size of the whole subnet
subnet: newSubnet,
clientAddress: clientAddress,

View File

@@ -5,252 +5,273 @@ import logger from "@server/logger";
import HttpCode from "@server/types/HttpCode";
import config from "@server/lib/config";
import { orgs, resources, sites, Target, targets } from "@server/db";
// Extended Target interface that includes site information
interface TargetWithSite extends Target {
site: {
siteId: number;
type: string;
subnet: string | null;
exitNodeId: number | null;
};
}
import { build } from "@server/build";
let currentExitNodeId: number;
const redirectHttpsMiddlewareName = "redirect-to-https";
const badgerMiddlewareName = "badger";
export async function getCurrentExitNodeId(): Promise<number> {
if (!currentExitNodeId) {
if (config.getRawConfig().gerbil.exit_node_name) {
const exitNodeName = config.getRawConfig().gerbil.exit_node_name!;
const [exitNode] = await db
.select({
exitNodeId: exitNodes.exitNodeId
})
.from(exitNodes)
.where(eq(exitNodes.name, exitNodeName));
if (exitNode) {
currentExitNodeId = exitNode.exitNodeId;
}
} else {
const [exitNode] = await db
.select({
exitNodeId: exitNodes.exitNodeId
})
.from(exitNodes)
.limit(1);
if (exitNode) {
currentExitNodeId = exitNode.exitNodeId;
}
}
}
return currentExitNodeId;
}
export async function traefikConfigProvider(
_: Request,
res: Response
): Promise<any> {
try {
// Get all resources with related data
const allResources = await db.transaction(async (tx) => {
// First query to get resources with site and org info
// Get the current exit node name from config
if (!currentExitNodeId) {
if (config.getRawConfig().gerbil.exit_node_name) {
const exitNodeName =
config.getRawConfig().gerbil.exit_node_name!;
const [exitNode] = await tx
.select({
exitNodeId: exitNodes.exitNodeId
})
.from(exitNodes)
.where(eq(exitNodes.name, exitNodeName));
if (exitNode) {
currentExitNodeId = exitNode.exitNodeId;
}
} else {
const [exitNode] = await tx
.select({
exitNodeId: exitNodes.exitNodeId
})
.from(exitNodes)
.limit(1);
// First query to get resources with site and org info
// Get the current exit node name from config
await getCurrentExitNodeId();
if (exitNode) {
currentExitNodeId = exitNode.exitNodeId;
}
}
}
let traefikConfig = await getTraefikConfig(
currentExitNodeId,
config.getRawConfig().traefik.site_types
);
// Get resources with their targets and sites in a single optimized query
// Start from sites on this exit node, then join to targets and resources
const resourcesWithTargetsAndSites = await tx
.select({
// Resource fields
resourceId: resources.resourceId,
fullDomain: resources.fullDomain,
ssl: resources.ssl,
http: resources.http,
proxyPort: resources.proxyPort,
protocol: resources.protocol,
subdomain: resources.subdomain,
domainId: resources.domainId,
enabled: resources.enabled,
stickySession: resources.stickySession,
tlsServerName: resources.tlsServerName,
setHostHeader: resources.setHostHeader,
enableProxy: resources.enableProxy,
// Target fields
targetId: targets.targetId,
targetEnabled: targets.enabled,
ip: targets.ip,
method: targets.method,
port: targets.port,
internalPort: targets.internalPort,
// Site fields
siteId: sites.siteId,
siteType: sites.type,
subnet: sites.subnet,
exitNodeId: sites.exitNodeId
})
.from(sites)
.innerJoin(targets, eq(targets.siteId, sites.siteId))
.innerJoin(resources, eq(resources.resourceId, targets.resourceId))
.where(
and(
eq(targets.enabled, true),
eq(resources.enabled, true),
or(
eq(sites.exitNodeId, currentExitNodeId),
isNull(sites.exitNodeId)
)
)
);
traefikConfig.http.middlewares[badgerMiddlewareName] = {
plugin: {
[badgerMiddlewareName]: {
apiBaseUrl: new URL(
"/api/v1",
`http://${
config.getRawConfig().server.internal_hostname
}:${config.getRawConfig().server.internal_port}`
).href,
userSessionCookieName:
config.getRawConfig().server.session_cookie_name,
// Group by resource and include targets with their unique site data
const resourcesMap = new Map();
// deprecated
accessTokenQueryParam:
config.getRawConfig().server
.resource_access_token_param,
resourcesWithTargetsAndSites.forEach((row) => {
const resourceId = row.resourceId;
if (!resourcesMap.has(resourceId)) {
resourcesMap.set(resourceId, {
resourceId: row.resourceId,
fullDomain: row.fullDomain,
ssl: row.ssl,
http: row.http,
proxyPort: row.proxyPort,
protocol: row.protocol,
subdomain: row.subdomain,
domainId: row.domainId,
enabled: row.enabled,
stickySession: row.stickySession,
tlsServerName: row.tlsServerName,
setHostHeader: row.setHostHeader,
enableProxy: row.enableProxy,
targets: []
});
}
// Add target with its associated site data
resourcesMap.get(resourceId).targets.push({
resourceId: row.resourceId,
targetId: row.targetId,
ip: row.ip,
method: row.method,
port: row.port,
internalPort: row.internalPort,
enabled: row.targetEnabled,
site: {
siteId: row.siteId,
type: row.siteType,
subnet: row.subnet,
exitNodeId: row.exitNodeId
}
});
});
return Array.from(resourcesMap.values());
});
if (!allResources.length) {
return res.status(HttpCode.OK).json({});
}
const badgerMiddlewareName = "badger";
const redirectHttpsMiddlewareName = "redirect-to-https";
const config_output: any = {
http: {
middlewares: {
[badgerMiddlewareName]: {
plugin: {
[badgerMiddlewareName]: {
apiBaseUrl: new URL(
"/api/v1",
`http://${
config.getRawConfig().server
.internal_hostname
}:${
config.getRawConfig().server
.internal_port
}`
).href,
userSessionCookieName:
config.getRawConfig().server
.session_cookie_name,
// deprecated
accessTokenQueryParam:
config.getRawConfig().server
.resource_access_token_param,
resourceSessionRequestParam:
config.getRawConfig().server
.resource_session_request_param
}
}
},
[redirectHttpsMiddlewareName]: {
redirectScheme: {
scheme: "https"
}
}
resourceSessionRequestParam:
config.getRawConfig().server
.resource_session_request_param
}
}
};
for (const resource of allResources) {
const targets = resource.targets as TargetWithSite[];
return res.status(HttpCode.OK).json(traefikConfig);
} catch (e) {
logger.error(`Failed to build Traefik config: ${e}`);
return res.status(HttpCode.INTERNAL_SERVER_ERROR).json({
error: "Failed to build Traefik config"
});
}
}
const routerName = `${resource.resourceId}-router`;
const serviceName = `${resource.resourceId}-service`;
const fullDomain = `${resource.fullDomain}`;
const transportName = `${resource.resourceId}-transport`;
const hostHeaderMiddlewareName = `${resource.resourceId}-host-header-middleware`;
export async function getTraefikConfig(
exitNodeId: number,
siteTypes: string[]
): Promise<any> {
// Define extended target type with site information
type TargetWithSite = Target & {
site: {
siteId: number;
type: string;
subnet: string | null;
exitNodeId: number | null;
};
};
if (!resource.enabled) {
// Get all resources with related data
const allResources = await db.transaction(async (tx) => {
// Get resources with their targets and sites in a single optimized query
// Start from sites on this exit node, then join to targets and resources
const resourcesWithTargetsAndSites = await tx
.select({
// Resource fields
resourceId: resources.resourceId,
fullDomain: resources.fullDomain,
ssl: resources.ssl,
http: resources.http,
proxyPort: resources.proxyPort,
protocol: resources.protocol,
subdomain: resources.subdomain,
domainId: resources.domainId,
enabled: resources.enabled,
stickySession: resources.stickySession,
tlsServerName: resources.tlsServerName,
setHostHeader: resources.setHostHeader,
enableProxy: resources.enableProxy,
// Target fields
targetId: targets.targetId,
targetEnabled: targets.enabled,
ip: targets.ip,
method: targets.method,
port: targets.port,
internalPort: targets.internalPort,
// Site fields
siteId: sites.siteId,
siteType: sites.type,
subnet: sites.subnet,
exitNodeId: sites.exitNodeId
})
.from(sites)
.innerJoin(targets, eq(targets.siteId, sites.siteId))
.innerJoin(resources, eq(resources.resourceId, targets.resourceId))
.where(
and(
eq(targets.enabled, true),
eq(resources.enabled, true),
or(
eq(sites.exitNodeId, exitNodeId),
isNull(sites.exitNodeId)
),
inArray(sites.type, siteTypes)
)
);
// Group by resource and include targets with their unique site data
const resourcesMap = new Map();
resourcesWithTargetsAndSites.forEach((row) => {
const resourceId = row.resourceId;
if (!resourcesMap.has(resourceId)) {
resourcesMap.set(resourceId, {
resourceId: row.resourceId,
fullDomain: row.fullDomain,
ssl: row.ssl,
http: row.http,
proxyPort: row.proxyPort,
protocol: row.protocol,
subdomain: row.subdomain,
domainId: row.domainId,
enabled: row.enabled,
stickySession: row.stickySession,
tlsServerName: row.tlsServerName,
setHostHeader: row.setHostHeader,
enableProxy: row.enableProxy,
targets: []
});
}
// Add target with its associated site data
resourcesMap.get(resourceId).targets.push({
resourceId: row.resourceId,
targetId: row.targetId,
ip: row.ip,
method: row.method,
port: row.port,
internalPort: row.internalPort,
enabled: row.targetEnabled,
site: {
siteId: row.siteId,
type: row.siteType,
subnet: row.subnet,
exitNodeId: row.exitNodeId
}
});
});
return Array.from(resourcesMap.values());
});
if (!allResources.length) {
return {};
}
const config_output: any = {
http: {
middlewares: {
[redirectHttpsMiddlewareName]: {
redirectScheme: {
scheme: "https"
}
}
}
}
};
for (const resource of allResources) {
const targets = resource.targets;
const routerName = `${resource.resourceId}-router`;
const serviceName = `${resource.resourceId}-service`;
const fullDomain = `${resource.fullDomain}`;
const transportName = `${resource.resourceId}-transport`;
const hostHeaderMiddlewareName = `${resource.resourceId}-host-header-middleware`;
if (!resource.enabled) {
continue;
}
if (resource.http) {
if (!resource.domainId) {
continue;
}
if (resource.http) {
if (!resource.domainId) {
continue;
}
if (!resource.fullDomain) {
logger.error(
`Resource ${resource.resourceId} has no fullDomain`
);
continue;
}
if (!resource.fullDomain) {
logger.error(
`Resource ${resource.resourceId} has no fullDomain`
);
continue;
}
// add routers and services empty objects if they don't exist
if (!config_output.http.routers) {
config_output.http.routers = {};
}
// add routers and services empty objects if they don't exist
if (!config_output.http.routers) {
config_output.http.routers = {};
}
if (!config_output.http.services) {
config_output.http.services = {};
}
if (!config_output.http.services) {
config_output.http.services = {};
}
const domainParts = fullDomain.split(".");
let wildCard;
if (domainParts.length <= 2) {
wildCard = `*.${domainParts.join(".")}`;
} else {
wildCard = `*.${domainParts.slice(1).join(".")}`;
}
const domainParts = fullDomain.split(".");
let wildCard;
if (domainParts.length <= 2) {
wildCard = `*.${domainParts.join(".")}`;
} else {
wildCard = `*.${domainParts.slice(1).join(".")}`;
}
if (!resource.subdomain) {
wildCard = resource.fullDomain;
}
if (!resource.subdomain) {
wildCard = resource.fullDomain;
}
const configDomain = config.getDomain(resource.domainId);
const configDomain = config.getDomain(resource.domainId);
let certResolver: string, preferWildcardCert: boolean;
if (!configDomain) {
certResolver = config.getRawConfig().traefik.cert_resolver;
preferWildcardCert =
config.getRawConfig().traefik.prefer_wildcard_cert;
} else {
certResolver = configDomain.cert_resolver;
preferWildcardCert = configDomain.prefer_wildcard_cert;
}
let certResolver: string, preferWildcardCert: boolean;
if (!configDomain) {
certResolver = config.getRawConfig().traefik.cert_resolver;
preferWildcardCert =
config.getRawConfig().traefik.prefer_wildcard_cert;
} else {
certResolver = configDomain.cert_resolver;
preferWildcardCert = configDomain.prefer_wildcard_cert;
}
const tls = {
let tls = {};
if (build == "oss") {
tls = {
certResolver: certResolver,
...(preferWildcardCert
? {
@@ -262,214 +283,208 @@ export async function traefikConfigProvider(
}
: {})
};
}
const additionalMiddlewares =
config.getRawConfig().traefik.additional_middlewares || [];
const additionalMiddlewares =
config.getRawConfig().traefik.additional_middlewares || [];
config_output.http.routers![routerName] = {
config_output.http.routers![routerName] = {
entryPoints: [
resource.ssl
? config.getRawConfig().traefik.https_entrypoint
: config.getRawConfig().traefik.http_entrypoint
],
middlewares: [badgerMiddlewareName, ...additionalMiddlewares],
service: serviceName,
rule: `Host(\`${fullDomain}\`)`,
priority: 100,
...(resource.ssl ? { tls } : {})
};
if (resource.ssl) {
config_output.http.routers![routerName + "-redirect"] = {
entryPoints: [
resource.ssl
? config.getRawConfig().traefik.https_entrypoint
: config.getRawConfig().traefik.http_entrypoint
],
middlewares: [
badgerMiddlewareName,
...additionalMiddlewares
config.getRawConfig().traefik.http_entrypoint
],
middlewares: [redirectHttpsMiddlewareName],
service: serviceName,
rule: `Host(\`${fullDomain}\`)`,
priority: 100,
...(resource.ssl ? { tls } : {})
};
if (resource.ssl) {
config_output.http.routers![routerName + "-redirect"] = {
entryPoints: [
config.getRawConfig().traefik.http_entrypoint
],
middlewares: [redirectHttpsMiddlewareName],
service: serviceName,
rule: `Host(\`${fullDomain}\`)`,
priority: 100
};
}
config_output.http.services![serviceName] = {
loadBalancer: {
servers: targets
.filter((target: TargetWithSite) => {
if (!target.enabled) {
return false;
}
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
if (
!target.ip ||
!target.port ||
!target.method
) {
return false;
}
} else if (target.site.type === "newt") {
if (
!target.internalPort ||
!target.method ||
!target.site.subnet
) {
return false;
}
}
return true;
})
.map((target: TargetWithSite) => {
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
return {
url: `${target.method}://${target.ip}:${target.port}`
};
} else if (target.site.type === "newt") {
const ip = target.site.subnet!.split("/")[0];
return {
url: `${target.method}://${ip}:${target.internalPort}`
};
}
}),
...(resource.stickySession
? {
sticky: {
cookie: {
name: "p_sticky", // TODO: make this configurable via config.yml like other cookies
secure: resource.ssl,
httpOnly: true
}
}
}
: {})
}
};
// Add the serversTransport if TLS server name is provided
if (resource.tlsServerName) {
if (!config_output.http.serversTransports) {
config_output.http.serversTransports = {};
}
config_output.http.serversTransports![transportName] = {
serverName: resource.tlsServerName,
//unfortunately the following needs to be set. traefik doesn't merge the default serverTransport settings
// if defined in the static config and here. if not set, self-signed certs won't work
insecureSkipVerify: true
};
config_output.http.services![
serviceName
].loadBalancer.serversTransport = transportName;
}
// Add the host header middleware
if (resource.setHostHeader) {
if (!config_output.http.middlewares) {
config_output.http.middlewares = {};
}
config_output.http.middlewares[hostHeaderMiddlewareName] = {
headers: {
customRequestHeaders: {
Host: resource.setHostHeader
}
}
};
if (!config_output.http.routers![routerName].middlewares) {
config_output.http.routers![routerName].middlewares =
[];
}
config_output.http.routers![routerName].middlewares = [
...config_output.http.routers![routerName].middlewares,
hostHeaderMiddlewareName
];
}
} else {
// Non-HTTP (TCP/UDP) configuration
if (!resource.enableProxy) {
continue;
}
const protocol = resource.protocol.toLowerCase();
const port = resource.proxyPort;
if (!port) {
continue;
}
if (!config_output[protocol]) {
config_output[protocol] = {
routers: {},
services: {}
};
}
config_output[protocol].routers[routerName] = {
entryPoints: [`${protocol}-${port}`],
service: serviceName,
...(protocol === "tcp" ? { rule: "HostSNI(`*`)" } : {})
};
config_output[protocol].services[serviceName] = {
loadBalancer: {
servers: targets
.filter((target: TargetWithSite) => {
if (!target.enabled) {
return false;
}
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
if (!target.ip || !target.port) {
return false;
}
} else if (target.site.type === "newt") {
if (!target.internalPort || !target.site.subnet) {
return false;
}
}
return true;
})
.map((target: TargetWithSite) => {
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
return {
address: `${target.ip}:${target.port}`
};
} else if (target.site.type === "newt") {
const ip = target.site.subnet!.split("/")[0];
return {
address: `${ip}:${target.internalPort}`
};
}
}),
...(resource.stickySession
? {
sticky: {
ipStrategy: {
depth: 0,
sourcePort: true
}
}
}
: {})
}
priority: 100
};
}
config_output.http.services![serviceName] = {
loadBalancer: {
servers: (targets as TargetWithSite[])
.filter((target: TargetWithSite) => {
if (!target.enabled) {
return false;
}
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
if (
!target.ip ||
!target.port ||
!target.method
) {
return false;
}
} else if (target.site.type === "newt") {
if (
!target.internalPort ||
!target.method ||
!target.site.subnet
) {
return false;
}
}
return true;
})
.map((target: TargetWithSite) => {
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
return {
url: `${target.method}://${target.ip}:${target.port}`
};
} else if (target.site.type === "newt") {
const ip = target.site.subnet!.split("/")[0];
return {
url: `${target.method}://${ip}:${target.internalPort}`
};
}
}),
...(resource.stickySession
? {
sticky: {
cookie: {
name: "p_sticky", // TODO: make this configurable via config.yml like other cookies
secure: resource.ssl,
httpOnly: true
}
}
}
: {})
}
};
// Add the serversTransport if TLS server name is provided
if (resource.tlsServerName) {
if (!config_output.http.serversTransports) {
config_output.http.serversTransports = {};
}
config_output.http.serversTransports![transportName] = {
serverName: resource.tlsServerName,
//unfortunately the following needs to be set. traefik doesn't merge the default serverTransport settings
// if defined in the static config and here. if not set, self-signed certs won't work
insecureSkipVerify: true
};
config_output.http.services![
serviceName
].loadBalancer.serversTransport = transportName;
}
// Add the host header middleware
if (resource.setHostHeader) {
if (!config_output.http.middlewares) {
config_output.http.middlewares = {};
}
config_output.http.middlewares[hostHeaderMiddlewareName] = {
headers: {
customRequestHeaders: {
Host: resource.setHostHeader
}
}
};
if (!config_output.http.routers![routerName].middlewares) {
config_output.http.routers![routerName].middlewares = [];
}
config_output.http.routers![routerName].middlewares = [
...config_output.http.routers![routerName].middlewares,
hostHeaderMiddlewareName
];
}
} else {
// Non-HTTP (TCP/UDP) configuration
if (!resource.enableProxy) {
continue;
}
const protocol = resource.protocol.toLowerCase();
const port = resource.proxyPort;
if (!port) {
continue;
}
if (!config_output[protocol]) {
config_output[protocol] = {
routers: {},
services: {}
};
}
config_output[protocol].routers[routerName] = {
entryPoints: [`${protocol}-${port}`],
service: serviceName,
...(protocol === "tcp" ? { rule: "HostSNI(`*`)" } : {})
};
config_output[protocol].services[serviceName] = {
loadBalancer: {
servers: (targets as TargetWithSite[])
.filter((target: TargetWithSite) => {
if (!target.enabled) {
return false;
}
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
if (!target.ip || !target.port) {
return false;
}
} else if (target.site.type === "newt") {
if (
!target.internalPort ||
!target.site.subnet
) {
return false;
}
}
return true;
})
.map((target: TargetWithSite) => {
if (
target.site.type === "local" ||
target.site.type === "wireguard"
) {
return {
address: `${target.ip}:${target.port}`
};
} else if (target.site.type === "newt") {
const ip = target.site.subnet!.split("/")[0];
return {
address: `${ip}:${target.internalPort}`
};
}
}),
...(resource.stickySession
? {
sticky: {
ipStrategy: {
depth: 0,
sourcePort: true
}
}
}
: {})
}
};
}
return res.status(HttpCode.OK).json(config_output);
} catch (e) {
logger.error(`Failed to build Traefik config: ${e}`);
return res.status(HttpCode.INTERNAL_SERVER_ERROR).json({
error: "Failed to build Traefik config"
});
}
return config_output;
}

292
server/routers/ws/client.ts Normal file
View File

@@ -0,0 +1,292 @@
import WebSocket from 'ws';
import axios from 'axios';
import { URL } from 'url';
import { EventEmitter } from 'events';
import logger from '@server/logger';
export interface Config {
id: string;
secret: string;
endpoint: string;
}
export interface WSMessage {
type: string;
data: any;
}
export type MessageHandler = (message: WSMessage) => void;
export interface ClientOptions {
baseURL?: string;
reconnectInterval?: number;
pingInterval?: number;
pingTimeout?: number;
}
export class WebSocketClient extends EventEmitter {
private conn: WebSocket | null = null;
private baseURL: string;
private handlers: Map<string, MessageHandler> = new Map();
private reconnectInterval: number;
private isConnected: boolean = false;
private pingInterval: number;
private pingTimeout: number;
private shouldReconnect: boolean = true;
private reconnectTimer: NodeJS.Timeout | null = null;
private pingTimer: NodeJS.Timeout | null = null;
private pingTimeoutTimer: NodeJS.Timeout | null = null;
private token: string;
private isConnecting: boolean = false;
constructor(
token: string,
endpoint: string,
options: ClientOptions = {}
) {
super();
this.token = token;
this.baseURL = options.baseURL || endpoint;
this.reconnectInterval = options.reconnectInterval || 5000;
this.pingInterval = options.pingInterval || 30000;
this.pingTimeout = options.pingTimeout || 10000;
}
public async connect(): Promise<void> {
this.shouldReconnect = true;
if (!this.isConnecting) {
await this.connectWithRetry();
}
}
public async close(): Promise<void> {
this.shouldReconnect = false;
// Clear timers
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
if (this.pingTimeoutTimer) {
clearTimeout(this.pingTimeoutTimer);
this.pingTimeoutTimer = null;
}
if (this.conn) {
this.conn.close(1000, 'Client closing');
this.conn = null;
}
this.setConnected(false);
}
public sendMessage(messageType: string, data: any): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.conn || this.conn.readyState !== WebSocket.OPEN) {
reject(new Error('Not connected'));
return;
}
const message: WSMessage = {
type: messageType,
data: data
};
logger.debug(`Sending message: ${messageType}`, data);
this.conn.send(JSON.stringify(message), (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
public sendMessageInterval(
messageType: string,
data: any,
interval: number
): () => void {
// Send immediately
this.sendMessage(messageType, data).catch(err => {
logger.error('Failed to send initial message:', err);
});
// Set up interval
const intervalId = setInterval(() => {
this.sendMessage(messageType, data).catch(err => {
logger.error('Failed to send message:', err);
});
}, interval);
// Return stop function
return () => {
clearInterval(intervalId);
};
}
public registerHandler(messageType: string, handler: MessageHandler): void {
this.handlers.set(messageType, handler);
}
public unregisterHandler(messageType: string): void {
this.handlers.delete(messageType);
}
public isClientConnected(): boolean {
return this.isConnected;
}
private async connectWithRetry(): Promise<void> {
if (this.isConnecting) return;
this.isConnecting = true;
while (this.shouldReconnect && !this.isConnected) {
try {
await this.establishConnection();
this.isConnecting = false;
return;
} catch (error) {
logger.error(`Failed to connect: ${error}. Retrying in ${this.reconnectInterval}ms...`);
if (!this.shouldReconnect) {
this.isConnecting = false;
return;
}
await new Promise(resolve => {
this.reconnectTimer = setTimeout(resolve, this.reconnectInterval);
});
}
}
this.isConnecting = false;
}
private async establishConnection(): Promise<void> {
// Parse the base URL to determine protocol and hostname
const baseURL = new URL(this.baseURL);
const wsProtocol = baseURL.protocol === 'https:' ? 'wss' : 'ws';
const wsURL = new URL(`${wsProtocol}://${baseURL.host}/api/v1/ws`);
// Add token and client type to query parameters
wsURL.searchParams.set('token', this.token);
wsURL.searchParams.set('clientType', "remoteExitNode");
return new Promise((resolve, reject) => {
const conn = new WebSocket(wsURL.toString());
conn.on('open', () => {
logger.debug('WebSocket connection established');
this.conn = conn;
this.setConnected(true);
this.isConnecting = false;
this.startPingMonitor();
this.emit('connect');
resolve();
});
conn.on('message', (data: WebSocket.Data) => {
try {
const message: WSMessage = JSON.parse(data.toString());
const handler = this.handlers.get(message.type);
if (handler) {
handler(message);
}
this.emit('message', message);
} catch (error) {
logger.error('Failed to parse message:', error);
}
});
conn.on('close', (code, reason) => {
logger.debug(`WebSocket connection closed: ${code} ${reason}`);
this.handleDisconnect();
});
conn.on('error', (error) => {
logger.error('WebSocket error:', error);
if (this.conn === null) {
// Connection failed during establishment
reject(error);
} else {
this.handleDisconnect();
}
});
conn.on('pong', () => {
if (this.pingTimeoutTimer) {
clearTimeout(this.pingTimeoutTimer);
this.pingTimeoutTimer = null;
}
});
});
}
private startPingMonitor(): void {
this.pingTimer = setInterval(() => {
if (this.conn && this.conn.readyState === WebSocket.OPEN) {
this.conn.ping();
// Set timeout for pong response
this.pingTimeoutTimer = setTimeout(() => {
logger.error('Ping timeout - no pong received');
this.handleDisconnect();
}, this.pingTimeout);
}
}, this.pingInterval);
}
private handleDisconnect(): void {
this.setConnected(false);
this.isConnecting = false;
// Clear ping timers
if (this.pingTimer) {
clearInterval(this.pingTimer);
this.pingTimer = null;
}
if (this.pingTimeoutTimer) {
clearTimeout(this.pingTimeoutTimer);
this.pingTimeoutTimer = null;
}
if (this.conn) {
this.conn.removeAllListeners();
this.conn = null;
}
this.emit('disconnect');
// Reconnect if needed
if (this.shouldReconnect) {
// Add a small delay before starting reconnection to prevent immediate retry
setTimeout(() => {
this.connectWithRetry();
}, 1000);
}
}
private setConnected(status: boolean): void {
this.isConnected = status;
}
}
// Factory function for easier instantiation
export function createWebSocketClient(
token: string,
endpoint: string,
options?: ClientOptions
): WebSocketClient {
return new WebSocketClient(token, endpoint, options);
}
export default WebSocketClient;

View File

@@ -10,7 +10,7 @@ import {
handleOlmRegisterMessage,
handleOlmRelayMessage,
handleOlmPingMessage,
startOfflineChecker
startOlmOfflineChecker
} from "../olm";
import { MessageHandler } from "./ws";
@@ -26,4 +26,4 @@ export const messageHandlers: Record<string, MessageHandler> = {
"newt/ping/request": handleNewtPingRequestMessage
};
startOfflineChecker(); // this is to handle the offline check for olms
startOlmOfflineChecker(); // this is to handle the offline check for olms