diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index 266a8646..17d262c6 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -167,6 +167,7 @@ export const remoteExitNodes = pgTable("remoteExitNode", { secretHash: varchar("secretHash").notNull(), dateCreated: varchar("dateCreated").notNull(), version: varchar("version"), + secondaryVersion: varchar("secondaryVersion"), // This is to detect the new nodes after the transition to pangolin-node exitNodeId: integer("exitNodeId").references(() => exitNodes.exitNodeId, { onDelete: "cascade" }) diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index 89d11310..65396770 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -162,6 +162,7 @@ export const remoteExitNodes = sqliteTable("remoteExitNode", { secretHash: text("secretHash").notNull(), dateCreated: text("dateCreated").notNull(), version: text("version"), + secondaryVersion: text("secondaryVersion"), // This is to detect the new nodes after the transition to pangolin-node exitNodeId: integer("exitNodeId").references(() => exitNodes.exitNodeId, { onDelete: "cascade" }) diff --git a/server/private/routers/hybrid.ts b/server/private/routers/hybrid.ts index 2b54ae9d..a8b6a174 100644 --- a/server/private/routers/hybrid.ts +++ b/server/private/routers/hybrid.ts @@ -1077,7 +1077,12 @@ hybridRouter.get( .where(eq(resourceRules.resourceId, resourceId)); // backward compatibility: COUNTRY -> GEOIP - if ((remoteExitNode.version && semver.lt(remoteExitNode.version, "1.1.0")) || !remoteExitNode.version) { + // TODO: remove this after a few versions once all exit nodes are updated + if ( + (remoteExitNode.secondaryVersion && + semver.lt(remoteExitNode.secondaryVersion, "1.1.0")) || + !remoteExitNode.secondaryVersion + ) { for (const rule of rules) { if (rule.match == "COUNTRY") { rule.match = "GEOIP"; @@ -1085,6 +1090,10 @@ hybridRouter.get( } } + logger.debug( + `Retrieved ${rules.length} rules for resource ID ${resourceId}: ${JSON.stringify(rules)}` + ); + return response<(typeof resourceRules.$inferSelect)[]>(res, { data: rules, success: true, @@ -1692,23 +1701,9 @@ const batchLogsSchema = z.object({ }); hybridRouter.post( - "/org/:orgId/logs/batch", + "/logs/batch", async (req: Request, res: Response, next: NextFunction) => { try { - const parsedParams = getOrgLoginPageParamsSchema.safeParse( - req.params - ); - if (!parsedParams.success) { - return next( - createHttpError( - HttpCode.BAD_REQUEST, - fromError(parsedParams.error).toString() - ) - ); - } - - const { orgId } = parsedParams.data; - const parsedBody = batchLogsSchema.safeParse(req.body); if (!parsedBody.success) { return next( @@ -1732,39 +1727,48 @@ hybridRouter.post( ); } - if (await checkExitNodeOrg(remoteExitNode.exitNodeId, orgId)) { - // If the exit node is not allowed for the org, return an error - return next( - createHttpError( - HttpCode.FORBIDDEN, - "Exit node not allowed for this organization" - ) - ); - } + const exitNodeOrgsRes = await db + .select() + .from(exitNodeOrgs) + .where( + and(eq(exitNodeOrgs.exitNodeId, remoteExitNode.exitNodeId)) + ) + .limit(1); // Batch insert all logs in a single query - const logEntries = logs.map((logEntry) => ({ - timestamp: logEntry.timestamp, - orgId: logEntry.orgId, - actorType: logEntry.actorType, - actor: logEntry.actor, - actorId: logEntry.actorId, - metadata: logEntry.metadata, - action: logEntry.action, - resourceId: logEntry.resourceId, - reason: logEntry.reason, - location: logEntry.location, - // userAgent: data.userAgent, // TODO: add this - // headers: data.body.headers, - // query: data.body.query, - originalRequestURL: logEntry.originalRequestURL, - scheme: logEntry.scheme, - host: logEntry.host, - path: logEntry.path, - method: logEntry.method, - ip: logEntry.ip, - tls: logEntry.tls - })); + const logEntries = logs + .filter((logEntry) => { + if (!logEntry.orgId) { + return false; + } + + const isOrgAllowed = exitNodeOrgsRes.some( + (eno) => eno.orgId === logEntry.orgId + ); + return isOrgAllowed; + }) + .map((logEntry) => ({ + timestamp: logEntry.timestamp, + orgId: logEntry.orgId, + actorType: logEntry.actorType, + actor: logEntry.actor, + actorId: logEntry.actorId, + metadata: logEntry.metadata, + action: logEntry.action, + resourceId: logEntry.resourceId, + reason: logEntry.reason, + location: logEntry.location, + // userAgent: data.userAgent, // TODO: add this + // headers: data.body.headers, + // query: data.body.query, + originalRequestURL: logEntry.originalRequestURL, + scheme: logEntry.scheme, + host: logEntry.host, + path: logEntry.path, + method: logEntry.method, + ip: logEntry.ip, + tls: logEntry.tls + })); await db.insert(requestAuditLog).values(logEntries); diff --git a/server/private/routers/remoteExitNode/handleRemoteExitNodeRegisterMessage.ts b/server/private/routers/remoteExitNode/handleRemoteExitNodeRegisterMessage.ts index 9e50a841..a733db7d 100644 --- a/server/private/routers/remoteExitNode/handleRemoteExitNodeRegisterMessage.ts +++ b/server/private/routers/remoteExitNode/handleRemoteExitNodeRegisterMessage.ts @@ -29,7 +29,7 @@ export const handleRemoteExitNodeRegisterMessage: MessageHandler = async ( return; } - const { remoteExitNodeVersion } = message.data; + const { remoteExitNodeVersion, remoteExitNodeSecondaryVersion } = message.data; if (!remoteExitNodeVersion) { logger.warn("Remote exit node version not found"); @@ -39,7 +39,7 @@ export const handleRemoteExitNodeRegisterMessage: MessageHandler = async ( // update the version await db .update(remoteExitNodes) - .set({ version: remoteExitNodeVersion }) + .set({ version: remoteExitNodeVersion, secondaryVersion: remoteExitNodeSecondaryVersion }) .where( eq( remoteExitNodes.remoteExitNodeId, diff --git a/server/routers/ws/client.ts b/server/routers/ws/client.ts deleted file mode 100644 index 13b5d0da..00000000 --- a/server/routers/ws/client.ts +++ /dev/null @@ -1,315 +0,0 @@ -import WebSocket from 'ws'; -import axios from 'axios'; -import { URL } from 'url'; -import { EventEmitter } from 'events'; -import logger from '@server/logger'; - -export interface Config { - id: string; - secret: string; - endpoint: string; -} - -export interface WSMessage { - type: string; - data: any; -} - -export type MessageHandler = (message: WSMessage) => void; - -export interface ClientOptions { - baseURL?: string; - reconnectInterval?: number; - pingInterval?: number; - pingTimeout?: number; -} - -export class WebSocketClient extends EventEmitter { - private conn: WebSocket | null = null; - private baseURL: string; - private handlers: Map = new Map(); - private reconnectInterval: number; - private isConnected: boolean = false; - private pingInterval: number; - private pingTimeout: number; - private shouldReconnect: boolean = true; - private reconnectTimer: NodeJS.Timeout | null = null; - private pingTimer: NodeJS.Timeout | null = null; - private pingTimeoutTimer: NodeJS.Timeout | null = null; - private token: string; - private isConnecting: boolean = false; - - constructor( - token: string, - endpoint: string, - options: ClientOptions = {} - ) { - super(); - - this.token = token; - this.baseURL = options.baseURL || endpoint; - this.reconnectInterval = options.reconnectInterval || 5000; - this.pingInterval = options.pingInterval || 30000; - this.pingTimeout = options.pingTimeout || 10000; - } - - public async connect(): Promise { - this.shouldReconnect = true; - if (!this.isConnecting) { - await this.connectWithRetry(); - } - } - - public async close(): Promise { - this.shouldReconnect = false; - - // Clear timers - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - if (this.pingTimer) { - clearInterval(this.pingTimer); - this.pingTimer = null; - } - if (this.pingTimeoutTimer) { - clearTimeout(this.pingTimeoutTimer); - this.pingTimeoutTimer = null; - } - - if (this.conn) { - this.conn.close(1000, 'Client closing'); - this.conn = null; - } - - this.setConnected(false); - } - - public sendMessage(messageType: string, data: any): Promise { - return new Promise((resolve, reject) => { - if (!this.conn || this.conn.readyState !== WebSocket.OPEN) { - reject(new Error('Not connected')); - return; - } - - const message: WSMessage = { - type: messageType, - data: data - }; - - logger.debug(`Sending message: ${messageType}`, data); - - this.conn.send(JSON.stringify(message), (error) => { - if (error) { - reject(error); - } else { - resolve(); - } - }); - }); - } - - public sendMessageInterval( - messageType: string, - data: any, - interval: number - ): () => void { - // Send immediately - this.sendMessage(messageType, data).catch(err => { - logger.error('Failed to send initial message:', err); - }); - - // Set up interval - const intervalId = setInterval(() => { - this.sendMessage(messageType, data).catch(err => { - logger.error('Failed to send message:', err); - }); - }, interval); - - // Return stop function - return () => { - clearInterval(intervalId); - }; - } - - public registerHandler(messageType: string, handler: MessageHandler): void { - this.handlers.set(messageType, handler); - } - - public unregisterHandler(messageType: string): void { - this.handlers.delete(messageType); - } - - public isClientConnected(): boolean { - return this.isConnected; - } - - private async connectWithRetry(): Promise { - if (this.isConnecting || this.isConnected) return; - - this.isConnecting = true; - - while (this.shouldReconnect && !this.isConnected && this.isConnecting) { - try { - await this.establishConnection(); - this.isConnecting = false; - return; - } catch (error) { - logger.error(`Failed to connect: ${error}. Retrying in ${this.reconnectInterval}ms...`); - - if (!this.shouldReconnect || !this.isConnecting) { - this.isConnecting = false; - return; - } - - await new Promise(resolve => { - this.reconnectTimer = setTimeout(resolve, this.reconnectInterval); - }); - } - } - - this.isConnecting = false; - } - - private async establishConnection(): Promise { - // Clean up any existing connection before establishing a new one - if (this.conn) { - this.conn.removeAllListeners(); - this.conn.close(); - this.conn = null; - } - - // Parse the base URL to determine protocol and hostname - const baseURL = new URL(this.baseURL); - const wsProtocol = baseURL.protocol === 'https:' ? 'wss' : 'ws'; - const wsURL = new URL(`${wsProtocol}://${baseURL.host}/api/v1/ws`); - - // Add token and client type to query parameters - wsURL.searchParams.set('token', this.token); - wsURL.searchParams.set('clientType', "remoteExitNode"); - - return new Promise((resolve, reject) => { - const conn = new WebSocket(wsURL.toString()); - - conn.on('open', () => { - logger.debug('WebSocket connection established'); - this.conn = conn; - this.setConnected(true); - this.isConnecting = false; - this.startPingMonitor(); - this.emit('connect'); - resolve(); - }); - - conn.on('message', (data: WebSocket.Data) => { - try { - const message: WSMessage = JSON.parse(data.toString()); - const handler = this.handlers.get(message.type); - if (handler) { - handler(message); - } - this.emit('message', message); - } catch (error) { - logger.error('Failed to parse message:', error); - } - }); - - conn.on('close', (code, reason) => { - logger.debug(`WebSocket connection closed: ${code} ${reason}`); - this.handleDisconnect(); - }); - - conn.on('error', (error) => { - logger.error('WebSocket error:', error); - if (this.conn === null) { - // Connection failed during establishment - reject(error); - } - // Don't call handleDisconnect here as the 'close' event will handle it - }); - - conn.on('pong', () => { - if (this.pingTimeoutTimer) { - clearTimeout(this.pingTimeoutTimer); - this.pingTimeoutTimer = null; - } - }); - }); - } - - private startPingMonitor(): void { - // Clear any existing ping timer to prevent duplicates - if (this.pingTimer) { - clearInterval(this.pingTimer); - this.pingTimer = null; - } - - this.pingTimer = setInterval(() => { - if (this.conn && this.conn.readyState === WebSocket.OPEN) { - this.conn.ping(); - - // Set timeout for pong response - this.pingTimeoutTimer = setTimeout(() => { - logger.error('Ping timeout - no pong received'); - this.handleDisconnect(); - }, this.pingTimeout); - } - }, this.pingInterval); - } - - private handleDisconnect(): void { - // Prevent multiple disconnect handlers from running simultaneously - if (!this.isConnected && !this.isConnecting) { - return; - } - - this.setConnected(false); - this.isConnecting = false; - - // Clear ping timers - if (this.pingTimer) { - clearInterval(this.pingTimer); - this.pingTimer = null; - } - if (this.pingTimeoutTimer) { - clearTimeout(this.pingTimeoutTimer); - this.pingTimeoutTimer = null; - } - - // Clear any existing reconnect timer to prevent multiple reconnection attempts - if (this.reconnectTimer) { - clearTimeout(this.reconnectTimer); - this.reconnectTimer = null; - } - - if (this.conn) { - this.conn.removeAllListeners(); - this.conn = null; - } - - this.emit('disconnect'); - - // Reconnect if needed - if (this.shouldReconnect) { - // Add a small delay before starting reconnection to prevent immediate retry - this.reconnectTimer = setTimeout(() => { - this.connectWithRetry(); - }, 1000); - } - } - - private setConnected(status: boolean): void { - this.isConnected = status; - } -} - -// Factory function for easier instantiation -export function createWebSocketClient( - token: string, - endpoint: string, - options?: ClientOptions -): WebSocketClient { - return new WebSocketClient(token, endpoint, options); -} - -export default WebSocketClient; \ No newline at end of file diff --git a/server/setup/scriptsPg/1.12.0.ts b/server/setup/scriptsPg/1.12.0.ts index 0a87eb96..38cdaf43 100644 --- a/server/setup/scriptsPg/1.12.0.ts +++ b/server/setup/scriptsPg/1.12.0.ts @@ -94,6 +94,7 @@ export default async function migration() { await db.execute(sql`ALTER TABLE "blueprints" ADD CONSTRAINT "blueprints_orgId_orgs_orgId_fk" FOREIGN KEY ("orgId") REFERENCES "public"."orgs"("orgId") ON DELETE cascade ON UPDATE no action;`); + await db.execute(sql`ALTER TABLE "remoteExitNode" ADD COLUMN "secondaryVersion" varchar;`); await db.execute(sql`ALTER TABLE "resources" DROP CONSTRAINT "resources_skipToIdpId_idp_idpId_fk";`); await db.execute(sql`ALTER TABLE "domains" ADD COLUMN "certResolver" varchar;`); await db.execute(sql`ALTER TABLE "domains" ADD COLUMN "customCertResolver" varchar;`); diff --git a/server/setup/scriptsSqlite/1.12.0.ts b/server/setup/scriptsSqlite/1.12.0.ts index 19ea2892..bb357c81 100644 --- a/server/setup/scriptsSqlite/1.12.0.ts +++ b/server/setup/scriptsSqlite/1.12.0.ts @@ -212,6 +212,7 @@ export default async function migration() { db.prepare( `ALTER TABLE 'user' ADD 'lastPasswordChange' integer;` ).run(); + db.prepare(`ALTER TABLE 'remoteExitNode' ADD 'secondaryVersion' text;`).run(); // get all of the domains const domains = db.prepare(`SELECT domainId, baseDomain from domains`).all() as {