mirror of
https://github.com/fosrl/pangolin.git
synced 2026-05-09 22:04:16 +00:00
Add streaming errors for debug
This commit is contained in:
@@ -439,6 +439,8 @@ export const eventStreamingDestinations = pgTable(
|
||||
type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc.
|
||||
config: text("config").notNull(), // JSON string with the configuration for the destination
|
||||
enabled: boolean("enabled").notNull().default(true),
|
||||
lastError: text("lastError"), // last send error message, null if healthy
|
||||
lastErrorAt: bigint("lastErrorAt", { mode: "number" }), // epoch ms of last error, null if healthy
|
||||
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
|
||||
updatedAt: bigint("updatedAt", { mode: "number" }).notNull()
|
||||
}
|
||||
|
||||
@@ -445,6 +445,8 @@ export const eventStreamingDestinations = sqliteTable(
|
||||
enabled: integer("enabled", { mode: "boolean" })
|
||||
.notNull()
|
||||
.default(true),
|
||||
lastError: text("lastError"), // last send error message, null if healthy
|
||||
lastErrorAt: integer("lastErrorAt"), // epoch ms of last error, null if healthy
|
||||
createdAt: integer("createdAt").notNull(),
|
||||
updatedAt: integer("updatedAt").notNull()
|
||||
}
|
||||
|
||||
@@ -313,6 +313,7 @@ export class LogStreamingManager {
|
||||
if (enabledTypes.length === 0) return;
|
||||
|
||||
let anyFailure = false;
|
||||
let firstError: string | null = null;
|
||||
|
||||
for (const logType of enabledTypes) {
|
||||
if (!this.isRunning) break;
|
||||
@@ -320,6 +321,10 @@ export class LogStreamingManager {
|
||||
await this.processLogType(dest, provider, logType);
|
||||
} catch (err) {
|
||||
anyFailure = true;
|
||||
if (firstError === null) {
|
||||
firstError =
|
||||
err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
logger.error(
|
||||
`LogStreamingManager: failed to process "${logType}" logs ` +
|
||||
`for destination ${dest.destinationId}`,
|
||||
@@ -330,6 +335,10 @@ export class LogStreamingManager {
|
||||
|
||||
if (anyFailure) {
|
||||
this.recordFailure(dest.destinationId);
|
||||
await this.setDestinationError(
|
||||
dest.destinationId,
|
||||
firstError ?? "Unknown error"
|
||||
);
|
||||
} else {
|
||||
// Any success resets the failure/back-off state
|
||||
if (this.failures.has(dest.destinationId)) {
|
||||
@@ -338,6 +347,7 @@ export class LogStreamingManager {
|
||||
`LogStreamingManager: destination ${dest.destinationId} recovered`
|
||||
);
|
||||
}
|
||||
await this.clearDestinationError(dest.destinationId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -759,6 +769,45 @@ export class LogStreamingManager {
|
||||
// DB helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private async setDestinationError(
|
||||
destinationId: number,
|
||||
errorMessage: string
|
||||
): Promise<void> {
|
||||
// Truncate to 1000 chars so it fits comfortably in the text column.
|
||||
const truncated = errorMessage.slice(0, 1000);
|
||||
try {
|
||||
await db
|
||||
.update(eventStreamingDestinations)
|
||||
.set({ lastError: truncated, lastErrorAt: Date.now() })
|
||||
.where(
|
||||
eq(eventStreamingDestinations.destinationId, destinationId)
|
||||
);
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
`LogStreamingManager: could not persist error status for destination ${destinationId}`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async clearDestinationError(destinationId: number): Promise<void> {
|
||||
try {
|
||||
// Only update if there is actually an error stored, to avoid
|
||||
// unnecessary writes on every successful poll cycle.
|
||||
await db
|
||||
.update(eventStreamingDestinations)
|
||||
.set({ lastError: null, lastErrorAt: null })
|
||||
.where(
|
||||
eq(eventStreamingDestinations.destinationId, destinationId)
|
||||
);
|
||||
} catch (err) {
|
||||
logger.warn(
|
||||
`LogStreamingManager: could not clear error status for destination ${destinationId}`,
|
||||
err
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
private async loadEnabledDestinations(): Promise<
|
||||
EventStreamingDestination[]
|
||||
> {
|
||||
|
||||
@@ -51,6 +51,8 @@ export type ListEventStreamingDestinationsResponse = {
|
||||
type: string;
|
||||
config: string;
|
||||
enabled: boolean;
|
||||
lastError: string | null;
|
||||
lastErrorAt: number | null;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
sendConnectionLogs: boolean;
|
||||
@@ -79,7 +81,8 @@ async function query(orgId: string, limit: number, offset: number) {
|
||||
registry.registerPath({
|
||||
method: "get",
|
||||
path: "/org/{orgId}/event-streaming-destination",
|
||||
description: "List all event streaming destinations for a specific organization.",
|
||||
description:
|
||||
"List all event streaming destinations for a specific organization.",
|
||||
tags: [OpenAPITags.Org],
|
||||
request: {
|
||||
query: querySchema,
|
||||
|
||||
Reference in New Issue
Block a user