From 1a43f1ef4bb1454c1bc60a1bf2d2dc1c45ae6688 Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 14 Mar 2026 11:59:20 -0700 Subject: [PATCH] Handle newt online offline with websocket --- server/db/pg/schema/schema.ts | 1 + server/db/sqlite/schema/schema.ts | 1 + server/private/routers/ws/ws.ts | 28 ++- server/routers/newt/handleNewtPingMessage.ts | 197 ++++++++++--------- server/routers/ws/messageHandlers.ts | 4 +- server/routers/ws/ws.ts | 27 ++- 6 files changed, 159 insertions(+), 99 deletions(-) diff --git a/server/db/pg/schema/schema.ts b/server/db/pg/schema/schema.ts index 2c98fd323..b93c21fd6 100644 --- a/server/db/pg/schema/schema.ts +++ b/server/db/pg/schema/schema.ts @@ -89,6 +89,7 @@ export const sites = pgTable("sites", { lastBandwidthUpdate: varchar("lastBandwidthUpdate"), type: varchar("type").notNull(), // "newt" or "wireguard" online: boolean("online").notNull().default(false), + lastPing: integer("lastPing"), address: varchar("address"), endpoint: varchar("endpoint"), publicKey: varchar("publicKey"), diff --git a/server/db/sqlite/schema/schema.ts b/server/db/sqlite/schema/schema.ts index 510d3a1a0..188caac2b 100644 --- a/server/db/sqlite/schema/schema.ts +++ b/server/db/sqlite/schema/schema.ts @@ -90,6 +90,7 @@ export const sites = sqliteTable("sites", { lastBandwidthUpdate: text("lastBandwidthUpdate"), type: text("type").notNull(), // "newt" or "wireguard" online: integer("online", { mode: "boolean" }).notNull().default(false), + lastPing: integer("lastPing"), // exit node stuff that is how to connect to the site when it has a wg server address: text("address"), // this is the address of the wireguard interface in newt diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index f10df2863..eec9cfe89 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -25,7 +25,8 @@ import { OlmSession, RemoteExitNode, RemoteExitNodeSession, - remoteExitNodes + remoteExitNodes, + sites } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; @@ -846,6 +847,31 @@ const setupConnection = async ( ); }); + // Handle WebSocket protocol-level pings from older newt clients that do + // not send application-level "newt/ping" messages. Update the site's + // online state and lastPing timestamp so the offline checker treats them + // the same as modern newt clients. + if (clientType === "newt") { + const newtClient = client as Newt; + ws.on("ping", async () => { + if (!newtClient.siteId) return; + try { + await db + .update(sites) + .set({ + online: true, + lastPing: Math.floor(Date.now() / 1000) + }) + .where(eq(sites.siteId, newtClient.siteId)); + } catch (error) { + logger.error( + "Error updating newt site online state on WS ping", + { error } + ); + } + }); + } + ws.on("error", (error: Error) => { logger.error( `WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`, diff --git a/server/routers/newt/handleNewtPingMessage.ts b/server/routers/newt/handleNewtPingMessage.ts index dc9aacdd9..319647b83 100644 --- a/server/routers/newt/handleNewtPingMessage.ts +++ b/server/routers/newt/handleNewtPingMessage.ts @@ -1,105 +1,107 @@ -import { db, sites } from "@server/db"; -import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws"; +import { db, newts, sites } from "@server/db"; +import { hasActiveConnections, getClientConfigVersion } from "#dynamic/routers/ws"; import { MessageHandler } from "@server/routers/ws"; -import { clients, Newt } from "@server/db"; +import { Newt } from "@server/db"; import { eq, lt, isNull, and, or } from "drizzle-orm"; import logger from "@server/logger"; -import { validateSessionToken } from "@server/auth/sessions/app"; -import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy"; -import { sendTerminateClient } from "../client/terminate"; -import { encodeHexLowerCase } from "@oslojs/encoding"; -import { sha256 } from "@oslojs/crypto/sha2"; import { sendNewtSyncMessage } from "./sync"; // Track if the offline checker interval is running -// let offlineCheckerInterval: NodeJS.Timeout | null = null; -// const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds -// const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes +let offlineCheckerInterval: NodeJS.Timeout | null = null; +const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds +const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes /** - * Starts the background interval that checks for clients that haven't pinged recently - * and marks them as offline + * Starts the background interval that checks for newt sites that haven't + * pinged recently and marks them as offline. For backward compatibility, + * a site is only marked offline when there is no active WebSocket connection + * either — so older newt versions that don't send pings but remain connected + * continue to be treated as online. */ -// export const startNewtOfflineChecker = (): void => { -// if (offlineCheckerInterval) { -// return; // Already running -// } +export const startNewtOfflineChecker = (): void => { + if (offlineCheckerInterval) { + return; // Already running + } -// offlineCheckerInterval = setInterval(async () => { -// try { -// const twoMinutesAgo = Math.floor( -// (Date.now() - OFFLINE_THRESHOLD_MS) / 1000 -// ); + offlineCheckerInterval = setInterval(async () => { + try { + const twoMinutesAgo = Math.floor( + (Date.now() - OFFLINE_THRESHOLD_MS) / 1000 + ); -// // TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING + // Find all online newt-type sites that haven't pinged recently + // (or have never pinged at all). Join newts to obtain the newtId + // needed for the WebSocket connection check. + const staleSites = await db + .select({ + siteId: sites.siteId, + newtId: newts.newtId, + lastPing: sites.lastPing + }) + .from(sites) + .innerJoin(newts, eq(newts.siteId, sites.siteId)) + .where( + and( + eq(sites.online, true), + eq(sites.type, "newt"), + or( + lt(sites.lastPing, twoMinutesAgo), + isNull(sites.lastPing) + ) + ) + ); -// // Find clients that haven't pinged in the last 2 minutes and mark them as offline -// const offlineClients = await db -// .update(clients) -// .set({ online: false }) -// .where( -// and( -// eq(clients.online, true), -// or( -// lt(clients.lastPing, twoMinutesAgo), -// isNull(clients.lastPing) -// ) -// ) -// ) -// .returning(); + for (const staleSite of staleSites) { + // Backward-compatibility check: if the newt still has an + // active WebSocket connection (older clients that don't send + // pings), keep the site online. + const isConnected = await hasActiveConnections(staleSite.newtId); + if (isConnected) { + logger.debug( + `Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket — keeping site ${staleSite.siteId} online` + ); + continue; + } -// for (const offlineClient of offlineClients) { -// logger.info( -// `Kicking offline newt client ${offlineClient.clientId} due to inactivity` -// ); + logger.info( + `Marking site ${staleSite.siteId} offline: newt ${staleSite.newtId} has no recent ping and no active WebSocket connection` + ); -// if (!offlineClient.newtId) { -// logger.warn( -// `Offline client ${offlineClient.clientId} has no newtId, cannot disconnect` -// ); -// continue; -// } + await db + .update(sites) + .set({ online: false }) + .where(eq(sites.siteId, staleSite.siteId)); + } + } catch (error) { + logger.error("Error in newt offline checker interval", { error }); + } + }, OFFLINE_CHECK_INTERVAL); -// // Send a disconnect message to the client if connected -// try { -// await sendTerminateClient( -// offlineClient.clientId, -// offlineClient.newtId -// ); // terminate first -// // wait a moment to ensure the message is sent -// await new Promise((resolve) => setTimeout(resolve, 1000)); -// await disconnectClient(offlineClient.newtId); -// } catch (error) { -// logger.error( -// `Error sending disconnect to offline newt ${offlineClient.clientId}`, -// { error } -// ); -// } -// } -// } catch (error) { -// logger.error("Error in offline checker interval", { error }); -// } -// }, OFFLINE_CHECK_INTERVAL); - -// logger.debug("Started offline checker interval"); -// }; + logger.debug("Started newt offline checker interval"); +}; /** - * Stops the background interval that checks for offline clients + * Stops the background interval that checks for offline newt sites. */ -// export const stopNewtOfflineChecker = (): void => { -// if (offlineCheckerInterval) { -// clearInterval(offlineCheckerInterval); -// offlineCheckerInterval = null; -// logger.info("Stopped offline checker interval"); -// } -// }; +export const stopNewtOfflineChecker = (): void => { + if (offlineCheckerInterval) { + clearInterval(offlineCheckerInterval); + offlineCheckerInterval = null; + logger.info("Stopped newt offline checker interval"); + } +}; /** - * Handles ping messages from clients and responds with pong + * Handles ping messages from newt clients. + * + * On each ping: + * - Marks the associated site as online. + * - Records the current timestamp as the newt's last-ping time. + * - Triggers a config sync if the newt is running an outdated config version. + * - Responds with a pong message. */ export const handleNewtPingMessage: MessageHandler = async (context) => { - const { message, client: c, sendToClient } = context; + const { message, client: c } = context; const newt = c as Newt; if (!newt) { @@ -112,15 +114,31 @@ export const handleNewtPingMessage: MessageHandler = async (context) => { return; } - // get the version + try { + // Mark the site as online and record the ping timestamp. + await db + .update(sites) + .set({ + online: true, + lastPing: Math.floor(Date.now() / 1000) + }) + .where(eq(sites.siteId, newt.siteId)); + } catch (error) { + logger.error("Error updating online state on newt ping", { error }); + } + + // Check config version and sync if stale. const configVersion = await getClientConfigVersion(newt.newtId); - if (message.configVersion && configVersion != null && configVersion != message.configVersion) { + if ( + message.configVersion != null && + configVersion != null && + configVersion !== message.configVersion + ) { logger.warn( `Newt ping with outdated config version: ${message.configVersion} (current: ${configVersion})` ); - // get the site const [site] = await db .select() .from(sites) @@ -137,19 +155,6 @@ export const handleNewtPingMessage: MessageHandler = async (context) => { await sendNewtSyncMessage(newt, site); } - // try { - // // Update the client's last ping timestamp - // await db - // .update(clients) - // .set({ - // lastPing: Math.floor(Date.now() / 1000), - // online: true - // }) - // .where(eq(clients.clientId, newt.clientId)); - // } catch (error) { - // logger.error("Error handling ping message", { error }); - // } - return { message: { type: "pong", diff --git a/server/routers/ws/messageHandlers.ts b/server/routers/ws/messageHandlers.ts index f041c9d56..25eb578e1 100644 --- a/server/routers/ws/messageHandlers.ts +++ b/server/routers/ws/messageHandlers.ts @@ -6,7 +6,8 @@ import { handleDockerContainersMessage, handleNewtPingRequestMessage, handleApplyBlueprintMessage, - handleNewtPingMessage + handleNewtPingMessage, + startNewtOfflineChecker } from "../newt"; import { handleOlmRegisterMessage, @@ -43,3 +44,4 @@ export const messageHandlers: Record = { }; startOlmOfflineChecker(); // this is to handle the offline check for olms +startNewtOfflineChecker(); // this is to handle the offline check for newts diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index c7085fba9..08a7dbd4c 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -3,7 +3,7 @@ import zlib from "zlib"; import { Server as HttpServer } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { Socket } from "net"; -import { Newt, newts, NewtSession, olms, Olm, OlmSession } from "@server/db"; +import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; @@ -380,6 +380,31 @@ const setupConnection = async ( ); }); + // Handle WebSocket protocol-level pings from older newt clients that do + // not send application-level "newt/ping" messages. Update the site's + // online state and lastPing timestamp so the offline checker treats them + // the same as modern newt clients. + if (clientType === "newt") { + const newtClient = client as Newt; + ws.on("ping", async () => { + if (!newtClient.siteId) return; + try { + await db + .update(sites) + .set({ + online: true, + lastPing: Math.floor(Date.now() / 1000) + }) + .where(eq(sites.siteId, newtClient.siteId)); + } catch (error) { + logger.error( + "Error updating newt site online state on WS ping", + { error } + ); + } + }); + } + ws.on("error", (error: Error) => { logger.error( `WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`,