diff --git a/server/db/schema.ts b/server/db/schema.ts index 80baa52a..92710278 100644 --- a/server/db/schema.ts +++ b/server/db/schema.ts @@ -153,6 +153,7 @@ export const clients = sqliteTable("clients", { megabytesIn: integer("bytesIn"), megabytesOut: integer("bytesOut"), lastBandwidthUpdate: text("lastBandwidthUpdate"), + lastPing: text("lastPing"), type: text("type").notNull(), // "olm" online: integer("online", { mode: "boolean" }).notNull().default(false), endpoint: text("endpoint"), diff --git a/server/routers/messageHandlers.ts b/server/routers/messageHandlers.ts index 759a88ea..8fb240c1 100644 --- a/server/routers/messageHandlers.ts +++ b/server/routers/messageHandlers.ts @@ -1,6 +1,5 @@ -import { handleNewtRegisterMessage, handleReceiveBandwidthMessage } from "./newt"; -import { handleOlmRegisterMessage, handleOlmRelayMessage } from "./olm"; -import { handleGetConfigMessage } from "./newt/handleGetConfigMessage"; +import { handleNewtRegisterMessage, handleReceiveBandwidthMessage, handleGetConfigMessage } from "./newt"; +import { handleOlmRegisterMessage, handleOlmRelayMessage, handleOlmPingMessage } from "./olm"; import { MessageHandler } from "./ws"; export const messageHandlers: Record = { @@ -8,5 +7,6 @@ export const messageHandlers: Record = { "olm/wg/register": handleOlmRegisterMessage, "newt/wg/get-config": handleGetConfigMessage, "newt/receive-bandwidth": handleReceiveBandwidthMessage, - "olm/wg/relay": handleOlmRelayMessage + "olm/wg/relay": handleOlmRelayMessage, + "olm/ping": handleOlmPingMessage }; diff --git a/server/routers/newt/handleGetConfigMessage.ts b/server/routers/newt/handleGetConfigMessage.ts index 861a2e54..19e91f8a 100644 --- a/server/routers/newt/handleGetConfigMessage.ts +++ b/server/routers/newt/handleGetConfigMessage.ts @@ -105,18 +105,22 @@ export const handleGetConfigMessage: MessageHandler = async (context) => { .innerJoin(clientSites, eq(clients.clientId, clientSites.clientId)) .where(eq(clientSites.siteId, siteId)); - const now = new Date().getTime() / 1000; const peers = await Promise.all( clientsRes .filter((client) => { - // This filter wasn't returning anything - fixed to properly filter clients - if ( - !client.clients.lastHolePunch || - now - client.clients.lastHolePunch > 6 - ) { - logger.warn("Client last hole punch is too old"); + if (!client.clients.pubKey) { return false; } + if (!client.clients.subnet) { + return false; + } + if (!client.clients.endpoint) { + return false; + } + if (!client.clients.online) { + return false; + } + return true; }) .map(async (client) => { diff --git a/server/routers/newt/handleReceiveBandwidthMessage.ts b/server/routers/newt/handleReceiveBandwidthMessage.ts index 80b0c57d..41bfd39a 100644 --- a/server/routers/newt/handleReceiveBandwidthMessage.ts +++ b/server/routers/newt/handleReceiveBandwidthMessage.ts @@ -37,22 +37,6 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (context) => if (!client) { continue; } - let online = client.online; - - // if the bandwidth for the client is > 0 then set it to online. if it has been less than 0 (no update) for 5 minutes then set it to offline - if (bytesIn > 0) { // only track the bytes in because we are always sending bytes out with persistent keep alive - online = true; - } else if (client.lastBandwidthUpdate) { - const lastBandwidthUpdate = new Date( - client.lastBandwidthUpdate - ); - const currentTime = new Date(); - const diff = - currentTime.getTime() - lastBandwidthUpdate.getTime(); - if (diff < 300000) { - online = false; - } - } // Update the client's bandwidth usage await trx @@ -61,7 +45,6 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (context) => megabytesOut: (client.megabytesIn || 0) + bytesIn, megabytesIn: (client.megabytesOut || 0) + bytesOut, lastBandwidthUpdate: new Date().toISOString(), - online }) .where(eq(clients.clientId, client.clientId)); } diff --git a/server/routers/newt/index.ts b/server/routers/newt/index.ts index aa72fc6f..700f3c37 100644 --- a/server/routers/newt/index.ts +++ b/server/routers/newt/index.ts @@ -1,4 +1,5 @@ export * from "./createNewt"; export * from "./getNewtToken"; export * from "./handleNewtRegisterMessage"; -export* from "./handleReceiveBandwidthMessage"; \ No newline at end of file +export * from "./handleReceiveBandwidthMessage"; +export * from "./handleGetConfigMessage"; \ No newline at end of file diff --git a/server/routers/olm/handleOlmPingMessage.ts b/server/routers/olm/handleOlmPingMessage.ts new file mode 100644 index 00000000..10067832 --- /dev/null +++ b/server/routers/olm/handleOlmPingMessage.ts @@ -0,0 +1,94 @@ +import db from "@server/db"; +import { MessageHandler } from "../ws"; +import { clients, Olm } from "@server/db/schema"; +import { eq, lt, isNull } from "drizzle-orm"; +import logger from "@server/logger"; +import { time } from "console"; + +// Track if the offline checker interval is running +let offlineCheckerInterval: NodeJS.Timeout | null = null; +const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds +const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes + +/** + * Starts the background interval that checks for clients that haven't pinged recently + * and marks them as offline + */ +export const startOfflineChecker = (): void => { + if (offlineCheckerInterval) { + return; // Already running + } + + offlineCheckerInterval = setInterval(async () => { + try { + const twoMinutesAgo = new Date(Date.now() - OFFLINE_THRESHOLD_MS); + + // Find clients that haven't pinged in the last 2 minutes and mark them as offline + await db + .update(clients) + .set({ online: false }) + .where( + eq(clients.online, true) && + (lt(clients.lastPing, twoMinutesAgo.toISOString()) || isNull(clients.lastPing)) + ); + + } catch (error) { + logger.error("Error in offline checker interval", { error }); + } + }, OFFLINE_CHECK_INTERVAL); + + logger.info("Started offline checker interval"); +} + +/** + * Stops the background interval that checks for offline clients + */ +export const stopOfflineChecker = (): void => { + if (offlineCheckerInterval) { + clearInterval(offlineCheckerInterval); + offlineCheckerInterval = null; + logger.info("Stopped offline checker interval"); + } +} + +/** + * Handles ping messages from clients and responds with pong + */ +export const handleOlmPingMessage: MessageHandler = async (context) => { + const { message, client: c, sendToClient } = context; + const olm = c as Olm; + + if (!olm) { + logger.warn("Olm not found"); + return; + } + + if (!olm.clientId) { + logger.warn("Olm has no client ID!"); + return; + } + + try { + // Update the client's last ping timestamp + await db + .update(clients) + .set({ + lastPing: new Date().toISOString(), + online: true, + }) + .where(eq(clients.clientId, olm.clientId)); + } catch (error) { + logger.error("Error handling ping message", { error }); + } + + return { + message: { + type: "pong", + data: { + timestamp: new Date().toISOString(), + } + }, + broadcast: false, + excludeSender: false + }; +}; \ No newline at end of file diff --git a/server/routers/olm/index.ts b/server/routers/olm/index.ts index b7373961..8426612e 100644 --- a/server/routers/olm/index.ts +++ b/server/routers/olm/index.ts @@ -1,4 +1,5 @@ export * from "./handleOlmRegisterMessage"; export * from "./getOlmToken"; export * from "./createOlm"; -export * from "./handleOlmRelayMessage"; \ No newline at end of file +export * from "./handleOlmRelayMessage"; +export * from "./handleOlmPingMessage"; \ No newline at end of file