mirror of
https://github.com/fosrl/pangolin.git
synced 2026-05-10 06:14:06 +00:00
Merge branch 's3' into dev
This commit is contained in:
@@ -30,10 +30,12 @@ import {
|
||||
LOG_TYPES,
|
||||
LogEvent,
|
||||
DestinationFailureState,
|
||||
HttpConfig
|
||||
HttpConfig,
|
||||
S3Config
|
||||
} from "./types";
|
||||
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
|
||||
import { HttpLogDestination } from "./providers/HttpLogDestination";
|
||||
import { S3LogDestination } from "./providers/S3LogDestination";
|
||||
import type { EventStreamingDestination } from "@server/db";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -72,11 +74,11 @@ const MAX_CATCHUP_BATCHES = 20;
|
||||
* After the last entry the max value is re-used.
|
||||
*/
|
||||
const BACKOFF_SCHEDULE_MS = [
    // One entry per consecutive failure; each value is a delay in milliseconds.
    60_000, // 1 min  (failure 1)
    2 * 60_000, // 2 min  (failure 2)
    5 * 60_000, // 5 min  (failure 3)
    10 * 60_000, // 10 min (failure 4)
    30 * 60_000 // 30 min (failure 5+)
];
|
||||
|
||||
/**
|
||||
@@ -204,7 +206,10 @@ export class LogStreamingManager {
|
||||
this.pollTimer = null;
|
||||
this.runPoll()
|
||||
.catch((err) =>
|
||||
logger.error("LogStreamingManager: unexpected poll error", err)
|
||||
logger.error(
|
||||
"LogStreamingManager: unexpected poll error",
|
||||
err
|
||||
)
|
||||
)
|
||||
.finally(() => {
|
||||
if (this.isRunning) {
|
||||
@@ -275,10 +280,13 @@ export class LogStreamingManager {
|
||||
}
|
||||
|
||||
// Decrypt and parse config – skip destination if either step fails
|
||||
let configFromDb: HttpConfig;
|
||||
let configFromDb: unknown;
|
||||
try {
|
||||
const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
|
||||
configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
|
||||
const decryptedConfig = decrypt(
|
||||
dest.config,
|
||||
config.getRawConfig().server.secret!
|
||||
);
|
||||
configFromDb = JSON.parse(decryptedConfig);
|
||||
} catch (err) {
|
||||
logger.error(
|
||||
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
|
||||
@@ -305,6 +313,7 @@ export class LogStreamingManager {
|
||||
if (enabledTypes.length === 0) return;
|
||||
|
||||
let anyFailure = false;
|
||||
let firstError: string | null = null;
|
||||
|
||||
for (const logType of enabledTypes) {
|
||||
if (!this.isRunning) break;
|
||||
@@ -312,6 +321,10 @@ export class LogStreamingManager {
|
||||
await this.processLogType(dest, provider, logType);
|
||||
} catch (err) {
|
||||
anyFailure = true;
|
||||
if (firstError === null) {
|
||||
firstError =
|
||||
err instanceof Error ? err.message : String(err);
|
||||
}
|
||||
logger.error(
|
||||
`LogStreamingManager: failed to process "${logType}" logs ` +
|
||||
`for destination ${dest.destinationId}`,
|
||||
@@ -322,6 +335,10 @@ export class LogStreamingManager {
|
||||
|
||||
if (anyFailure) {
|
||||
this.recordFailure(dest.destinationId);
|
||||
await this.setDestinationError(
|
||||
dest.destinationId,
|
||||
firstError ?? "Unknown error"
|
||||
);
|
||||
} else {
|
||||
// Any success resets the failure/back-off state
|
||||
if (this.failures.has(dest.destinationId)) {
|
||||
@@ -330,6 +347,7 @@ export class LogStreamingManager {
|
||||
`LogStreamingManager: destination ${dest.destinationId} recovered`
|
||||
);
|
||||
}
|
||||
await this.clearDestinationError(dest.destinationId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -362,7 +380,10 @@ export class LogStreamingManager {
|
||||
.from(eventStreamingCursors)
|
||||
.where(
|
||||
and(
|
||||
eq(eventStreamingCursors.destinationId, dest.destinationId),
|
||||
eq(
|
||||
eventStreamingCursors.destinationId,
|
||||
dest.destinationId
|
||||
),
|
||||
eq(eventStreamingCursors.logType, logType)
|
||||
)
|
||||
)
|
||||
@@ -431,9 +452,7 @@ export class LogStreamingManager {
|
||||
|
||||
if (rows.length === 0) break;
|
||||
|
||||
const events = rows.map((row) =>
|
||||
this.rowToLogEvent(logType, row)
|
||||
);
|
||||
const events = rows.map((row) => this.rowToLogEvent(logType, row));
|
||||
|
||||
// Throws on failure – caught by the caller which applies back-off
|
||||
await provider.send(events);
|
||||
@@ -677,8 +696,7 @@ export class LogStreamingManager {
|
||||
break;
|
||||
}
|
||||
|
||||
const orgId =
|
||||
typeof row.orgId === "string" ? row.orgId : "";
|
||||
const orgId = typeof row.orgId === "string" ? row.orgId : "";
|
||||
|
||||
return {
|
||||
id: row.id,
|
||||
@@ -708,6 +726,8 @@ export class LogStreamingManager {
|
||||
switch (type) {
|
||||
case "http":
|
||||
return new HttpLogDestination(config as HttpConfig);
|
||||
case "s3":
|
||||
return new S3LogDestination(config as S3Config);
|
||||
// Future providers:
|
||||
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
|
||||
default:
|
||||
@@ -749,6 +769,45 @@ export class LogStreamingManager {
|
||||
// DB helpers
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
/**
 * Store the latest delivery error on the destination's row.
 *
 * Writes `lastError` (the message cut to at most 1000 characters) and
 * `lastErrorAt` (`Date.now()`) for the row matching `destinationId`.
 * A failing write is logged as a warning and never re-thrown.
 *
 * @param destinationId - ID of the destination row to update.
 * @param errorMessage - Error text to persist; longer text is truncated.
 */
private async setDestinationError(
    destinationId: number,
    errorMessage: string
): Promise<void> {
    // Keep the stored message short enough for the text column.
    const storedMessage = errorMessage.slice(0, 1000);

    try {
        const failureState = {
            lastError: storedMessage,
            lastErrorAt: Date.now()
        };
        await db
            .update(eventStreamingDestinations)
            .set(failureState)
            .where(eq(eventStreamingDestinations.destinationId, destinationId));
    } catch (dbError) {
        logger.warn(
            `LogStreamingManager: could not persist error status for destination ${destinationId}`,
            dbError
        );
    }
}
|
||||
|
||||
/**
 * Reset the stored error state of a destination.
 *
 * Sets `lastError` and `lastErrorAt` to `null` for the row matching
 * `destinationId`. A failing write is logged as a warning and never
 * re-thrown.
 *
 * NOTE(review): the UPDATE is filtered by destination ID only, so it is
 * issued on every call even when no error is currently stored. Adding a
 * "lastError is not null" condition would avoid those redundant writes.
 *
 * @param destinationId - ID of the destination row to update.
 */
private async clearDestinationError(destinationId: number): Promise<void> {
    const clearedState = { lastError: null, lastErrorAt: null };

    try {
        await db
            .update(eventStreamingDestinations)
            .set(clearedState)
            .where(eq(eventStreamingDestinations.destinationId, destinationId));
    } catch (dbError) {
        logger.warn(
            `LogStreamingManager: could not clear error status for destination ${destinationId}`,
            dbError
        );
    }
}
|
||||
|
||||
private async loadEnabledDestinations(): Promise<
|
||||
EventStreamingDestination[]
|
||||
> {
|
||||
|
||||
279
server/private/lib/logStreaming/providers/S3LogDestination.ts
Normal file
279
server/private/lib/logStreaming/providers/S3LogDestination.ts
Normal file
@@ -0,0 +1,279 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025-2026 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
|
||||
import { gzip as gzipCallback } from "zlib";
|
||||
import { promisify } from "util";
|
||||
import { randomUUID } from "crypto";
|
||||
import logger from "@server/logger";
|
||||
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
|
||||
import { LogDestinationProvider } from "./LogDestinationProvider";
|
||||
|
||||
/** Promise-returning wrapper around zlib's callback-style `gzip`. */
const gzipAsync = promisify(gzipCallback);

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/** Longest time, in milliseconds, to wait for one S3 PutObject response. */
const REQUEST_TIMEOUT_MS = 60 * 1_000;

/** Payload format applied when the destination config does not name one. */
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3LogDestination
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Forwards a batch of log events to an S3-compatible object store by
|
||||
* uploading a single object per `send()` call.
|
||||
*
|
||||
* **Object key layout**
|
||||
* ```
|
||||
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
|
||||
* ```
|
||||
* - `prefix` – from `config.prefix` (default: empty – key starts at logType)
|
||||
* - `logType` – one of "request", "action", "access", "connection"
|
||||
* - Date components are derived from the upload time (UTC)
|
||||
* - `ext` – `json` | `ndjson` | `csv`
|
||||
* - `.gz` – appended when `config.gzip` is true
|
||||
*
|
||||
* **Payload formats** (controlled by `config.format`):
|
||||
* - `json_array` (default) – body is a JSON array of event objects.
|
||||
* - `ndjson` – one JSON object per line (newline-delimited).
|
||||
* - `csv` – RFC-4180 CSV with a header row; columns are the
|
||||
* union of all field names in the batch's event data.
|
||||
*
|
||||
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
|
||||
* before upload and `Content-Encoding: gzip` is set on the object.
|
||||
*
|
||||
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
|
||||
* storage service (e.g. MinIO, Cloudflare R2).
|
||||
*/
|
||||
export class S3LogDestination implements LogDestinationProvider {
    readonly type = "s3";

    private readonly config: S3Config;

    constructor(config: S3Config) {
        this.config = config;
    }

    // -----------------------------------------------------------------------
    // LogDestinationProvider implementation
    // -----------------------------------------------------------------------

    /**
     * Serialise `events` into a single object and upload it with one
     * PutObject request.
     *
     * An empty batch is a no-op. The log type used in the object key is
     * taken from the first event of the batch. A missing `config.format`
     * falls back to `DEFAULT_FORMAT`, a missing `config.gzip` to `false`.
     *
     * @param events - Batch of events to upload.
     * @throws Error when the upload fails; the message names the object
     *         key and bucket and includes the underlying error text.
     */
    async send(events: LogEvent[]): Promise<void> {
        if (events.length === 0) return;

        const format = this.config.format ?? DEFAULT_FORMAT;
        const useGzip = this.config.gzip ?? false;
        const logType = events[0].logType;

        const rawBody = this.serialize(events, format);
        const bodyBuffer = Buffer.from(rawBody, "utf-8");

        let uploadBody: Buffer;
        let contentEncoding: string | undefined;

        if (useGzip) {
            uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
            contentEncoding = "gzip";
        } else {
            uploadBody = bodyBuffer;
        }

        const key = this.buildObjectKey(logType, format, useGzip);
        const contentType = this.contentType(format);

        const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
            region: this.config.region,
            credentials: {
                accessKeyId: this.config.accessKeyId,
                secretAccessKey: this.config.secretAccessKey
            },
            requestHandler: {
                requestTimeout: REQUEST_TIMEOUT_MS
            }
        };

        // Only override the endpoint when a non-blank one is configured.
        if (this.config.endpoint?.trim()) {
            clientConfig.endpoint = this.config.endpoint.trim();
        }

        const client = new S3Client(clientConfig);

        try {
            await client.send(
                new PutObjectCommand({
                    Bucket: this.config.bucket,
                    Key: key,
                    Body: uploadBody,
                    ContentType: contentType,
                    ...(contentEncoding
                        ? { ContentEncoding: contentEncoding }
                        : {})
                })
            );
        } catch (err: unknown) {
            const msg = err instanceof Error ? err.message : String(err);
            throw new Error(
                `S3LogDestination: failed to upload object "${key}" ` +
                    `to bucket "${this.config.bucket}" – ${msg}`
            );
        } finally {
            // A fresh client is created for every call, so release its
            // underlying connection resources once the request has settled
            // instead of leaving them to accumulate across batches.
            client.destroy();
        }
    }

    // -----------------------------------------------------------------------
    // Internal helpers
    // -----------------------------------------------------------------------

    /**
     * Construct a unique S3 object key for the given log type and format.
     * Keys are partitioned by logType and date so they can be queried or
     * lifecycle-managed independently.
     *
     * Date and time components come from the current UTC time and a random
     * UUID keeps keys unique. Trailing slashes on `config.prefix` are
     * dropped, and an empty prefix is omitted from the key altogether.
     *
     * @param logType - Log type segment of the key.
     * @param format - Payload format; selects the file extension
     *                 (anything other than `csv` / `ndjson` yields `json`).
     * @param gzip - Whether to append the `.gz` suffix.
     * @returns The slash-joined object key.
     */
    private buildObjectKey(
        logType: string,
        format: S3PayloadFormat,
        gzip: boolean
    ): string {
        const now = new Date();
        const year = now.getUTCFullYear();
        const month = String(now.getUTCMonth() + 1).padStart(2, "0");
        const day = String(now.getUTCDate()).padStart(2, "0");
        const hh = String(now.getUTCHours()).padStart(2, "0");
        const mm = String(now.getUTCMinutes()).padStart(2, "0");
        const ss = String(now.getUTCSeconds()).padStart(2, "0");
        const uid = randomUUID();

        const ext =
            format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
        const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;

        const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
        const parts = [
            rawPrefix,
            logType,
            `${year}/${month}/${day}`,
            fileName
        ].filter((p) => p !== "");

        return parts.join("/");
    }

    /**
     * Map a payload format to the Content-Type set on the uploaded object.
     * Any format other than `csv` or `ndjson` maps to `application/json`.
     */
    private contentType(format: S3PayloadFormat): string {
        switch (format) {
            case "csv":
                return "text/csv; charset=utf-8";
            case "ndjson":
                return "application/x-ndjson";
            default:
                return "application/json";
        }
    }

    /**
     * Render the batch as a string in the requested format.
     *
     * The config reaches this class through a cast from parsed JSON, so
     * `format` may hold a value outside the declared union at runtime.
     * Such values are serialised as a JSON array, which matches the JSON
     * fallback of `contentType()` and `buildObjectKey()`; without it the
     * method would return `undefined` and break the upload.
     */
    private serialize(events: LogEvent[], format: S3PayloadFormat): string {
        switch (format) {
            case "ndjson":
                return events
                    .map((e) => JSON.stringify(toPayload(e)))
                    .join("\n");
            case "csv":
                return toCsv(events);
            case "json_array":
            default:
                return JSON.stringify(events.map(toPayload));
        }
    }
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Payload helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Wrap one log event in the envelope written by the JSON-based formats.
 *
 * The result carries the log type as `event`, the event timestamp as an
 * ISO-8601 string (the numeric value is multiplied by 1000 before being
 * handed to `Date`), and the event's `data` object unchanged.
 */
function toPayload(event: LogEvent): unknown {
    const { logType, timestamp, data } = event;
    const isoTimestamp = new Date(timestamp * 1000).toISOString();
    return { event: logType, timestamp: isoTimestamp, data };
}
|
||||
|
||||
/**
 * Convert a batch of events to RFC-4180 style CSV.
 *
 * The column set is the union of `event`, `timestamp`, and all keys present in
 * `event.data` across the batch, preserving insertion order. Values that
 * contain commas, double-quotes, or newlines are quoted and escaped.
 *
 * The `event` and `timestamp` columns always hold the event's log type and
 * ISO-8601 timestamp: a data key with either name does not add a column and
 * does not replace those values. A column an event has no value for is left
 * empty. Rows are joined with "\n" and there is no trailing newline.
 *
 * @param events - Batch of events to render.
 * @returns The CSV text including the header row, or "" for an empty batch.
 */
function toCsv(events: LogEvent[]): string {
    if (events.length === 0) return "";

    // Collect all unique data keys in stable order
    const keySet = new LinkedSet<string>();
    keySet.add("event");
    keySet.add("timestamp");
    for (const e of events) {
        for (const k of Object.keys(e.data)) {
            keySet.add(k);
        }
    }
    const headers = keySet.toArray();

    const rows: string[] = [headers.map(csvEscape).join(",")];

    for (const e of events) {
        // Envelope fields go last so same-named data keys cannot clobber them.
        const flat: Record<string, unknown> = {
            ...e.data,
            event: e.logType,
            timestamp: new Date(e.timestamp * 1000).toISOString()
        };
        rows.push(
            headers
                .map((h) =>
                    // Own-property check: a header such as "constructor" must
                    // not resolve to an inherited Object.prototype member.
                    Object.prototype.hasOwnProperty.call(flat, h)
                        ? flat[h]
                        : undefined
                )
                .map((cell) => csvEscape(flattenValue(cell)))
                .join(",")
        );
    }

    return rows.join("\n");
}
|
||||
|
||||
/**
 * Turn an arbitrary value into the text of a single CSV cell.
 *
 * `null` and `undefined` become the empty string, objects (arrays included)
 * are rendered with `JSON.stringify`, and everything else with `String`.
 */
function flattenValue(value: unknown): string {
    switch (typeof value) {
        case "undefined":
            return "";
        case "object":
            // typeof null is "object", so null is handled here as well.
            return value === null ? "" : JSON.stringify(value);
        default:
            return String(value);
    }
}
|
||||
|
||||
/**
 * Escape one cell value following RFC 4180.
 *
 * A value containing a double quote, comma, line feed or carriage return
 * is wrapped in double quotes with every embedded quote doubled; any other
 * value is returned unchanged.
 */
function csvEscape(value: string): string {
    const needsQuoting = /[",\n\r]/.test(value);
    if (!needsQuoting) {
        return value;
    }

    const doubledQuotes = value.replace(/"/g, '""');
    return '"' + doubledQuotes + '"';
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Minimal ordered set (preserves insertion order, deduplicates)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Collection that remembers the order in which distinct values were first
 * added. Adding a value that is already present changes nothing.
 */
class LinkedSet<T> {
    private readonly items = new Set<T>();

    /** Add `value` unless it is already present. */
    add(value: T): void {
        this.items.add(value);
    }

    /** Return the distinct values in first-insertion order. */
    toArray(): T[] {
        return [...this.items];
    }
}
|
||||
@@ -107,6 +107,40 @@ export interface HttpConfig {
|
||||
bodyTemplate?: string;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// S3 destination configuration
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
 * Serialisation format used for the batch of events stored in each S3 object.
 *
 * - `json_array` – the whole batch as one JSON array, `[{…}, {…}]` (default).
 * - `ndjson`     – newline-delimited JSON, one event object per line.
 * - `csv`        – RFC-4180 CSV with a header row derived from the event fields.
 */
export type S3PayloadFormat =
    | "json_array"
    | "ndjson"
    | "csv";
|
||||
|
||||
/** Configuration of an S3 (or S3-compatible) log streaming destination. */
export interface S3Config {
    /** Human-readable label for the destination. */
    name: string;
    /** AWS Access Key ID used to authenticate uploads. */
    accessKeyId: string;
    /** AWS Secret Access Key paired with `accessKeyId`. */
    secretAccessKey: string;
    /** AWS region, e.g. "us-east-1". */
    region: string;
    /** Name of the bucket that receives the objects. */
    bucket: string;
    /** Optional key prefix, prepended to the auto-generated object path. */
    prefix?: string;
    /** Custom endpoint for S3-compatible storage (e.g. MinIO, R2). */
    endpoint?: string;
    /** Serialisation format of each object; "json_array" is the default. */
    format: S3PayloadFormat;
    /** Whether the object body is gzip-compressed before upload. */
    gzip: boolean;
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Per-destination per-log-type cursor (reflects the DB table)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -51,6 +51,8 @@ export type ListEventStreamingDestinationsResponse = {
|
||||
type: string;
|
||||
config: string;
|
||||
enabled: boolean;
|
||||
lastError: string | null;
|
||||
lastErrorAt: number | null;
|
||||
createdAt: number;
|
||||
updatedAt: number;
|
||||
sendConnectionLogs: boolean;
|
||||
@@ -79,7 +81,8 @@ async function query(orgId: string, limit: number, offset: number) {
|
||||
registry.registerPath({
|
||||
method: "get",
|
||||
path: "/org/{orgId}/event-streaming-destination",
|
||||
description: "List all event streaming destinations for a specific organization.",
|
||||
description:
|
||||
"List all event streaming destinations for a specific organization.",
|
||||
tags: [OpenAPITags.Org],
|
||||
request: {
|
||||
query: querySchema,
|
||||
|
||||
Reference in New Issue
Block a user