From d6abe83fdcabc1c22ddf9c211cb82bf33815413c Mon Sep 17 00:00:00 2001 From: Josh Voyles <120749218+Josh-Voyles@users.noreply.github.com> Date: Sat, 2 May 2026 07:37:18 -0400 Subject: [PATCH] fix: memory improvements - SQLite: enable WAL mode and PRAGMA performance settings - ws.ts (public + private): fix clientConfigVersions memory leak - internal server: add rate limiting and request timeouts - audit log: fix flush re-queue feedback loop - memory: add monitoring instrumentation - security: remove debug log of full request body --- server/cleanup.ts | 2 ++ server/db/sqlite/driver.ts | 30 ++++++++++++++++++++++++ server/index.ts | 26 ++++++++++++++++++++ server/internalServer.ts | 21 +++++++++++++++++ server/private/routers/ws/ws.ts | 13 ++++++++++ server/routers/badger/exchangeSession.ts | 2 +- server/routers/badger/logRequestAudit.ts | 16 ++++++------- server/routers/badger/verifySession.ts | 2 +- server/routers/ws/ws.ts | 10 ++++++++ 9 files changed, 112 insertions(+), 10 deletions(-) diff --git a/server/cleanup.ts b/server/cleanup.ts index 10e9f4cc3..a1c7fec59 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -3,9 +3,11 @@ import { flushConnectionLogToDb } from "#dynamic/routers/newt"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; +import { shutdownAuditLogger } from "@server/routers/badger/logRequestAudit"; async function cleanup() { await stopPingAccumulator(); + await shutdownAuditLogger(); await flushBandwidthToDb(); await flushConnectionLogToDb(); await flushSiteBandwidthToDb(); diff --git a/server/db/sqlite/driver.ts b/server/db/sqlite/driver.ts index 832ff16f9..1b0b0b526 100644 --- a/server/db/sqlite/driver.ts +++ b/server/db/sqlite/driver.ts @@ -13,6 +13,36 @@ bootstrapVolume(); function createDb() { const sqlite = new Database(location); + + // Enable WAL mode for dramatically better concurrent read/write + // performance. Without this, readers block writers and vice versa, + // causing severe contention when multiple subsystems (verifySession, + // TraefikConfigManager, audit log flushes, ping flushes) all share + // this single connection. WAL mode allows concurrent readers with a + // single writer, which is the typical access pattern. + sqlite.pragma("journal_mode = WAL"); + + // Wait up to 5 seconds when the database is locked instead of + // failing immediately with SQLITE_BUSY. This prevents transient + // write failures from causing audit log buffer re-queues and retry + // loops that accumulate memory. + sqlite.pragma("busy_timeout = 5000"); + + // NORMAL synchronous mode is safe with WAL and significantly reduces + // the time each write holds the database lock. + sqlite.pragma("synchronous = NORMAL"); + + // Increase the page cache to 64 MB (negative value = KB). The + // default (2 MB) causes frequent I/O round-trips on the large JOIN + // queries used by TraefikConfigManager, which block the event loop + // for longer than necessary. + sqlite.pragma("cache_size = -65536"); + + // Enable memory-mapped I/O for reads (256 MB). This allows the OS + // to serve read queries from the page cache without going through + // SQLite's own cache, reducing event-loop blocking time. + sqlite.pragma("mmap_size = 268435456"); + return DrizzleSqlite(sqlite, { schema }); diff --git a/server/index.ts b/server/index.ts index e3a6ba049..f3274d2fb 100644 --- a/server/index.ts +++ b/server/index.ts @@ -24,6 +24,29 @@ import license from "#dynamic/license/license"; import { initLogCleanupInterval } from "@server/lib/cleanupLogs"; import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync"; import { fetchServerIp } from "@server/lib/serverIpService"; +import logger from "@server/logger"; + +/** + * Periodic memory usage logging for monitoring and leak detection. + * Logs heap usage, external (native) memory, and RSS every 60 seconds. + * This is lightweight (single process.memoryUsage() call) and provides + * the data needed to detect slow memory growth over hours/days. + */ +function startMemoryMonitor(): void { + const INTERVAL_MS = 60_000; // every 60 seconds + const timer = setInterval(() => { + const mem = process.memoryUsage(); + logger.info( + `Memory usage - ` + + `heapUsed: ${(mem.heapUsed / 1024 / 1024).toFixed(1)}MB, ` + + `heapTotal: ${(mem.heapTotal / 1024 / 1024).toFixed(1)}MB, ` + + `rss: ${(mem.rss / 1024 / 1024).toFixed(1)}MB, ` + + `external: ${(mem.external / 1024 / 1024).toFixed(1)}MB, ` + + `arrayBuffers: ${(mem.arrayBuffers / 1024 / 1024).toFixed(1)}MB` + ); + }, INTERVAL_MS); + timer.unref(); +} async function startServers() { await setHostMeta(); @@ -42,6 +65,9 @@ async function startServers() { initLogCleanupInterval(); initAcmeCertSync(); + // Start memory monitoring for leak detection + startMemoryMonitor(); + // Start all servers const apiServer = createApiServer(); const internalServer = createInternalServer(); diff --git a/server/internalServer.ts b/server/internalServer.ts index 83872e7f9..6eb207121 100644 --- a/server/internalServer.ts +++ b/server/internalServer.ts @@ -10,6 +10,8 @@ import { } from "@server/middlewares"; import { internalRouter } from "#dynamic/routers/internal"; import { stripDuplicateSesions } from "./middlewares/stripDuplicateSessions"; +import { requestTimeoutMiddleware } from "./middlewares/requestTimeout"; +import rateLimit from "express-rate-limit"; const internalPort = config.getRawConfig().server.internal_port; @@ -27,6 +29,25 @@ export function createInternalServer() { internalServer.use(cookieParser()); internalServer.use(express.json()); + // Prevent requests from hanging indefinitely. Without this, if a + // database query blocks (especially on SQLite), pending requests + // accumulate in memory with no upper bound on lifetime. + internalServer.use(requestTimeoutMiddleware(30000)); // 30 second timeout + + // Rate-limit the internal verify-session endpoint. This server + // handles forward-auth requests from Traefik/Badger. Under heavy + // monitoring (e.g. Uptime Kuma), requests can arrive faster than + // SQLite can serve them, causing unbounded request queuing and + // memory growth. + internalServer.use( + rateLimit({ + windowMs: 60 * 1000, // 1 minute window + max: 1000, // generous limit: ~17 req/s + standardHeaders: true, + legacyHeaders: false + }) + ); + const prefix = `/api/v1`; internalServer.use(prefix, internalRouter); diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 0970735e0..83bae0162 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -406,6 +406,10 @@ const removeClient = async ( const updatedClients = existingClients.filter((client) => client !== ws); if (updatedClients.length === 0) { connectedClients.delete(mapKey); + // Clean up config version tracking to prevent unbounded memory + // growth. Without this, every unique clientId that ever connects + // leaves a permanent entry in clientConfigVersions. + clientConfigVersions.delete(clientId); if (redisManager.isRedisEnabled()) { try { @@ -413,6 +417,9 @@ const removeClient = async ( await redisManager.del( getNodeConnectionsKey(NODE_ID, clientId) ); + // Also clean up the Redis config version key so it doesn't + // accumulate indefinitely in Redis either. + await redisManager.del(getConfigVersionKey(clientId)); } catch (error) { logger.error( "Failed to remove client from Redis tracking (cleanup will occur on recovery):", @@ -1097,6 +1104,12 @@ const disconnectClient = async (clientId: string): Promise => { } }); + // Eagerly clean up tracking maps. The close event handlers will also + // call removeClient, but if the socket is already in CLOSING state + // the close event may never fire, leaving zombie entries. + connectedClients.delete(mapKey); + clientConfigVersions.delete(clientId); + return true; }; diff --git a/server/routers/badger/exchangeSession.ts b/server/routers/badger/exchangeSession.ts index 08987961d..1fc52cea7 100644 --- a/server/routers/badger/exchangeSession.ts +++ b/server/routers/badger/exchangeSession.ts @@ -41,7 +41,7 @@ export async function exchangeSession( res: Response, next: NextFunction ): Promise { - logger.debug("Exchange session: Badger sent", req.body); + logger.debug("Exchange session: Badger request received"); const parsedBody = exchangeSessionBodySchema.safeParse(req.body); diff --git a/server/routers/badger/logRequestAudit.ts b/server/routers/badger/logRequestAudit.ts index 884fb7ae4..39f0eb3ca 100644 --- a/server/routers/badger/logRequestAudit.ts +++ b/server/routers/badger/logRequestAudit.ts @@ -84,14 +84,14 @@ async function flushAuditLogs() { logger.debug(`Flushed ${logsToWrite.length} audit logs to database`); } catch (error) { logger.error("Error flushing audit logs:", error); - // On transaction error, put logs back at the front of the buffer to retry - // but only if buffer isn't too large - if (auditLogBuffer.length < MAX_BUFFER_SIZE - logsToWrite.length) { - auditLogBuffer.unshift(...logsToWrite); - logger.info(`Re-queued ${logsToWrite.length} audit logs for retry`); - } else { - logger.error(`Buffer full, dropped ${logsToWrite.length} audit logs`); - } + // On transaction error, drop the logs rather than re-queuing them. + // The previous re-queue approach created a positive feedback loop: + // failed flush → re-queue → larger next flush → longer DB lock → + // higher chance of next failure → repeat. This caused unbounded + // memory growth on SQLite where write contention is common. + // Audit logs are best-effort telemetry — losing a batch on error + // is acceptable; leaking memory until the process crashes is not. + logger.warn(`Dropped ${logsToWrite.length} audit logs after flush failure`); } finally { isFlushInProgress = false; // If buffer filled up while we were flushing, flush again diff --git a/server/routers/badger/verifySession.ts b/server/routers/badger/verifySession.ts index e2e5f6766..b80a1ada0 100644 --- a/server/routers/badger/verifySession.ts +++ b/server/routers/badger/verifySession.ts @@ -80,7 +80,7 @@ export async function verifyResourceSession( res: Response, next: NextFunction ): Promise { - logger.debug("Verify session: Badger sent", req.body); // remove when done testing + logger.debug("Verify session: Badger request received"); const parsedBody = verifyResourceSessionSchema.safeParse(req.body); diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index 6e6312715..51274cc20 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -80,6 +80,10 @@ const removeClient = async ( const updatedClients = existingClients.filter((client) => client !== ws); if (updatedClients.length === 0) { connectedClients.delete(mapKey); + // Clean up config version tracking to prevent unbounded memory + // growth. Without this, every unique clientId that ever connects + // leaves a permanent entry in clientConfigVersions. + clientConfigVersions.delete(clientId); logger.info( `All connections removed for ${clientType.toUpperCase()} ID: ${clientId}` @@ -507,6 +511,12 @@ const disconnectClient = async (clientId: string): Promise => { } }); + // Eagerly clean up tracking maps. The close event handlers will also + // call removeClient, but if the socket is already in CLOSING state + // the close event may never fire, leaving zombie entries. + connectedClients.delete(mapKey); + clientConfigVersions.delete(clientId); + return true; };