Speed up writing to not block io operations

This commit is contained in:
Owen
2025-12-06 17:54:39 -05:00
parent 66fc8529c2
commit 8a8c0edad3
3 changed files with 109 additions and 15 deletions

View File

@@ -537,7 +537,7 @@ export async function updateClientSiteDestinations(
}
if (!site.clientSitesAssociationsCache.endpoint) {
logger.warn(`Site ${site.sites.siteId} has no endpoint, skipping`); // if this is a new association the endpoint is not set yet // TODO: FIX THIS
// if this is a new association the endpoint is not set yet
continue;
}

View File

@@ -24,6 +24,78 @@ Reasons:
*/
// In-memory buffer for batching audit logs
const auditLogBuffer: Array<{
timestamp: number;
orgId?: string;
actorType?: string;
actor?: string;
actorId?: string;
metadata: any;
action: boolean;
resourceId?: number;
reason: number;
location?: string;
originalRequestURL: string;
scheme: string;
host: string;
path: string;
method: string;
ip?: string;
tls: boolean;
}> = [];
const BATCH_SIZE = 100; // Write to DB every 100 logs
const BATCH_INTERVAL_MS = 5000; // Or every 5 seconds, whichever comes first
let flushTimer: NodeJS.Timeout | null = null;
/**
* Flush buffered logs to database
*/
async function flushAuditLogs() {
if (auditLogBuffer.length === 0) {
return;
}
// Take all current logs and clear buffer
const logsToWrite = auditLogBuffer.splice(0, auditLogBuffer.length);
try {
// Batch insert all logs at once
await db.insert(requestAuditLog).values(logsToWrite);
logger.debug(`Flushed ${logsToWrite.length} audit logs to database`);
} catch (error) {
logger.error("Error flushing audit logs:", error);
// On error, we lose these logs - consider a fallback strategy if needed
// (e.g., write to file, or put back in buffer with retry limit)
}
}
/**
* Schedule a flush if not already scheduled
*/
function scheduleFlush() {
if (flushTimer === null) {
flushTimer = setTimeout(() => {
flushTimer = null;
flushAuditLogs().catch((err) =>
logger.error("Error in scheduled flush:", err)
);
}, BATCH_INTERVAL_MS);
}
}
/**
* Gracefully flush all pending logs (call this on shutdown)
*/
export async function shutdownAuditLogger() {
if (flushTimer) {
clearTimeout(flushTimer);
flushTimer = null;
}
await flushAuditLogs();
}
async function getRetentionDays(orgId: string): Promise<number> {
// check cache first
const cached = cache.get<number>(`org_${orgId}_retentionDays`);
@@ -77,7 +149,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
}
}
export async function logRequestAudit(
export function logRequestAudit(
data: {
action: boolean;
reason: number;
@@ -103,12 +175,14 @@ export async function logRequestAudit(
}
) {
try {
// Quick synchronous check - if org has 0 retention, skip immediately
if (data.orgId) {
const retentionDays = await getRetentionDays(data.orgId);
if (retentionDays == 0) {
const cached = cache.get<number>(`org_${data.orgId}_retentionDays`);
if (cached === 0) {
// do not log
return;
}
// If not cached or > 0, we'll log it (async retention check happens in background)
}
let actorType: string | undefined;
@@ -128,16 +202,11 @@ export async function logRequestAudit(
actorId = apiKey.apiKeyId;
}
// if (!actorType || !actor || !actorId) {
// logger.warn("logRequestAudit: Incomplete actor information");
// return;
// }
const timestamp = Math.floor(Date.now() / 1000);
let metadata = null;
if (metadata) {
metadata = JSON.stringify(metadata);
if (data.metadata) {
metadata = JSON.stringify(data.metadata);
}
const clientIp = body.requestIp
@@ -163,7 +232,8 @@ export async function logRequestAudit(
})()
: undefined;
await db.insert(requestAuditLog).values({
// Add to buffer instead of writing directly to DB
auditLogBuffer.push({
timestamp,
orgId: data.orgId,
actorType,
@@ -174,9 +244,6 @@ export async function logRequestAudit(
resourceId: data.resourceId,
reason: data.reason,
location: data.location,
// userAgent: data.userAgent, // TODO: add this
// headers: data.body.headers,
// query: data.body.query,
originalRequestURL: body.originalRequestURL,
scheme: body.scheme,
host: body.host,
@@ -185,6 +252,23 @@ export async function logRequestAudit(
ip: clientIp,
tls: body.tls
});
// Flush immediately if buffer is full, otherwise schedule a flush
if (auditLogBuffer.length >= BATCH_SIZE) {
// Fire and forget - don't block the caller
flushAuditLogs().catch((err) =>
logger.error("Error flushing audit logs:", err)
);
} else {
scheduleFlush();
}
// Async retention check in background (don't await)
if (data.orgId && cache.get<number>(`org_${data.orgId}_retentionDays`) === undefined) {
getRetentionDays(data.orgId).catch((err) =>
logger.error("Error checking retention days:", err)
);
}
} catch (error) {
logger.error(error);
}

View File

@@ -54,6 +54,15 @@ export async function addTargets(
});
}
// try to parse the hcStatus into a int and if not possible set to undefined
let hcStatus: number | undefined = undefined;
if (hc.hcStatus) {
const parsedStatus = parseInt(hc.hcStatus.toString());
if (!isNaN(parsedStatus)) {
hcStatus = parsedStatus;
}
}
return {
id: target.targetId,
hcEnabled: hc.hcEnabled,
@@ -67,6 +76,7 @@ export async function addTargets(
hcTimeout: hc.hcTimeout, // in seconds
hcHeaders: hcHeadersSend,
hcMethod: hc.hcMethod,
hcStatus: hcStatus,
hcTlsServerName: hc.hcTlsServerName,
};
});