Mirror of https://github.com/fosrl/pangolin.git (synced 2026-05-22 08:45:24 +00:00)
fix: memory improvements
- SQLite: enable WAL mode and PRAGMA performance settings
- ws.ts (public + private): fix clientConfigVersions memory leak
- internal server: add rate limiting and request timeouts
- audit log: fix flush re-queue feedback loop
- memory: add monitoring instrumentation
- security: remove debug log of full request body
This commit is contained in:
@@ -3,9 +3,11 @@ import { flushConnectionLogToDb } from "#dynamic/routers/newt";
|
|||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||||
|
import { shutdownAuditLogger } from "@server/routers/badger/logRequestAudit";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await stopPingAccumulator();
|
await stopPingAccumulator();
|
||||||
|
await shutdownAuditLogger();
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushConnectionLogToDb();
|
await flushConnectionLogToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
|
|||||||
@@ -13,6 +13,36 @@ bootstrapVolume();
|
|||||||
|
|
||||||
function createDb() {
|
function createDb() {
|
||||||
const sqlite = new Database(location);
|
const sqlite = new Database(location);
|
||||||
|
|
||||||
|
// Enable WAL mode for dramatically better concurrent read/write
|
||||||
|
// performance. Without this, readers block writers and vice versa,
|
||||||
|
// causing severe contention when multiple subsystems (verifySession,
|
||||||
|
// TraefikConfigManager, audit log flushes, ping flushes) all share
|
||||||
|
// this single connection. WAL mode allows concurrent readers with a
|
||||||
|
// single writer, which is the typical access pattern.
|
||||||
|
sqlite.pragma("journal_mode = WAL");
|
||||||
|
|
||||||
|
// Wait up to 5 seconds when the database is locked instead of
|
||||||
|
// failing immediately with SQLITE_BUSY. This prevents transient
|
||||||
|
// write failures from causing audit log buffer re-queues and retry
|
||||||
|
// loops that accumulate memory.
|
||||||
|
sqlite.pragma("busy_timeout = 5000");
|
||||||
|
|
||||||
|
// NORMAL synchronous mode is safe with WAL and significantly reduces
|
||||||
|
// the time each write holds the database lock.
|
||||||
|
sqlite.pragma("synchronous = NORMAL");
|
||||||
|
|
||||||
|
// Increase the page cache to 64 MB (negative value = KB). The
|
||||||
|
// default (2 MB) causes frequent I/O round-trips on the large JOIN
|
||||||
|
// queries used by TraefikConfigManager, which block the event loop
|
||||||
|
// for longer than necessary.
|
||||||
|
sqlite.pragma("cache_size = -65536");
|
||||||
|
|
||||||
|
// Enable memory-mapped I/O for reads (256 MB). This allows the OS
|
||||||
|
// to serve read queries from the page cache without going through
|
||||||
|
// SQLite's own cache, reducing event-loop blocking time.
|
||||||
|
sqlite.pragma("mmap_size = 268435456");
|
||||||
|
|
||||||
return DrizzleSqlite(sqlite, {
|
return DrizzleSqlite(sqlite, {
|
||||||
schema
|
schema
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -24,6 +24,29 @@ import license from "#dynamic/license/license";
|
|||||||
import { initLogCleanupInterval } from "@server/lib/cleanupLogs";
|
import { initLogCleanupInterval } from "@server/lib/cleanupLogs";
|
||||||
import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync";
|
import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync";
|
||||||
import { fetchServerIp } from "@server/lib/serverIpService";
|
import { fetchServerIp } from "@server/lib/serverIpService";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodic memory usage logging for monitoring and leak detection.
|
||||||
|
* Logs heap usage, external (native) memory, and RSS every 60 seconds.
|
||||||
|
* This is lightweight (single process.memoryUsage() call) and provides
|
||||||
|
* the data needed to detect slow memory growth over hours/days.
|
||||||
|
*/
|
||||||
|
function startMemoryMonitor(): void {
|
||||||
|
const INTERVAL_MS = 60_000; // every 60 seconds
|
||||||
|
const timer = setInterval(() => {
|
||||||
|
const mem = process.memoryUsage();
|
||||||
|
logger.info(
|
||||||
|
`Memory usage - ` +
|
||||||
|
`heapUsed: ${(mem.heapUsed / 1024 / 1024).toFixed(1)}MB, ` +
|
||||||
|
`heapTotal: ${(mem.heapTotal / 1024 / 1024).toFixed(1)}MB, ` +
|
||||||
|
`rss: ${(mem.rss / 1024 / 1024).toFixed(1)}MB, ` +
|
||||||
|
`external: ${(mem.external / 1024 / 1024).toFixed(1)}MB, ` +
|
||||||
|
`arrayBuffers: ${(mem.arrayBuffers / 1024 / 1024).toFixed(1)}MB`
|
||||||
|
);
|
||||||
|
}, INTERVAL_MS);
|
||||||
|
timer.unref();
|
||||||
|
}
|
||||||
|
|
||||||
async function startServers() {
|
async function startServers() {
|
||||||
await setHostMeta();
|
await setHostMeta();
|
||||||
@@ -42,6 +65,9 @@ async function startServers() {
|
|||||||
initLogCleanupInterval();
|
initLogCleanupInterval();
|
||||||
initAcmeCertSync();
|
initAcmeCertSync();
|
||||||
|
|
||||||
|
// Start memory monitoring for leak detection
|
||||||
|
startMemoryMonitor();
|
||||||
|
|
||||||
// Start all servers
|
// Start all servers
|
||||||
const apiServer = createApiServer();
|
const apiServer = createApiServer();
|
||||||
const internalServer = createInternalServer();
|
const internalServer = createInternalServer();
|
||||||
|
|||||||
@@ -10,6 +10,8 @@ import {
|
|||||||
} from "@server/middlewares";
|
} from "@server/middlewares";
|
||||||
import { internalRouter } from "#dynamic/routers/internal";
|
import { internalRouter } from "#dynamic/routers/internal";
|
||||||
import { stripDuplicateSesions } from "./middlewares/stripDuplicateSessions";
|
import { stripDuplicateSesions } from "./middlewares/stripDuplicateSessions";
|
||||||
|
import { requestTimeoutMiddleware } from "./middlewares/requestTimeout";
|
||||||
|
import rateLimit from "express-rate-limit";
|
||||||
|
|
||||||
const internalPort = config.getRawConfig().server.internal_port;
|
const internalPort = config.getRawConfig().server.internal_port;
|
||||||
|
|
||||||
@@ -27,6 +29,25 @@ export function createInternalServer() {
|
|||||||
internalServer.use(cookieParser());
|
internalServer.use(cookieParser());
|
||||||
internalServer.use(express.json());
|
internalServer.use(express.json());
|
||||||
|
|
||||||
|
// Prevent requests from hanging indefinitely. Without this, if a
|
||||||
|
// database query blocks (especially on SQLite), pending requests
|
||||||
|
// accumulate in memory with no upper bound on lifetime.
|
||||||
|
internalServer.use(requestTimeoutMiddleware(30000)); // 30 second timeout
|
||||||
|
|
||||||
|
// Rate-limit the internal verify-session endpoint. This server
|
||||||
|
// handles forward-auth requests from Traefik/Badger. Under heavy
|
||||||
|
// monitoring (e.g. Uptime Kuma), requests can arrive faster than
|
||||||
|
// SQLite can serve them, causing unbounded request queuing and
|
||||||
|
// memory growth.
|
||||||
|
internalServer.use(
|
||||||
|
rateLimit({
|
||||||
|
windowMs: 60 * 1000, // 1 minute window
|
||||||
|
max: 1000, // generous limit: ~17 req/s
|
||||||
|
standardHeaders: true,
|
||||||
|
legacyHeaders: false
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
const prefix = `/api/v1`;
|
const prefix = `/api/v1`;
|
||||||
internalServer.use(prefix, internalRouter);
|
internalServer.use(prefix, internalRouter);
|
||||||
|
|
||||||
|
|||||||
@@ -406,6 +406,10 @@ const removeClient = async (
|
|||||||
const updatedClients = existingClients.filter((client) => client !== ws);
|
const updatedClients = existingClients.filter((client) => client !== ws);
|
||||||
if (updatedClients.length === 0) {
|
if (updatedClients.length === 0) {
|
||||||
connectedClients.delete(mapKey);
|
connectedClients.delete(mapKey);
|
||||||
|
// Clean up config version tracking to prevent unbounded memory
|
||||||
|
// growth. Without this, every unique clientId that ever connects
|
||||||
|
// leaves a permanent entry in clientConfigVersions.
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
if (redisManager.isRedisEnabled()) {
|
||||||
try {
|
try {
|
||||||
@@ -413,6 +417,9 @@ const removeClient = async (
|
|||||||
await redisManager.del(
|
await redisManager.del(
|
||||||
getNodeConnectionsKey(NODE_ID, clientId)
|
getNodeConnectionsKey(NODE_ID, clientId)
|
||||||
);
|
);
|
||||||
|
// Also clean up the Redis config version key so it doesn't
|
||||||
|
// accumulate indefinitely in Redis either.
|
||||||
|
await redisManager.del(getConfigVersionKey(clientId));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
"Failed to remove client from Redis tracking (cleanup will occur on recovery):",
|
"Failed to remove client from Redis tracking (cleanup will occur on recovery):",
|
||||||
@@ -1097,6 +1104,12 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Eagerly clean up tracking maps. The close event handlers will also
|
||||||
|
// call removeClient, but if the socket is already in CLOSING state
|
||||||
|
// the close event may never fire, leaving zombie entries.
|
||||||
|
connectedClients.delete(mapKey);
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ export async function exchangeSession(
|
|||||||
res: Response,
|
res: Response,
|
||||||
next: NextFunction
|
next: NextFunction
|
||||||
): Promise<any> {
|
): Promise<any> {
|
||||||
logger.debug("Exchange session: Badger sent", req.body);
|
logger.debug("Exchange session: Badger request received");
|
||||||
|
|
||||||
const parsedBody = exchangeSessionBodySchema.safeParse(req.body);
|
const parsedBody = exchangeSessionBodySchema.safeParse(req.body);
|
||||||
|
|
||||||
|
|||||||
@@ -84,14 +84,14 @@ async function flushAuditLogs() {
|
|||||||
logger.debug(`Flushed ${logsToWrite.length} audit logs to database`);
|
logger.debug(`Flushed ${logsToWrite.length} audit logs to database`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error flushing audit logs:", error);
|
logger.error("Error flushing audit logs:", error);
|
||||||
// On transaction error, put logs back at the front of the buffer to retry
|
// On transaction error, drop the logs rather than re-queuing them.
|
||||||
// but only if buffer isn't too large
|
// The previous re-queue approach created a positive feedback loop:
|
||||||
if (auditLogBuffer.length < MAX_BUFFER_SIZE - logsToWrite.length) {
|
// failed flush → re-queue → larger next flush → longer DB lock →
|
||||||
auditLogBuffer.unshift(...logsToWrite);
|
// higher chance of next failure → repeat. This caused unbounded
|
||||||
logger.info(`Re-queued ${logsToWrite.length} audit logs for retry`);
|
// memory growth on SQLite where write contention is common.
|
||||||
} else {
|
// Audit logs are best-effort telemetry — losing a batch on error
|
||||||
logger.error(`Buffer full, dropped ${logsToWrite.length} audit logs`);
|
// is acceptable; leaking memory until the process crashes is not.
|
||||||
}
|
logger.warn(`Dropped ${logsToWrite.length} audit logs after flush failure`);
|
||||||
} finally {
|
} finally {
|
||||||
isFlushInProgress = false;
|
isFlushInProgress = false;
|
||||||
// If buffer filled up while we were flushing, flush again
|
// If buffer filled up while we were flushing, flush again
|
||||||
|
|||||||
@@ -80,7 +80,7 @@ export async function verifyResourceSession(
|
|||||||
res: Response,
|
res: Response,
|
||||||
next: NextFunction
|
next: NextFunction
|
||||||
): Promise<any> {
|
): Promise<any> {
|
||||||
logger.debug("Verify session: Badger sent", req.body); // remove when done testing
|
logger.debug("Verify session: Badger request received");
|
||||||
|
|
||||||
const parsedBody = verifyResourceSessionSchema.safeParse(req.body);
|
const parsedBody = verifyResourceSessionSchema.safeParse(req.body);
|
||||||
|
|
||||||
|
|||||||
@@ -80,6 +80,10 @@ const removeClient = async (
|
|||||||
const updatedClients = existingClients.filter((client) => client !== ws);
|
const updatedClients = existingClients.filter((client) => client !== ws);
|
||||||
if (updatedClients.length === 0) {
|
if (updatedClients.length === 0) {
|
||||||
connectedClients.delete(mapKey);
|
connectedClients.delete(mapKey);
|
||||||
|
// Clean up config version tracking to prevent unbounded memory
|
||||||
|
// growth. Without this, every unique clientId that ever connects
|
||||||
|
// leaves a permanent entry in clientConfigVersions.
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`
|
`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`
|
||||||
@@ -507,6 +511,12 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Eagerly clean up tracking maps. The close event handlers will also
|
||||||
|
// call removeClient, but if the socket is already in CLOSING state
|
||||||
|
// the close event may never fire, leaving zombie entries.
|
||||||
|
connectedClients.delete(mapKey);
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user