Implement s3 streaming destination

Owen
2026-05-07 21:09:21 -07:00
parent 10f95896aa
commit cbdc74768f
6 changed files with 895 additions and 37 deletions

View File

@@ -30,10 +30,12 @@ import {
LOG_TYPES,
LogEvent,
DestinationFailureState,
-    HttpConfig
+    HttpConfig,
+    S3Config
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
+import { S3LogDestination } from "./providers/S3LogDestination";
import type { EventStreamingDestination } from "@server/db";
// ---------------------------------------------------------------------------
@@ -72,11 +74,11 @@ const MAX_CATCHUP_BATCHES = 20;
* After the last entry the max value is re-used.
*/
const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
];
/**
@@ -204,7 +206,10 @@ export class LogStreamingManager {
this.pollTimer = null;
this.runPoll()
.catch((err) =>
logger.error("LogStreamingManager: unexpected poll error", err)
logger.error(
"LogStreamingManager: unexpected poll error",
err
)
)
.finally(() => {
if (this.isRunning) {
@@ -275,10 +280,13 @@ export class LogStreamingManager {
}
        // Decrypt and parse config; skip destination if either step fails
-        let configFromDb: HttpConfig;
+        let configFromDb: unknown;
try {
-            const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
-            configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
+            const decryptedConfig = decrypt(
+                dest.config,
+                config.getRawConfig().server.secret!
+            );
+            configFromDb = JSON.parse(decryptedConfig);
} catch (err) {
logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
@@ -362,7 +370,10 @@ export class LogStreamingManager {
.from(eventStreamingCursors)
.where(
and(
-                    eq(eventStreamingCursors.destinationId, dest.destinationId),
+                    eq(
+                        eventStreamingCursors.destinationId,
+                        dest.destinationId
+                    ),
eq(eventStreamingCursors.logType, logType)
)
)
@@ -431,9 +442,7 @@ export class LogStreamingManager {
if (rows.length === 0) break;
-            const events = rows.map((row) =>
-                this.rowToLogEvent(logType, row)
-            );
+            const events = rows.map((row) => this.rowToLogEvent(logType, row));
            // Throws on failure; caught by the caller, which applies back-off
await provider.send(events);
@@ -677,8 +686,7 @@ export class LogStreamingManager {
break;
}
-        const orgId =
-            typeof row.orgId === "string" ? row.orgId : "";
+        const orgId = typeof row.orgId === "string" ? row.orgId : "";
return {
id: row.id,
@@ -708,6 +716,8 @@ export class LogStreamingManager {
switch (type) {
case "http":
return new HttpLogDestination(config as HttpConfig);
case "s3":
return new S3LogDestination(config as S3Config);
// Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default:
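Since the stored config now decrypts to `unknown` rather than `HttpConfig`, the destination's declared type is what picks the cast. A minimal sketch of that flow, not part of the commit; `buildProvider`, `destType`, and `decryptedConfig` are assumed names introduced only for illustration:

```ts
import { HttpLogDestination } from "./providers/HttpLogDestination";
import { S3LogDestination } from "./providers/S3LogDestination";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpConfig, S3Config } from "./types";

// Parse without a type assertion, then narrow by the destination type.
function buildProvider(
    destType: string,
    decryptedConfig: string
): LogDestinationProvider {
    const parsed: unknown = JSON.parse(decryptedConfig);
    switch (destType) {
        case "http":
            return new HttpLogDestination(parsed as HttpConfig);
        case "s3":
            return new S3LogDestination(parsed as S3Config);
        default:
            throw new Error(`unsupported destination type: ${destType}`);
    }
}
```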

View File

@@ -0,0 +1,279 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { gzip as gzipCallback } from "zlib";
import { promisify } from "util";
import { randomUUID } from "crypto";
import logger from "@server/logger";
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
const gzipAsync = promisify(gzipCallback);
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single S3 PutObject response. */
const REQUEST_TIMEOUT_MS = 60_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// S3LogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an S3-compatible object store by
* uploading a single object per `send()` call.
*
* **Object key layout**
* ```
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
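 * e.g. logs/request/2026/05/07/21-09-21-9b1deb4d-3b7d-4bad-9bdd-2b0d7b3dcb6d.ndjson.gz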
* ```
 * - `prefix`: from `config.prefix` (default: empty, so the key starts at `logType`)
 * - `logType`: one of "request", "action", "access", "connection"
 * - Date components are derived from the upload time (UTC)
 * - `ext`: `json` | `ndjson` | `csv`
 * - `.gz`: appended when `config.gzip` is true
*
* **Payload formats** (controlled by `config.format`):
 * - `json_array` (default): the body is a JSON array of event objects.
 * - `ndjson`: one JSON object per line (newline-delimited).
 * - `csv`: RFC-4180 CSV with a header row; columns are the
 *   union of all field names in the batch's event data.
*
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
* before upload and `Content-Encoding: gzip` is set on the object.
*
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
* storage service (e.g. MinIO, Cloudflare R2).
*/
export class S3LogDestination implements LogDestinationProvider {
readonly type = "s3";
private readonly config: S3Config;
constructor(config: S3Config) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
const useGzip = this.config.gzip ?? false;
const logType = events[0].logType;
const rawBody = this.serialize(events, format);
const bodyBuffer = Buffer.from(rawBody, "utf-8");
let uploadBody: Buffer;
let contentEncoding: string | undefined;
if (useGzip) {
uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
contentEncoding = "gzip";
} else {
uploadBody = bodyBuffer;
}
const key = this.buildObjectKey(logType, format, useGzip);
const contentType = this.contentType(format);
const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
region: this.config.region,
credentials: {
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey
},
requestHandler: {
requestTimeout: REQUEST_TIMEOUT_MS
}
};
if (this.config.endpoint?.trim()) {
clientConfig.endpoint = this.config.endpoint.trim();
}
const client = new S3Client(clientConfig);
try {
await client.send(
new PutObjectCommand({
Bucket: this.config.bucket,
Key: key,
Body: uploadBody,
ContentType: contentType,
...(contentEncoding
? { ContentEncoding: contentEncoding }
: {})
})
);
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`S3LogDestination: failed to upload object "${key}" ` +
                    `to bucket "${this.config.bucket}": ${msg}`
);
        } finally {
            // The client is constructed per call; release its sockets once
            // the upload settles instead of waiting for GC.
            client.destroy();
        }
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/**
* Construct a unique S3 object key for the given log type and format.
* Keys are partitioned by logType and date so they can be queried or
* lifecycle-managed independently.
*/
private buildObjectKey(
logType: string,
format: S3PayloadFormat,
gzip: boolean
): string {
const now = new Date();
const year = now.getUTCFullYear();
const month = String(now.getUTCMonth() + 1).padStart(2, "0");
const day = String(now.getUTCDate()).padStart(2, "0");
const hh = String(now.getUTCHours()).padStart(2, "0");
const mm = String(now.getUTCMinutes()).padStart(2, "0");
const ss = String(now.getUTCSeconds()).padStart(2, "0");
const uid = randomUUID();
const ext =
format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;
const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
const parts = [
rawPrefix,
logType,
`${year}/${month}/${day}`,
fileName
].filter((p) => p !== "");
return parts.join("/");
}
private contentType(format: S3PayloadFormat): string {
switch (format) {
case "csv":
return "text/csv; charset=utf-8";
case "ndjson":
return "application/x-ndjson";
default:
return "application/json";
}
}
private serialize(events: LogEvent[], format: S3PayloadFormat): string {
switch (format) {
case "json_array":
return JSON.stringify(events.map(toPayload));
case "ndjson":
return events
.map((e) => JSON.stringify(toPayload(e)))
.join("\n");
case "csv":
return toCsv(events);
}
}
}
// ---------------------------------------------------------------------------
// Payload helpers
// ---------------------------------------------------------------------------
function toPayload(event: LogEvent): unknown {
return {
event: event.logType,
timestamp: new Date(event.timestamp * 1000).toISOString(),
data: event.data
};
}
/**
* Convert a batch of events to RFC-4180 CSV.
*
* The column set is the union of `event`, `timestamp`, and all keys present in
* `event.data` across the batch, preserving insertion order. Values that
* contain commas, double-quotes, or newlines are quoted and escaped.
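 *
 * For example, two events whose data keys are { ip } and { ip, path }
 * serialise to the header row `event,timestamp,ip,path`; the event without
 * `path` gets an empty cell in that column.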
*/
function toCsv(events: LogEvent[]): string {
if (events.length === 0) return "";
// Collect all unique data keys in stable order
const keySet = new LinkedSet<string>();
keySet.add("event");
keySet.add("timestamp");
for (const e of events) {
for (const k of Object.keys(e.data)) {
keySet.add(k);
}
}
const headers = keySet.toArray();
const rows: string[] = [headers.map(csvEscape).join(",")];
for (const e of events) {
const flat: Record<string, unknown> = {
event: e.logType,
timestamp: new Date(e.timestamp * 1000).toISOString(),
...e.data
};
rows.push(
headers.map((h) => csvEscape(flattenValue(flat[h]))).join(",")
);
}
return rows.join("\n");
}
/** Flatten a value to a plain string suitable for a CSV cell. */
function flattenValue(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "object") return JSON.stringify(value);
return String(value);
}
/** RFC-4180 CSV escaping. */
function csvEscape(value: string): string {
if (/[",\n\r]/.test(value)) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
// ---------------------------------------------------------------------------
// Minimal ordered set (preserves insertion order, deduplicates)
// ---------------------------------------------------------------------------
class LinkedSet<T> {
private readonly map = new Map<T, true>();
add(value: T): void {
this.map.set(value, true);
}
toArray(): T[] {
return Array.from(this.map.keys());
}
}
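As a usage sketch (not part of the commit): pointing the provider at a local MinIO endpoint and uploading one batch. All credentials and names below are placeholders, and the event literal fills only the fields this file actually reads (`logType`, `timestamp`, `data`); it is cast because the full `LogEvent` shape is not shown in this diff.

```ts
import { S3LogDestination } from "./S3LogDestination";
import { LogEvent, S3Config } from "../types";

// Placeholder values for a local MinIO instance.
const config: S3Config = {
    name: "local-minio",
    accessKeyId: "minioadmin",
    secretAccessKey: "minioadmin",
    region: "us-east-1",
    bucket: "streamed-logs",
    prefix: "logs",
    endpoint: "http://localhost:9000",
    format: "ndjson",
    gzip: true
};

const dest = new S3LogDestination(config);

// Each send() uploads one object, here under a key like
// logs/request/2026/05/07/21-09-21-<uuid>.ndjson.gz
await dest.send([
    {
        logType: "request",
        timestamp: Math.floor(Date.now() / 1000),
        data: { method: "GET", path: "/health", status: 200 }
    } as LogEvent
]);
```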

View File

@@ -107,6 +107,40 @@ export interface HttpConfig {
bodyTemplate?: string;
}
// ---------------------------------------------------------------------------
// S3 destination configuration
// ---------------------------------------------------------------------------
/**
* Controls how the batch of events is serialised into each S3 object.
*
 * - `json_array`: `[{…}, {…}]`; the default. The object body is a single JSON array.
 * - `ndjson`: `{…}\n{…}`; newline-delimited JSON, one event per line.
 * - `csv`: RFC-4180 CSV with a header row derived from the event fields.
*/
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
export interface S3Config {
/** Human-readable label for the destination */
name: string;
/** AWS Access Key ID */
accessKeyId: string;
/** AWS Secret Access Key */
secretAccessKey: string;
/** AWS region (e.g. "us-east-1") */
region: string;
/** Target S3 bucket name */
bucket: string;
    /** Optional key prefix prepended to the auto-generated path */
prefix?: string;
/** Override the S3 endpoint for S3-compatible storage (e.g. MinIO, R2) */
endpoint?: string;
    /** How events are serialised into each object. Defaults to "json_array". */
    format?: S3PayloadFormat;
    /** Whether to gzip-compress the object before upload. Defaults to false. */
    gzip?: boolean;
}
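Since `format` and `gzip` fall back to `"json_array"` and `false` in the provider, the smallest valid config for plain AWS S3 omits them along with `prefix` and `endpoint`. A sketch with placeholder values:

```ts
const archive: S3Config = {
    name: "s3-archive",                  // placeholder
    accessKeyId: "AKIAXXXXXXXXXXXXXXXX", // placeholder
    secretAccessKey: "xxxxxxxx",         // placeholder
    region: "us-east-1",
    bucket: "example-log-archive"
    // format defaults to "json_array", gzip to false;
    // with no prefix, keys start at {logType}/...
};
```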
// ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table)
// ---------------------------------------------------------------------------