mirror of
https://github.com/fosrl/pangolin.git
synced 2026-05-12 15:16:49 +00:00
Merge pull request #2998 from Josh-Voyles/mem-fix-2
fix: deterministically finalize SQLite prepared statements to prevent native memory leak (#2120)
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import { drizzle as DrizzleSqlite } from "drizzle-orm/better-sqlite3";
|
import { drizzle as DrizzleSqlite } from "drizzle-orm/better-sqlite3";
|
||||||
import Database from "better-sqlite3";
|
import Database from "better-sqlite3";
|
||||||
|
import type BetterSqlite3 from "better-sqlite3";
|
||||||
import * as schema from "./schema/schema";
|
import * as schema from "./schema/schema";
|
||||||
import path from "path";
|
import path from "path";
|
||||||
import fs from "fs";
|
import fs from "fs";
|
||||||
@@ -11,8 +12,68 @@ export const exists = checkFileExists(location);
|
|||||||
|
|
||||||
bootstrapVolume();
|
bootstrapVolume();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps better-sqlite3 Statement to call `finalize()` immediately after
|
||||||
|
* execution, freeing native sqlite3_stmt memory deterministically instead
|
||||||
|
* of waiting for GC. Fixes steady off-heap growth under load (#2120).
|
||||||
|
* WARNING: Finalizes after first execution — incompatible with drizzle's
|
||||||
|
* reusable .prepare() builders. No such usage exists in this codebase.
|
||||||
|
*/
|
||||||
|
function autoFinalizeStatement(
|
||||||
|
stmt: BetterSqlite3.Statement
|
||||||
|
): BetterSqlite3.Statement {
|
||||||
|
const wrapExec = <T extends (...args: any[]) => any>(fn: T): T => {
|
||||||
|
return function (this: any, ...args: any[]) {
|
||||||
|
try {
|
||||||
|
return fn.apply(this, args);
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
// finalize() exists on the native Statement at runtime but
|
||||||
|
// is missing from @types/better-sqlite3.
|
||||||
|
(stmt as any).finalize();
|
||||||
|
} catch {
|
||||||
|
// Already finalized — harmless
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} as unknown as T;
|
||||||
|
};
|
||||||
|
|
||||||
|
stmt.run = wrapExec(stmt.run);
|
||||||
|
stmt.get = wrapExec(stmt.get);
|
||||||
|
stmt.all = wrapExec(stmt.all);
|
||||||
|
|
||||||
|
return stmt;
|
||||||
|
}
|
||||||
|
|
||||||
function createDb() {
|
function createDb() {
|
||||||
const sqlite = new Database(location);
|
const sqlite = new Database(location);
|
||||||
|
|
||||||
|
// Enable WAL mode — allows concurrent readers + single writer, preventing
|
||||||
|
// contention across subsystems (verifySession, Traefik, audit, ping).
|
||||||
|
sqlite.pragma("journal_mode = WAL");
|
||||||
|
|
||||||
|
// Wait up to 5s on SQLITE_BUSY instead of failing — prevents audit log
|
||||||
|
// retry loops that accumulate memory.
|
||||||
|
sqlite.pragma("busy_timeout = 5000");
|
||||||
|
|
||||||
|
// NORMAL sync mode: safe with WAL, reduces write lock hold time.
|
||||||
|
sqlite.pragma("synchronous = NORMAL");
|
||||||
|
|
||||||
|
// 64 MB page cache (default 2 MB) — reduces I/O round-trips on large
|
||||||
|
// TraefikConfigManager JOINs that block the event loop.
|
||||||
|
sqlite.pragma("cache_size = -65536");
|
||||||
|
|
||||||
|
// 256 MB memory-mapped I/O — OS serves reads from page cache directly,
|
||||||
|
// reducing event-loop blocking.
|
||||||
|
sqlite.pragma("mmap_size = 268435456");
|
||||||
|
|
||||||
|
// Wrap prepare() so every drizzle-orm statement is auto-finalized after
|
||||||
|
// first use, preventing sqlite3_stmt accumulation between GC cycles.
|
||||||
|
const originalPrepare = sqlite.prepare.bind(sqlite);
|
||||||
|
(sqlite as any).prepare = function autoFinalizePrepare(source: string) {
|
||||||
|
return autoFinalizeStatement(originalPrepare(source));
|
||||||
|
};
|
||||||
|
|
||||||
return DrizzleSqlite(sqlite, {
|
return DrizzleSqlite(sqlite, {
|
||||||
schema
|
schema
|
||||||
});
|
});
|
||||||
@@ -23,7 +84,7 @@ export default db;
|
|||||||
export const primaryDb = db;
|
export const primaryDb = db;
|
||||||
export type Transaction = Parameters<
|
export type Transaction = Parameters<
|
||||||
Parameters<(typeof db)["transaction"]>[0]
|
Parameters<(typeof db)["transaction"]>[0]
|
||||||
>[0];
|
>[0];
|
||||||
export const DB_TYPE: "pg" | "sqlite" = "sqlite";
|
export const DB_TYPE: "pg" | "sqlite" = "sqlite";
|
||||||
|
|
||||||
function checkFileExists(filePath: string): boolean {
|
function checkFileExists(filePath: string): boolean {
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ import {
|
|||||||
Olm,
|
Olm,
|
||||||
olms,
|
olms,
|
||||||
RemoteExitNode,
|
RemoteExitNode,
|
||||||
remoteExitNodes,
|
remoteExitNodes
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
@@ -194,8 +194,6 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
|||||||
// Config version tracking map (local to this node, resets on server restart)
|
// Config version tracking map (local to this node, resets on server restart)
|
||||||
const clientConfigVersions: Map<string, number> = new Map();
|
const clientConfigVersions: Map<string, number> = new Map();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Recovery tracking
|
// Recovery tracking
|
||||||
let isRedisRecoveryInProgress = false;
|
let isRedisRecoveryInProgress = false;
|
||||||
|
|
||||||
@@ -406,6 +404,9 @@ const removeClient = async (
|
|||||||
const updatedClients = existingClients.filter((client) => client !== ws);
|
const updatedClients = existingClients.filter((client) => client !== ws);
|
||||||
if (updatedClients.length === 0) {
|
if (updatedClients.length === 0) {
|
||||||
connectedClients.delete(mapKey);
|
connectedClients.delete(mapKey);
|
||||||
|
// Remove clientId from clientConfigVersions on disconnect — prevents
|
||||||
|
// unbounded memory growth from stale entries.
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
if (redisManager.isRedisEnabled()) {
|
||||||
try {
|
try {
|
||||||
@@ -1097,6 +1098,11 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Eagerly remove client — close event may not fire if socket is already
|
||||||
|
// CLOSING, leaving zombie entries.
|
||||||
|
connectedClients.delete(mapKey);
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,15 @@ import zlib from "zlib";
|
|||||||
import { Server as HttpServer } from "http";
|
import { Server as HttpServer } from "http";
|
||||||
import { WebSocket, WebSocketServer } from "ws";
|
import { WebSocket, WebSocketServer } from "ws";
|
||||||
import { Socket } from "net";
|
import { Socket } from "net";
|
||||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
import {
|
||||||
|
Newt,
|
||||||
|
newts,
|
||||||
|
NewtSession,
|
||||||
|
olms,
|
||||||
|
Olm,
|
||||||
|
OlmSession,
|
||||||
|
sites
|
||||||
|
} from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||||
@@ -80,6 +88,9 @@ const removeClient = async (
|
|||||||
const updatedClients = existingClients.filter((client) => client !== ws);
|
const updatedClients = existingClients.filter((client) => client !== ws);
|
||||||
if (updatedClients.length === 0) {
|
if (updatedClients.length === 0) {
|
||||||
connectedClients.delete(mapKey);
|
connectedClients.delete(mapKey);
|
||||||
|
// Remove clientId from clientConfigVersions — prevents unbounded growth
|
||||||
|
// from stale entries.
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`
|
`All connections removed for ${clientType.toUpperCase()} ID: ${clientId}`
|
||||||
@@ -218,9 +229,13 @@ const hasActiveConnections = async (clientId: string): Promise<boolean> => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Get the current config version for a client
|
// Get the current config version for a client
|
||||||
const getClientConfigVersion = async (clientId: string): Promise<number | undefined> => {
|
const getClientConfigVersion = async (
|
||||||
|
clientId: string
|
||||||
|
): Promise<number | undefined> => {
|
||||||
const version = clientConfigVersions.get(clientId);
|
const version = clientConfigVersions.get(clientId);
|
||||||
logger.debug(`getClientConfigVersion called for clientId: ${clientId}, returning: ${version} (type: ${typeof version})`);
|
logger.debug(
|
||||||
|
`getClientConfigVersion called for clientId: ${clientId}, returning: ${version} (type: ${typeof version})`
|
||||||
|
);
|
||||||
return version;
|
return version;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -507,6 +522,11 @@ const disconnectClient = async (clientId: string): Promise<boolean> => {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Eagerly remove client — close event may not fire if socket already
|
||||||
|
// CLOSING, leaving zombie entries.
|
||||||
|
connectedClients.delete(mapKey);
|
||||||
|
clientConfigVersions.delete(clientId);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user