From 02c68b6cd3d3b70860cd713a633d8f8077185316 Mon Sep 17 00:00:00 2001
From: Owen
Date: Wed, 20 May 2026 15:38:40 -0700
Subject: [PATCH] Reconnect newts when an exit node comes back online

---
 .../exitNodeReconnectScheduler.ts             | 255 ++++++++++++++++++
 .../handleRemoteExitNodePingMessage.ts        |  18 ++
 .../private/routers/remoteExitNode/index.ts   |   1 +
 server/private/routers/ws/messageHandlers.ts  |   4 +-
 4 files changed, 277 insertions(+), 1 deletion(-)
 create mode 100644 server/private/routers/remoteExitNode/exitNodeReconnectScheduler.ts

diff --git a/server/private/routers/remoteExitNode/exitNodeReconnectScheduler.ts b/server/private/routers/remoteExitNode/exitNodeReconnectScheduler.ts
new file mode 100644
index 000000000..0d871583f
--- /dev/null
+++ b/server/private/routers/remoteExitNode/exitNodeReconnectScheduler.ts
@@ -0,0 +1,255 @@
+/*
+ * This file is part of a proprietary work.
+ *
+ * Copyright (c) 2025-2026 Fossorial, Inc.
+ * All rights reserved.
+ *
+ * This file is licensed under the Fossorial Commercial License.
+ * You may not use this file except in compliance with the License.
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited.
+ *
+ * This file is not licensed under the AGPLv3.
+ */
+
+import axios from "axios";
+import { db, exitNodes, newts, sites } from "@server/db";
+import { eq } from "drizzle-orm";
+import logger from "@server/logger";
+import redisManager from "#private/lib/redis";
+import { sendToClient } from "#private/routers/ws";
+
+const INITIAL_DELAY_MS = 15 * 1000; // 15 seconds before first check
+const CHECK_INTERVAL_MS = 10 * 1000; // Check every 10 seconds
+const MAX_DURATION_MS = 5 * 60 * 1000; // Give up after 5 minutes
+const REDIS_PENDING_SET = "exit-node-reconnect-pending"; // Redis set holding the pending exit node ids
+const REDIS_HASH_PREFIX = "exit-node-reconnect:"; // Prefix of the per-exit-node Redis hash (startTime, reachableAt)
+
+interface PendingReconnect {
+    startTime: number; // Date.now() at the moment the reconnect was scheduled
+    reachableAt: string; // Base URL of the exit node; "/ping" is appended to probe it
+}
+
+// In-memory tracking for this node
+const pendingReconnects = new Map<number, PendingReconnect>();
+
+let schedulerInterval: NodeJS.Timeout | null = null;
+
+// True while a scheduler tick is running, so a slow tick is never overlapped by the next one
+let isProcessing = false;
+
+/**
+ * Schedules a reconnect check for newts connected to the given exit node.
+ * Called when an exit node transitions from offline to online.
+ *
+ * The entry is tracked in memory and, when Redis is enabled, mirrored into
+ * Redis for cross-node coordination. Scheduling again for the same exit node
+ * replaces the previous entry and restarts its timers.
+ *
+ * @param exitNodeId - ID of the exit node that came back online.
+ * @param reachableAt - Base URL used to probe the exit node ("/ping" is appended).
+ */
+export async function scheduleExitNodeReconnect(
+    exitNodeId: number,
+    reachableAt: string
+): Promise<void> {
+    logger.info(
+        `Scheduling newt reconnect for exit node ${exitNodeId} (reachableAt: ${reachableAt})`
+    );
+
+    const entry: PendingReconnect = {
+        startTime: Date.now(),
+        reachableAt
+    };
+
+    pendingReconnects.set(exitNodeId, entry);
+
+    // Store in Redis if available for cross-node coordination
+    if (redisManager.isRedisEnabled()) {
+        await redisManager.sadd(REDIS_PENDING_SET, exitNodeId.toString());
+        await redisManager.hset(
+            `${REDIS_HASH_PREFIX}${exitNodeId}`,
+            "startTime",
+            entry.startTime.toString()
+        );
+        await redisManager.hset(
+            `${REDIS_HASH_PREFIX}${exitNodeId}`,
+            "reachableAt",
+            reachableAt
+        );
+    }
+}
+
+/**
+ * Starts the background interval that checks pending exit node reconnects.
+ * Calling it again while the interval is already running is a no-op.
+ */
+export function startExitNodeReconnectScheduler(): void {
+    if (schedulerInterval) {
+        return;
+    }
+
+    schedulerInterval = setInterval(async () => {
+        // A tick can outlast CHECK_INTERVAL_MS (each unreachable exit node may
+        // wait out the 5s ping timeout), so skip this tick instead of running
+        // two passes at once and sending duplicate reconnect messages.
+        if (isProcessing) {
+            return;
+        }
+
+        isProcessing = true;
+        try {
+            await processPendingReconnects();
+        } catch (error) {
+            logger.error("Error in exit node reconnect scheduler", { error });
+        } finally {
+            isProcessing = false;
+        }
+    }, CHECK_INTERVAL_MS);
+
+    logger.debug("Started exit node reconnect scheduler");
+}
+
+/**
+ * Runs one scheduler pass over every pending exit node, known either in
+ * memory or (when Redis is enabled) in Redis.
+ *
+ * Per entry: drop it once MAX_DURATION_MS has elapsed, skip it while
+ * INITIAL_DELAY_MS has not elapsed, otherwise probe `${reachableAt}/ping` and,
+ * if the request succeeds, notify the exit node's newts and drop the entry.
+ */
+async function processPendingReconnects(): Promise<void> {
+    // Merge in-memory and Redis-tracked pending reconnects
+    const toProcess = new Map(pendingReconnects);
+
+    if (redisManager.isRedisEnabled()) {
+        const redisIds = await redisManager.smembers(REDIS_PENDING_SET);
+        for (const idStr of redisIds) {
+            const id = parseInt(idStr, 10);
+            if (Number.isNaN(id) || toProcess.has(id)) {
+                continue;
+            }
+
+            const startTimeStr = await redisManager.hget(
+                `${REDIS_HASH_PREFIX}${id}`,
+                "startTime"
+            );
+            const reachableAt = await redisManager.hget(
+                `${REDIS_HASH_PREFIX}${id}`,
+                "reachableAt"
+            );
+            if (!startTimeStr || !reachableAt) {
+                continue;
+            }
+
+            const startTime = parseInt(startTimeStr, 10);
+            if (Number.isNaN(startTime)) {
+                // Without a usable start time the entry could never time out
+                // and would be probed forever, so drop it.
+                logger.warn(
+                    `Dropping pending reconnect for exit node ${id}: invalid start time "${startTimeStr}"`
+                );
+                await removePending(id);
+                continue;
+            }
+
+            toProcess.set(id, { startTime, reachableAt });
+        }
+    }
+
+    const now = Date.now();
+
+    for (const [exitNodeId, entry] of toProcess) {
+        const elapsed = now - entry.startTime;
+
+        // Give up after max duration
+        if (elapsed >= MAX_DURATION_MS) {
+            logger.warn(
+                `Exit node reconnect check timed out for exit node ${exitNodeId} after 5 minutes`
+            );
+            await removePending(exitNodeId);
+            continue;
+        }
+
+        // Respect initial delay
+        if (elapsed < INITIAL_DELAY_MS) {
+            continue;
+        }
+
+        // Check if the exit node HTTP endpoint is reachable
+        const pingUrl = `${entry.reachableAt}/ping`;
+        try {
+            await axios.get(pingUrl, { timeout: 5000 });
+        } catch {
+            logger.debug(
+                `Exit node ${exitNodeId} not yet reachable at ${pingUrl}`
+            );
+            continue;
+        }
+
+        // Node is reachable — send reconnect to all connected newts
+        logger.info(
+            `Exit node ${exitNodeId} is reachable. Sending newt/wg/reconnect to connected newts.`
+        );
+
+        await sendReconnectToNewts(exitNodeId);
+        await removePending(exitNodeId);
+    }
+}
+
+/**
+ * Sends a "newt/wg/reconnect" message to every newt whose site uses the given
+ * exit node. A failed database query is logged rather than thrown, and a
+ * failed send to one newt does not stop the sends to the others.
+ *
+ * @param exitNodeId - ID of the exit node whose newts should reconnect.
+ */
+async function sendReconnectToNewts(exitNodeId: number): Promise<void> {
+    try {
+        const connectedNewts = await db
+            .select({ newtId: newts.newtId })
+            .from(newts)
+            .innerJoin(sites, eq(newts.siteId, sites.siteId))
+            .where(eq(sites.exitNodeId, exitNodeId));
+
+        if (connectedNewts.length === 0) {
+            logger.debug(
+                `No newts found for exit node ${exitNodeId}, nothing to reconnect`
+            );
+            return;
+        }
+
+        logger.info(
+            `Sending newt/wg/reconnect to ${connectedNewts.length} newt(s) for exit node ${exitNodeId}`
+        );
+
+        const reconnectMessage = {
+            type: "newt/wg/reconnect",
+            data: {}
+        };
+
+        await Promise.allSettled(
+            connectedNewts.map(({ newtId }) =>
+                sendToClient(newtId, reconnectMessage)
+            )
+        );
+    } catch (error) {
+        logger.error(
+            `Failed to send reconnect messages for exit node ${exitNodeId}`,
+            { error }
+        );
+    }
+}
+
+/**
+ * Stops tracking an exit node, both in memory and (when Redis is enabled) in Redis.
+ *
+ * @param exitNodeId - ID of the exit node to stop tracking.
+ */
+async function removePending(exitNodeId: number): Promise<void> {
+    pendingReconnects.delete(exitNodeId);
+
+    if (redisManager.isRedisEnabled()) {
+        await redisManager.srem(REDIS_PENDING_SET, exitNodeId.toString());
+        await redisManager.del(`${REDIS_HASH_PREFIX}${exitNodeId}`);
+    }
+}
diff --git a/server/private/routers/remoteExitNode/handleRemoteExitNodePingMessage.ts b/server/private/routers/remoteExitNode/handleRemoteExitNodePingMessage.ts
index c2c710e11..10bf36d7c 100644
--- a/server/private/routers/remoteExitNode/handleRemoteExitNodePingMessage.ts
+++ b/server/private/routers/remoteExitNode/handleRemoteExitNodePingMessage.ts
@@ -16,6 +16,7 @@ import { MessageHandler } from "@server/routers/ws";
 import { RemoteExitNode } from "@server/db";
 import { eq } from "drizzle-orm";
 import logger from "@server/logger";
+import { scheduleExitNodeReconnect } from "./exitNodeReconnectScheduler";
 
 /**
  * Handles ping messages from clients and responds with pong
@@ -37,6 +38,13 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
     }
 
     try {
+        // Fetch the current state before updating so we can detect the offline→online transition
+        const [currentExitNode] = await db
+            .select({ online: exitNodes.online, reachableAt: exitNodes.reachableAt })
+            .from(exitNodes)
+            .where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId))
+            .limit(1);
+
         // Update the exit node's last ping timestamp
         await db
             .update(exitNodes)
@@ -45,6 +53,16 @@ export const handleRemoteExitNodePingMessage: MessageHandler = async (
                 online: true
             })
             .where(eq(exitNodes.exitNodeId, remoteExitNode.exitNodeId));
+
+        // If the exit node was offline and is now coming online, schedule newt reconnects
+        if (currentExitNode && !currentExitNode.online && currentExitNode.reachableAt) {
+            scheduleExitNodeReconnect(
+                remoteExitNode.exitNodeId,
+                currentExitNode.reachableAt
+            ).catch((error) => {
+                logger.error("Failed to schedule exit node reconnect", { error });
+            });
+        }
     } catch (error) {
         logger.error("Error handling ping message", { error });
     }
diff --git a/server/private/routers/remoteExitNode/index.ts b/server/private/routers/remoteExitNode/index.ts
index 730f6b693..953ccba88 100644
--- a/server/private/routers/remoteExitNode/index.ts
+++ b/server/private/routers/remoteExitNode/index.ts
@@ -22,3 +22,4 @@ export * from "./listRemoteExitNodes";
 export * from "./pickRemoteExitNodeDefaults";
 export * from "./quickStartRemoteExitNode";
 export * from "./offlineChecker";
+export * from "./exitNodeReconnectScheduler";
diff --git a/server/private/routers/ws/messageHandlers.ts b/server/private/routers/ws/messageHandlers.ts
index b2553871e..d91726393 100644
--- a/server/private/routers/ws/messageHandlers.ts
+++ b/server/private/routers/ws/messageHandlers.ts
@@ -14,7 +14,8 @@
 import {
     handleRemoteExitNodeRegisterMessage,
     handleRemoteExitNodePingMessage,
-    startRemoteExitNodeOfflineChecker
+    startRemoteExitNodeOfflineChecker,
+    startExitNodeReconnectScheduler
 } from "#private/routers/remoteExitNode";
 import { MessageHandler } from "@server/routers/ws";
 import { build } from "@server/build";
@@ -29,4 +30,5 @@ export const messageHandlers: Record = {
 
 if (build != "saas") {
     startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
+    startExitNodeReconnectScheduler(); // check pending exit node reconnects and notify newts
 }