mirror of
https://github.com/fosrl/pangolin.git
synced 2026-07-05 19:59:43 +00:00
Make sure to retry rebuilds
This commit is contained in:
83
server/lib/dbRetry.ts
Normal file
83
server/lib/dbRetry.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import logger from "@server/logger";
|
||||
|
||||
const MAX_RETRIES = 5;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
/**
 * Classify an error as transient, i.e. one where repeating the failed
 * database operation is a reasonable response (connection drops, deadlocks,
 * serialization failures).
 *
 * The check looks at the error itself and at one level of `error.cause`:
 *  - `message` (case-insensitive) containing a connection timeout /
 *    connection terminated / pool connect timeout phrase, or "deadlock";
 *  - `cause.message` (case-insensitive) containing a connection terminated
 *    unexpectedly / connection timeout phrase;
 *  - `code` (taken from the error, falling back to the cause) being the
 *    PostgreSQL deadlock (40P01) or serialization failure (40001) SQLSTATE,
 *    or one of ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT.
 *
 * PostgreSQL deadlocks are treated as safe to retry: the server resolves a
 * deadlock by aborting one of the transactions involved, so the aborted one
 * just needs to run again.
 *
 * This function never throws. It is called from `catch` handlers, where an
 * exception raised by the classifier would replace the error being handled,
 * so a `message` that is not a string is simply treated as empty.
 *
 * @param error - Any thrown value; falsy values are never transient.
 * @returns `true` when the error matches one of the patterns above.
 */
export function isTransientError(error: any): boolean {
    if (!error) return false;

    // Thrown values are not guaranteed to be Error instances, so `message`
    // may be a number, an object, etc. Only lower-case real strings.
    const lowerText = (value: unknown): string =>
        typeof value === "string" ? value.toLowerCase() : "";

    const message = lowerText(error.message);
    const causeMessage = lowerText(error.cause?.message);
    const code = error.code || error.cause?.code || "";

    // Phrases looked for in the error's own message.
    const messageMarkers = [
        "connection timeout",
        "connection terminated",
        "timeout exceeded when trying to connect",
        "deadlock"
    ];

    // Phrases looked for in the wrapped cause's message.
    const causeMarkers = [
        "connection terminated unexpectedly",
        "connection timeout"
    ];

    // PostgreSQL deadlock / serialization failure, then socket-level codes.
    const retryableCodes = [
        "40P01",
        "40001",
        "ECONNRESET",
        "ECONNREFUSED",
        "EPIPE",
        "ETIMEDOUT"
    ];

    return (
        messageMarkers.some((marker) => message.includes(marker)) ||
        causeMarkers.some((marker) => causeMessage.includes(marker)) ||
        retryableCodes.includes(code)
    );
}
|
||||
|
||||
/**
 * Run `operation` and re-run it when it fails with an error that
 * `isTransientError` accepts (deadlocks, connection timeouts, unexpected
 * disconnects), waiting with exponential backoff between runs.
 *
 * Before retry number k (1-based) the wait is `baseDelayMs * 2^(k - 1)` plus
 * a random jitter of up to that same amount. Each retry is reported through
 * `logger.warn` together with the error code.
 *
 * @param operation - Async work to run; it is invoked again on each retry.
 * @param context - Label inserted into the retry warning message.
 * @param maxRetries - How many retries are allowed after the first run.
 * @param baseDelayMs - Backoff unit in milliseconds.
 * @returns The value produced by the first run that succeeds.
 * @throws The caught error, unchanged, when it is not transient or the retry
 *         budget is used up.
 */
export async function withRetry<T>(
    operation: () => Promise<T>,
    context: string,
    maxRetries: number = MAX_RETRIES,
    baseDelayMs: number = BASE_DELAY_MS
): Promise<T> {
    let retriesUsed = 0;

    while (true) {
        try {
            return await operation();
        } catch (error: any) {
            const mayRetry =
                isTransientError(error) && retriesUsed < maxRetries;
            if (!mayRetry) {
                throw error;
            }

            retriesUsed += 1;

            // Double the backoff on every retry, then add up to 100% jitter.
            const backoffMs = 2 ** (retriesUsed - 1) * baseDelayMs;
            const waitMs = backoffMs + Math.random() * backoffMs;

            logger.warn(
                `Transient DB error in ${context}, retrying attempt ${retriesUsed}/${maxRetries} after ${waitMs.toFixed(0)}ms`,
                { code: error?.code ?? error?.cause?.code }
            );

            await new Promise((resolve) => setTimeout(resolve, waitMs));
        }
    }
}
|
||||
@@ -45,6 +45,7 @@ import {
|
||||
} from "@server/routers/client/targets";
|
||||
import { lockManager } from "#dynamic/lib/lock";
|
||||
import { rebuildQueue } from "#dynamic/lib/rebuildQueue";
|
||||
import { withRetry, isTransientError } from "@server/lib/dbRetry";
|
||||
import {
|
||||
checkOrgRebuildRateLimit,
|
||||
decrementOrgRebuildCount,
|
||||
@@ -285,10 +286,20 @@ export async function rebuildClientAssociationsFromSiteResource(
|
||||
) {
|
||||
await incrementOrgRebuildCount(siteResource.orgId);
|
||||
try {
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
() => rebuildClientAssociationsFromSiteResourceImpl(siteResource),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
// The whole locked rebuild is idempotent (it diffs full expected vs.
|
||||
// actual state each time), so on a transient DB error it's safe to
|
||||
// retry the entire thing rather than just the failed query.
|
||||
return await withRetry(
|
||||
() =>
|
||||
lockManager.withLock(
|
||||
`rebuild-client-associations:site-resource:${siteResource.siteResourceId}`,
|
||||
() =>
|
||||
rebuildClientAssociationsFromSiteResourceImpl(
|
||||
siteResource
|
||||
),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
),
|
||||
`rebuildClientAssociationsFromSiteResource:${siteResource.siteResourceId}`
|
||||
);
|
||||
} catch (err: any) {
|
||||
if (
|
||||
@@ -304,6 +315,17 @@ export async function rebuildClientAssociationsFromSiteResource(
|
||||
});
|
||||
return { mergedAllClients: [] };
|
||||
}
|
||||
if (isTransientError(err)) {
|
||||
logger.warn(
|
||||
`rebuildClientAssociations: transient DB error rebuilding site resource ${siteResource.siteResourceId} persisted after retries, queuing for deferred processing:`,
|
||||
err
|
||||
);
|
||||
await rebuildQueue.enqueue({
|
||||
type: "site-resource",
|
||||
id: siteResource.siteResourceId
|
||||
});
|
||||
return { mergedAllClients: [] };
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
await decrementOrgRebuildCount(siteResource.orgId);
|
||||
@@ -1656,10 +1678,17 @@ export async function rebuildClientAssociationsFromClient(
|
||||
await incrementOrgRebuildCount(client.orgId);
|
||||
try {
|
||||
const trx = primaryDb;
|
||||
return await lockManager.withLock(
|
||||
`rebuild-client-associations:client:${client.clientId}`,
|
||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
// The whole locked rebuild is idempotent (it diffs full expected vs.
|
||||
// actual state each time), so on a transient DB error it's safe to
|
||||
// retry the entire thing rather than just the failed query.
|
||||
return await withRetry(
|
||||
() =>
|
||||
lockManager.withLock(
|
||||
`rebuild-client-associations:client:${client.clientId}`,
|
||||
() => rebuildClientAssociationsFromClientImpl(client, trx),
|
||||
REBUILD_ASSOCIATIONS_LOCK_TTL_MS
|
||||
),
|
||||
`rebuildClientAssociationsFromClient:${client.clientId}`
|
||||
);
|
||||
} catch (err: any) {
|
||||
if (
|
||||
@@ -1675,6 +1704,17 @@ export async function rebuildClientAssociationsFromClient(
|
||||
});
|
||||
return;
|
||||
}
|
||||
if (isTransientError(err)) {
|
||||
logger.warn(
|
||||
`rebuildClientAssociations: transient DB error rebuilding client ${client.clientId} persisted after retries, queuing for deferred processing:`,
|
||||
err
|
||||
);
|
||||
await rebuildQueue.enqueue({
|
||||
type: "client",
|
||||
id: client.clientId
|
||||
});
|
||||
return;
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
await decrementOrgRebuildCount(client.orgId);
|
||||
|
||||
@@ -1,10 +1,14 @@
|
||||
import logger from "@server/logger";
|
||||
import { isTransientError } from "@server/lib/dbRetry";
|
||||
|
||||
export type RebuildJobType = "site-resource" | "client";
|
||||
|
||||
export interface RebuildJob {
|
||||
type: RebuildJobType;
|
||||
id: number;
|
||||
// Number of times this job has already been re-queued after a transient
|
||||
// failure. Absent/0 means it has not failed yet.
|
||||
attempt?: number;
|
||||
}
|
||||
|
||||
export interface RebuildJobHandlers {
|
||||
@@ -24,6 +28,10 @@ export interface RebuildQueueManager {
|
||||
// retried shortly after against fresh DB state.
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
const BATCH_SIZE = 5;
|
||||
// A job that fails with a transient DB error gets re-queued with backoff
|
||||
// instead of being dropped, up to this many times.
|
||||
const MAX_JOB_ATTEMPTS = 5;
|
||||
const JOB_RETRY_BASE_DELAY_MS = 1000;
|
||||
|
||||
function dedupeKey(job: RebuildJob): string {
|
||||
return `${job.type}:${job.id}`;
|
||||
@@ -106,10 +114,29 @@ class InMemoryRebuildQueue implements RebuildQueueManager {
|
||||
`Rebuild queue: completed ${job.type}:${job.id}`
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
const attempt = (job.attempt ?? 0) + 1;
|
||||
if (isTransientError(err) && attempt <= MAX_JOB_ATTEMPTS) {
|
||||
const delay =
|
||||
JOB_RETRY_BASE_DELAY_MS * Math.pow(2, attempt - 1);
|
||||
logger.warn(
|
||||
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
|
||||
err
|
||||
);
|
||||
setTimeout(() => {
|
||||
this.enqueue({ ...job, attempt }).catch(
|
||||
(enqueueErr) =>
|
||||
logger.error(
|
||||
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
|
||||
enqueueErr
|
||||
)
|
||||
);
|
||||
}, delay);
|
||||
} else {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
||||
@@ -14,12 +14,16 @@
|
||||
import { redis } from "#private/lib/redis";
|
||||
import { lockManager } from "#private/lib/lock";
|
||||
import logger from "@server/logger";
|
||||
import { isTransientError } from "@server/lib/dbRetry";
|
||||
|
||||
export type RebuildJobType = "site-resource" | "client";
|
||||
|
||||
export interface RebuildJob {
|
||||
type: RebuildJobType;
|
||||
id: number;
|
||||
// Number of times this job has already been re-queued after a transient
|
||||
// failure. Absent/0 means it has not failed yet.
|
||||
attempt?: number;
|
||||
}
|
||||
|
||||
export interface RebuildJobHandlers {
|
||||
@@ -43,6 +47,11 @@ const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s
|
||||
|
||||
const POLL_INTERVAL_MS = 500;
|
||||
|
||||
// A job that fails with a transient DB error gets re-queued with backoff
|
||||
// instead of being dropped, up to this many times.
|
||||
const MAX_JOB_ATTEMPTS = 5;
|
||||
const JOB_RETRY_BASE_DELAY_MS = 1000;
|
||||
|
||||
class RedisRebuildQueue {
|
||||
private processingStarted = false;
|
||||
|
||||
@@ -180,10 +189,33 @@ class RedisRebuildQueue {
|
||||
`Rebuild queue: completed ${job.type}:${job.id}`
|
||||
);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
const attempt = (job.attempt ?? 0) + 1;
|
||||
if (
|
||||
isTransientError(err) &&
|
||||
attempt <= MAX_JOB_ATTEMPTS
|
||||
) {
|
||||
const delay =
|
||||
JOB_RETRY_BASE_DELAY_MS *
|
||||
Math.pow(2, attempt - 1);
|
||||
logger.warn(
|
||||
`Rebuild queue: job ${job.type}:${job.id} hit a transient error (attempt ${attempt}/${MAX_JOB_ATTEMPTS}), re-queuing in ${delay}ms:`,
|
||||
err
|
||||
);
|
||||
setTimeout(() => {
|
||||
this.enqueue({ ...job, attempt }).catch(
|
||||
(enqueueErr) =>
|
||||
logger.error(
|
||||
`Rebuild queue: failed to re-queue ${job.type}:${job.id} after transient error:`,
|
||||
enqueueErr
|
||||
)
|
||||
);
|
||||
}, delay);
|
||||
} else {
|
||||
logger.error(
|
||||
`Rebuild queue: job ${job.type}:${job.id} threw an error:`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -3,6 +3,7 @@ import { sites, clients, olms } from "@server/db";
|
||||
import { and, eq, inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { fireSiteOnlineAlert } from "@server/lib/alerts";
|
||||
import { withRetry } from "@server/lib/dbRetry";
|
||||
|
||||
/**
|
||||
* Ping Accumulator
|
||||
@@ -22,8 +23,6 @@ import { fireSiteOnlineAlert } from "@server/lib/alerts";
|
||||
*/
|
||||
|
||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||
const MAX_RETRIES = 5;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
||||
@@ -266,85 +265,7 @@ export async function flushPingsToDb(): Promise<void> {
|
||||
}
|
||||
|
||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
||||
|
||||
/**
 * Retry helper for transient database errors (deadlocks, connection
 * timeouts, unexpected disconnects) with exponential backoff.
 *
 * `operation` is run until it succeeds, fails with an error that
 * `isTransientError` rejects, or has already been retried MAX_RETRIES times.
 * The wait before retry number k (1-based) is `BASE_DELAY_MS * 2^(k - 1)`
 * plus a random jitter of up to the same amount, and every retry is logged
 * as a warning with the error code.
 *
 * PostgreSQL deadlocks (40P01) are always safe to retry: the loser of a
 * deadlock just needs to try again. MAX_RETRIES is intentionally higher than
 * typical connection retry budgets to give deadlock victims enough chances
 * to succeed.
 *
 * @param operation - Async work to run; invoked again on each retry.
 * @param context - Label inserted into the retry warning message.
 * @returns The value produced by the first successful run.
 * @throws The caught error, unchanged, once it is not retryable.
 */
async function withRetry<T>(
    operation: () => Promise<T>,
    context: string
): Promise<T> {
    let retryCount = 0;

    while (true) {
        try {
            return await operation();
        } catch (error: any) {
            const retryable =
                isTransientError(error) && retryCount < MAX_RETRIES;
            if (!retryable) {
                throw error;
            }

            retryCount += 1;

            // Exponential step for this retry plus up to 100% random jitter.
            const stepMs = 2 ** (retryCount - 1) * BASE_DELAY_MS;
            const pauseMs = stepMs + Math.random() * stepMs;

            logger.warn(
                `Transient DB error in ${context}, retrying attempt ${retryCount}/${MAX_RETRIES} after ${pauseMs.toFixed(0)}ms`,
                { code: error?.code ?? error?.cause?.code }
            );

            await new Promise((resolve) => setTimeout(resolve, pauseMs));
        }
    }
}
|
||||
|
||||
/**
 * Detect transient errors that are safe to retry.
 *
 * An error counts as transient when any of the following holds:
 *  - its `message` (case-insensitive) mentions a connection timeout, a
 *    terminated connection, a pool connect timeout, or a deadlock;
 *  - its `cause.message` (case-insensitive) mentions a connection terminated
 *    unexpectedly or a connection timeout;
 *  - its `code` (falling back to `cause.code`) is PostgreSQL 40P01 (deadlock)
 *    or 40001 (serialization failure), or ECONNRESET / ECONNREFUSED / EPIPE /
 *    ETIMEDOUT.
 *
 * The function never throws: it runs inside `catch` handlers, so a `message`
 * that is not a string is treated as empty rather than lower-cased.
 *
 * @param error - Any thrown value; falsy values are never transient.
 * @returns `true` when the error matches one of the patterns above.
 */
function isTransientError(error: any): boolean {
    if (!error) return false;

    // Thrown values need not be Error instances; guard before lower-casing.
    const rawMessage = error.message;
    const rawCauseMessage = error.cause?.message;
    const message =
        typeof rawMessage === "string" ? rawMessage.toLowerCase() : "";
    const causeMessage =
        typeof rawCauseMessage === "string"
            ? rawCauseMessage.toLowerCase()
            : "";
    const code = error.code || error.cause?.code || "";

    // Connection timeout / terminated
    const connectionLost =
        message.includes("connection timeout") ||
        message.includes("connection terminated") ||
        message.includes("timeout exceeded when trying to connect") ||
        causeMessage.includes("connection terminated unexpectedly") ||
        causeMessage.includes("connection timeout");
    if (connectionLost) return true;

    // PostgreSQL deadlock detected
    if (code === "40P01" || message.includes("deadlock")) return true;

    // PostgreSQL serialization failure
    if (code === "40001") return true;

    // Socket-level failures
    switch (code) {
        case "ECONNRESET":
        case "ECONNREFUSED":
        case "EPIPE":
        case "ETIMEDOUT":
            return true;
        default:
            return false;
    }
}
|
||||
// See @server/lib/dbRetry for the shared withRetry/isTransientError helpers.
|
||||
|
||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
Reference in New Issue
Block a user