Merge branch 'dev' into feat/labels-on-sites-and-resources

This commit is contained in:
Fred KISSIE
2026-05-14 18:15:14 +02:00
62 changed files with 2319 additions and 311 deletions

View File

@@ -332,6 +332,7 @@ export const connectionAuditLog = pgTable(
clientId: integer("clientId").references(() => clients.clientId, {
onDelete: "cascade"
}),
clientEndpoint: text("clientEndpoint"),
userId: text("userId").references(() => users.userId, {
onDelete: "cascade"
}),
@@ -439,6 +440,8 @@ export const eventStreamingDestinations = pgTable(
type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: boolean("enabled").notNull().default(true),
lastError: text("lastError"), // last send error message, null if healthy
lastErrorAt: bigint("lastErrorAt", { mode: "number" }), // epoch ms of last error, null if healthy
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
updatedAt: bigint("updatedAt", { mode: "number" }).notNull()
}

View File

@@ -332,6 +332,7 @@ export const connectionAuditLog = sqliteTable(
clientId: integer("clientId").references(() => clients.clientId, {
onDelete: "cascade"
}),
clientEndpoint: text("clientEndpoint"),
userId: text("userId").references(() => users.userId, {
onDelete: "cascade"
}),
@@ -445,6 +446,8 @@ export const eventStreamingDestinations = sqliteTable(
enabled: integer("enabled", { mode: "boolean" })
.notNull()
.default(true),
lastError: text("lastError"), // last send error message, null if healthy
lastErrorAt: integer("lastErrorAt"), // epoch ms of last error, null if healthy
createdAt: integer("createdAt").notNull(),
updatedAt: integer("updatedAt").notNull()
}

View File

@@ -1227,7 +1227,11 @@ async function getDomainId(
return null;
}
const domainSelection = validDomains[0].domains;
// Pick the most specific (longest baseDomain) valid domain so that, e.g.,
// *.test.dev.example.com is assigned to *.dev.example.com rather than *.example.com.
const domainSelection = validDomains.sort(
(a, b) => b.domains.baseDomain.length - a.domains.baseDomain.length
)[0].domains;
const baseDomain = domainSelection.baseDomain;
// Wildcard full-domains are not allowed on namespace (provided/free) domains

View File

@@ -2,7 +2,7 @@ import path from "path";
import { fileURLToPath } from "url";
// This is a placeholder value replaced by the build process
export const APP_VERSION = "1.18.3";
export const APP_VERSION = "1.18.4";
export const __FILENAME = fileURLToPath(import.meta.url);
export const __DIRNAME = path.dirname(__FILENAME);

View File

@@ -20,9 +20,7 @@ import {
} from "@server/db";
import { and, eq, inArray, ne } from "drizzle-orm";
import {
deletePeer as newtDeletePeer
} from "@server/routers/newt/peers";
import { deletePeer as newtDeletePeer } from "@server/routers/newt/peers";
import {
initPeerAddHandshake,
deletePeer as olmDeletePeer
@@ -33,7 +31,7 @@ import {
generateAliasConfig,
generateRemoteSubnets,
generateSubnetProxyTargetV2,
parseEndpoint,
parseEndpoint
} from "@server/lib/ip";
import {
addPeerData,
@@ -51,10 +49,7 @@ export async function getClientSiteResourceAccess(
? await trx
.select()
.from(sites)
.innerJoin(
siteNetworks,
eq(siteNetworks.siteId, sites.siteId)
)
.innerJoin(siteNetworks, eq(siteNetworks.siteId, sites.siteId))
.where(eq(siteNetworks.networkId, siteResource.networkId))
.then((rows) => rows.map((row) => row.sites))
: [];
@@ -362,7 +357,8 @@ export async function rebuildClientAssociationsFromSiteResource(
.where(inArray(clients.clientId, existingClientSiteIds))
: [];
const otherResourceClientIds = clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
const otherResourceClientIds =
clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
logger.debug(
`rebuildClientAssociations: [rebuildClientAssociationsFromSiteResource] siteId=${siteId} otherResourceClientIds=[${[...otherResourceClientIds].join(", ")}] mergedAllClientIds=[${mergedAllClientIds.join(", ")}]`
@@ -709,7 +705,7 @@ export async function updateClientSiteDestinations(
sourcePort: destination.sourcePort,
destinations: destination.destinations
};
logger.info(
logger.debug(
`Payload for update-destinations: ${JSON.stringify(payload, null, 2)}`
);

View File

@@ -97,6 +97,13 @@ export class PrivateConfig {
);
}
process.env.BRANDING_HIDE_POWERED_BY =
this.rawPrivateConfig.branding?.hide_powered_by === true ||
this.rawPrivateConfig.branding?.resource_auth_page
?.hide_powered_by === true
? "true"
: "false";
process.env.LOGIN_PAGE_SUBTITLE_TEXT =
this.rawPrivateConfig.branding?.login_page?.subtitle_text || "";

View File

@@ -46,6 +46,7 @@ export interface ConnectionLogRecord {
orgId: string;
siteId: number;
clientId: number | null;
clientEndpoint: string | null;
userId: string | null;
sourceAddr: string;
destAddr: string;

View File

@@ -30,10 +30,12 @@ import {
LOG_TYPES,
LogEvent,
DestinationFailureState,
HttpConfig
HttpConfig,
S3Config
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
import { S3LogDestination } from "./providers/S3LogDestination";
import type { EventStreamingDestination } from "@server/db";
// ---------------------------------------------------------------------------
@@ -72,11 +74,11 @@ const MAX_CATCHUP_BATCHES = 20;
* After the last entry the max value is re-used.
*/
const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
];
/**
@@ -204,7 +206,10 @@ export class LogStreamingManager {
this.pollTimer = null;
this.runPoll()
.catch((err) =>
logger.error("LogStreamingManager: unexpected poll error", err)
logger.error(
"LogStreamingManager: unexpected poll error",
err
)
)
.finally(() => {
if (this.isRunning) {
@@ -275,10 +280,13 @@ export class LogStreamingManager {
}
// Decrypt and parse config skip destination if either step fails
let configFromDb: HttpConfig;
let configFromDb: unknown;
try {
const decryptedConfig = decrypt(dest.config, config.getRawConfig().server.secret!);
configFromDb = JSON.parse(decryptedConfig) as HttpConfig;
const decryptedConfig = decrypt(
dest.config,
config.getRawConfig().server.secret!
);
configFromDb = JSON.parse(decryptedConfig);
} catch (err) {
logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid or undecryptable config`,
@@ -305,6 +313,7 @@ export class LogStreamingManager {
if (enabledTypes.length === 0) return;
let anyFailure = false;
let firstError: string | null = null;
for (const logType of enabledTypes) {
if (!this.isRunning) break;
@@ -312,6 +321,10 @@ export class LogStreamingManager {
await this.processLogType(dest, provider, logType);
} catch (err) {
anyFailure = true;
if (firstError === null) {
firstError =
err instanceof Error ? err.message : String(err);
}
logger.error(
`LogStreamingManager: failed to process "${logType}" logs ` +
`for destination ${dest.destinationId}`,
@@ -322,6 +335,10 @@ export class LogStreamingManager {
if (anyFailure) {
this.recordFailure(dest.destinationId);
await this.setDestinationError(
dest.destinationId,
firstError ?? "Unknown error"
);
} else {
// Any success resets the failure/back-off state
if (this.failures.has(dest.destinationId)) {
@@ -330,6 +347,7 @@ export class LogStreamingManager {
`LogStreamingManager: destination ${dest.destinationId} recovered`
);
}
await this.clearDestinationError(dest.destinationId);
}
}
@@ -362,7 +380,10 @@ export class LogStreamingManager {
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, dest.destinationId),
eq(
eventStreamingCursors.destinationId,
dest.destinationId
),
eq(eventStreamingCursors.logType, logType)
)
)
@@ -431,9 +452,7 @@ export class LogStreamingManager {
if (rows.length === 0) break;
const events = rows.map((row) =>
this.rowToLogEvent(logType, row)
);
const events = rows.map((row) => this.rowToLogEvent(logType, row));
// Throws on failure caught by the caller which applies back-off
await provider.send(events);
@@ -677,8 +696,7 @@ export class LogStreamingManager {
break;
}
const orgId =
typeof row.orgId === "string" ? row.orgId : "";
const orgId = typeof row.orgId === "string" ? row.orgId : "";
return {
id: row.id,
@@ -708,6 +726,8 @@ export class LogStreamingManager {
switch (type) {
case "http":
return new HttpLogDestination(config as HttpConfig);
case "s3":
return new S3LogDestination(config as S3Config);
// Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default:
@@ -749,6 +769,45 @@ export class LogStreamingManager {
// DB helpers
// -------------------------------------------------------------------------
private async setDestinationError(
destinationId: number,
errorMessage: string
): Promise<void> {
// Truncate to 1000 chars so it fits comfortably in the text column.
const truncated = errorMessage.slice(0, 1000);
try {
await db
.update(eventStreamingDestinations)
.set({ lastError: truncated, lastErrorAt: Date.now() })
.where(
eq(eventStreamingDestinations.destinationId, destinationId)
);
} catch (err) {
logger.warn(
`LogStreamingManager: could not persist error status for destination ${destinationId}`,
err
);
}
}
private async clearDestinationError(destinationId: number): Promise<void> {
try {
// Only update if there is actually an error stored, to avoid
// unnecessary writes on every successful poll cycle.
await db
.update(eventStreamingDestinations)
.set({ lastError: null, lastErrorAt: null })
.where(
eq(eventStreamingDestinations.destinationId, destinationId)
);
} catch (err) {
logger.warn(
`LogStreamingManager: could not clear error status for destination ${destinationId}`,
err
);
}
}
private async loadEnabledDestinations(): Promise<
EventStreamingDestination[]
> {

View File

@@ -0,0 +1,279 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025-2026 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { S3Client, PutObjectCommand } from "@aws-sdk/client-s3";
import { gzip as gzipCallback } from "zlib";
import { promisify } from "util";
import { randomUUID } from "crypto";
import logger from "@server/logger";
import { LogEvent, S3Config, S3PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
const gzipAsync = promisify(gzipCallback);
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single S3 PutObject response. */
const REQUEST_TIMEOUT_MS = 60_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: S3PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// S3LogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an S3-compatible object store by
* uploading a single object per `send()` call.
*
* **Object key layout**
* ```
* {prefix}/{logType}/{YYYY}/{MM}/{DD}/{HH}-{mm}-{ss}-{uuid}.{ext}[.gz]
* ```
* - `prefix` from `config.prefix` (default: empty key starts at logType)
* - `logType` one of "request", "action", "access", "connection"
* - Date components are derived from the upload time (UTC)
* - `ext` `json` | `ndjson` | `csv`
* - `.gz` appended when `config.gzip` is true
*
* **Payload formats** (controlled by `config.format`):
* - `json_array` (default) body is a JSON array of event objects.
* - `ndjson` one JSON object per line (newline-delimited).
* - `csv` RFC-4180 CSV with a header row; columns are the
* union of all field names in the batch's event data.
*
* **Compression**: when `config.gzip` is `true` the body is gzip-compressed
* before upload and `Content-Encoding: gzip` is set on the object.
*
* **Custom endpoint**: set `config.endpoint` to target any S3-compatible
* storage service (e.g. MinIO, Cloudflare R2).
*/
export class S3LogDestination implements LogDestinationProvider {
readonly type = "s3";
private readonly config: S3Config;
constructor(config: S3Config) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
const useGzip = this.config.gzip ?? false;
const logType = events[0].logType;
const rawBody = this.serialize(events, format);
const bodyBuffer = Buffer.from(rawBody, "utf-8");
let uploadBody: Buffer;
let contentEncoding: string | undefined;
if (useGzip) {
uploadBody = (await gzipAsync(bodyBuffer)) as Buffer;
contentEncoding = "gzip";
} else {
uploadBody = bodyBuffer;
}
const key = this.buildObjectKey(logType, format, useGzip);
const contentType = this.contentType(format);
const clientConfig: ConstructorParameters<typeof S3Client>[0] = {
region: this.config.region,
credentials: {
accessKeyId: this.config.accessKeyId,
secretAccessKey: this.config.secretAccessKey
},
requestHandler: {
requestTimeout: REQUEST_TIMEOUT_MS
}
};
if (this.config.endpoint?.trim()) {
clientConfig.endpoint = this.config.endpoint.trim();
}
const client = new S3Client(clientConfig);
try {
await client.send(
new PutObjectCommand({
Bucket: this.config.bucket,
Key: key,
Body: uploadBody,
ContentType: contentType,
...(contentEncoding
? { ContentEncoding: contentEncoding }
: {})
})
);
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`S3LogDestination: failed to upload object "${key}" ` +
`to bucket "${this.config.bucket}" ${msg}`
);
}
}
// -----------------------------------------------------------------------
// Internal helpers
// -----------------------------------------------------------------------
/**
* Construct a unique S3 object key for the given log type and format.
* Keys are partitioned by logType and date so they can be queried or
* lifecycle-managed independently.
*/
private buildObjectKey(
logType: string,
format: S3PayloadFormat,
gzip: boolean
): string {
const now = new Date();
const year = now.getUTCFullYear();
const month = String(now.getUTCMonth() + 1).padStart(2, "0");
const day = String(now.getUTCDate()).padStart(2, "0");
const hh = String(now.getUTCHours()).padStart(2, "0");
const mm = String(now.getUTCMinutes()).padStart(2, "0");
const ss = String(now.getUTCSeconds()).padStart(2, "0");
const uid = randomUUID();
const ext =
format === "csv" ? "csv" : format === "ndjson" ? "ndjson" : "json";
const fileName = `${hh}-${mm}-${ss}-${uid}.${ext}${gzip ? ".gz" : ""}`;
const rawPrefix = (this.config.prefix ?? "").trim().replace(/\/+$/, "");
const parts = [
rawPrefix,
logType,
`${year}/${month}/${day}`,
fileName
].filter((p) => p !== "");
return parts.join("/");
}
private contentType(format: S3PayloadFormat): string {
switch (format) {
case "csv":
return "text/csv; charset=utf-8";
case "ndjson":
return "application/x-ndjson";
default:
return "application/json";
}
}
private serialize(events: LogEvent[], format: S3PayloadFormat): string {
switch (format) {
case "json_array":
return JSON.stringify(events.map(toPayload));
case "ndjson":
return events
.map((e) => JSON.stringify(toPayload(e)))
.join("\n");
case "csv":
return toCsv(events);
}
}
}
// ---------------------------------------------------------------------------
// Payload helpers
// ---------------------------------------------------------------------------
function toPayload(event: LogEvent): unknown {
return {
event: event.logType,
timestamp: new Date(event.timestamp * 1000).toISOString(),
data: event.data
};
}
/**
* Convert a batch of events to RFC-4180 CSV.
*
* The column set is the union of `event`, `timestamp`, and all keys present in
* `event.data` across the batch, preserving insertion order. Values that
* contain commas, double-quotes, or newlines are quoted and escaped.
*/
function toCsv(events: LogEvent[]): string {
if (events.length === 0) return "";
// Collect all unique data keys in stable order
const keySet = new LinkedSet<string>();
keySet.add("event");
keySet.add("timestamp");
for (const e of events) {
for (const k of Object.keys(e.data)) {
keySet.add(k);
}
}
const headers = keySet.toArray();
const rows: string[] = [headers.map(csvEscape).join(",")];
for (const e of events) {
const flat: Record<string, unknown> = {
event: e.logType,
timestamp: new Date(e.timestamp * 1000).toISOString(),
...e.data
};
rows.push(
headers.map((h) => csvEscape(flattenValue(flat[h]))).join(",")
);
}
return rows.join("\n");
}
/** Flatten a value to a plain string suitable for a CSV cell. */
function flattenValue(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "object") return JSON.stringify(value);
return String(value);
}
/** RFC-4180 CSV escaping. */
function csvEscape(value: string): string {
if (/[",\n\r]/.test(value)) {
return `"${value.replace(/"/g, '""')}"`;
}
return value;
}
// ---------------------------------------------------------------------------
// Minimal ordered set (preserves insertion order, deduplicates)
// ---------------------------------------------------------------------------
class LinkedSet<T> {
private readonly map = new Map<T, true>();
add(value: T): void {
this.map.set(value, true);
}
toArray(): T[] {
return Array.from(this.map.keys());
}
}

View File

@@ -107,6 +107,40 @@ export interface HttpConfig {
bodyTemplate?: string;
}
// ---------------------------------------------------------------------------
// S3 destination configuration
// ---------------------------------------------------------------------------
/**
* Controls how the batch of events is serialised into each S3 object.
*
* - `json_array` `[{…}, {…}]` default; each object is a JSON array.
* - `ndjson` `{…}\n{…}` newline-delimited JSON, one object per line.
* - `csv` RFC-4180 CSV with a header row derived from the event fields.
*/
export type S3PayloadFormat = "json_array" | "ndjson" | "csv";
export interface S3Config {
/** Human-readable label for the destination */
name: string;
/** AWS Access Key ID */
accessKeyId: string;
/** AWS Secret Access Key */
secretAccessKey: string;
/** AWS region (e.g. "us-east-1") */
region: string;
/** Target S3 bucket name */
bucket: string;
/** Optional key prefix appended before the auto-generated path */
prefix?: string;
/** Override the S3 endpoint for S3-compatible storage (e.g. MinIO, R2) */
endpoint?: string;
/** How events are serialised into each object. Defaults to "json_array". */
format: S3PayloadFormat;
/** Whether to gzip-compress the object before upload. */
gzip: boolean;
}
// ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table)
// ---------------------------------------------------------------------------

View File

@@ -141,6 +141,7 @@ export const privateConfigSchema = z
)
.optional(),
hide_auth_layout_footer: z.boolean().optional().default(false),
hide_powered_by: z.boolean().optional(),
login_page: z
.object({
subtitle_text: z.string().optional()

View File

@@ -124,15 +124,11 @@ function getWhere(data: Q) {
data.clientId
? eq(connectionAuditLog.clientId, data.clientId)
: undefined,
data.siteId
? eq(connectionAuditLog.siteId, data.siteId)
: undefined,
data.siteId ? eq(connectionAuditLog.siteId, data.siteId) : undefined,
data.siteResourceId
? eq(connectionAuditLog.siteResourceId, data.siteResourceId)
: undefined,
data.userId
? eq(connectionAuditLog.userId, data.userId)
: undefined
data.userId ? eq(connectionAuditLog.userId, data.userId) : undefined
);
}
@@ -144,6 +140,7 @@ export function queryConnection(data: Q) {
orgId: connectionAuditLog.orgId,
siteId: connectionAuditLog.siteId,
clientId: connectionAuditLog.clientId,
clientEndpoint: connectionAuditLog.clientEndpoint,
userId: connectionAuditLog.userId,
sourceAddr: connectionAuditLog.sourceAddr,
destAddr: connectionAuditLog.destAddr,
@@ -203,10 +200,7 @@ async function enrichWithDetails(
];
// Fetch resource details from main database
const resourceMap = new Map<
number,
{ name: string; niceId: string }
>();
const resourceMap = new Map<number, { name: string; niceId: string }>();
if (siteResourceIds.length > 0) {
const resourceDetails = await primaryDb
.select({
@@ -268,10 +262,7 @@ async function enrichWithDetails(
}
// Fetch user details from main database
const userMap = new Map<
string,
{ email: string | null }
>();
const userMap = new Map<string, { email: string | null }>();
if (userIds.length > 0) {
const userDetails = await primaryDb
.select({
@@ -290,29 +281,25 @@ async function enrichWithDetails(
return logs.map((log) => ({
...log,
resourceName: log.siteResourceId
? resourceMap.get(log.siteResourceId)?.name ?? null
? (resourceMap.get(log.siteResourceId)?.name ?? null)
: null,
resourceNiceId: log.siteResourceId
? resourceMap.get(log.siteResourceId)?.niceId ?? null
: null,
siteName: log.siteId
? siteMap.get(log.siteId)?.name ?? null
? (resourceMap.get(log.siteResourceId)?.niceId ?? null)
: null,
siteName: log.siteId ? (siteMap.get(log.siteId)?.name ?? null) : null,
siteNiceId: log.siteId
? siteMap.get(log.siteId)?.niceId ?? null
? (siteMap.get(log.siteId)?.niceId ?? null)
: null,
clientName: log.clientId
? clientMap.get(log.clientId)?.name ?? null
? (clientMap.get(log.clientId)?.name ?? null)
: null,
clientNiceId: log.clientId
? clientMap.get(log.clientId)?.niceId ?? null
? (clientMap.get(log.clientId)?.niceId ?? null)
: null,
clientType: log.clientId
? clientMap.get(log.clientId)?.type ?? null
? (clientMap.get(log.clientId)?.type ?? null)
: null,
userEmail: log.userId
? userMap.get(log.userId)?.email ?? null
: null
userEmail: log.userId ? (userMap.get(log.userId)?.email ?? null) : null
}));
}
@@ -521,4 +508,4 @@ export async function queryConnectionAuditLogs(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}
}

View File

@@ -51,6 +51,8 @@ export type ListEventStreamingDestinationsResponse = {
type: string;
config: string;
enabled: boolean;
lastError: string | null;
lastErrorAt: number | null;
createdAt: number;
updatedAt: number;
sendConnectionLogs: boolean;
@@ -79,7 +81,8 @@ async function query(orgId: string, limit: number, offset: number) {
registry.registerPath({
method: "get",
path: "/org/{orgId}/event-streaming-destination",
description: "List all event streaming destinations for a specific organization.",
description:
"List all event streaming destinations for a specific organization.",
tags: [OpenAPITags.Org],
request: {
query: querySchema,

View File

@@ -11,7 +11,7 @@
* This file is not licensed under the AGPLv3.
*/
import { db } from "@server/db";
import { clientSitesAssociationsCache, db } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { sites, Newt, clients, orgs } from "@server/db";
import { and, eq, inArray } from "drizzle-orm";
@@ -146,7 +146,11 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
// each unique sourceAddr + the org's CIDR suffix and do a targeted IN query.
const ipToClient = new Map<
string,
{ clientId: number; userId: string | null }
{
clientId: number;
userId: string | null;
clientEndpoint: string | null;
}
>();
if (cidrSuffix) {
@@ -172,9 +176,21 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
.select({
clientId: clients.clientId,
userId: clients.userId,
subnet: clients.subnet
subnet: clients.subnet,
clientEndpoint: clientSitesAssociationsCache.endpoint
})
.from(clients)
.leftJoin(
// this should be one to one
clientSitesAssociationsCache,
and(
eq(
clients.clientId,
clientSitesAssociationsCache.clientId
),
eq(clientSitesAssociationsCache.siteId, newt.siteId)
)
)
.where(
and(
eq(clients.orgId, orgId),
@@ -189,7 +205,8 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
);
ipToClient.set(ip, {
clientId: c.clientId,
userId: c.userId
userId: c.userId,
clientEndpoint: c.clientEndpoint
});
}
}
@@ -234,6 +251,7 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
orgId,
siteId: newt.siteId,
clientId: clientInfo?.clientId ?? null,
clientEndpoint: clientInfo?.clientEndpoint ?? null,
userId: clientInfo?.userId ?? null,
sourceAddr: session.sourceAddr,
destAddr: session.destAddr,

View File

@@ -98,15 +98,6 @@ export async function addUserRole(
);
}
if (existingUser[0].isOwner) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the role of the owner of the organization"
)
);
}
const roleExists = await db
.select()
.from(roles)

View File

@@ -98,11 +98,11 @@ export async function removeUserRole(
);
}
if (existingUser.isOwner) {
if (existingUser.isOwner && role.isAdmin === true) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the roles of the owner of the organization"
"Cannot remove the administrator role from the organization owner"
)
);
}

View File

@@ -87,17 +87,8 @@ export async function setUserOrgRoles(
);
}
if (existingUser.isOwner) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the roles of the owner of the organization"
)
);
}
const orgRoles = await db
.select({ roleId: roles.roleId })
.select({ roleId: roles.roleId, isAdmin: roles.isAdmin })
.from(roles)
.where(
and(
@@ -115,6 +106,18 @@ export async function setUserOrgRoles(
);
}
if (existingUser.isOwner) {
const hasAdminRole = orgRoles.some((r) => r.isAdmin === true);
if (!hasAdminRole) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"The organization owner must retain an administrator role"
)
);
}
}
let orgClientsToRebuild: Client[] = [];
await db.transaction(async (trx) => {
await trx

View File

@@ -100,6 +100,7 @@ export type QueryConnectionAuditLogResponse = {
orgId: string | null;
siteId: number | null;
clientId: number | null;
clientEndpoint: string | null;
userId: string | null;
sourceAddr: string;
destAddr: string;

View File

@@ -27,11 +27,11 @@ export async function buildSiteConfigurationForOlmClient(
) {
const siteConfigurations: {
siteId: number;
name?: string
endpoint?: string
publicKey?: string
serverIP?: string | null
serverPort?: number | null
name?: string;
endpoint?: string;
publicKey?: string;
serverIP?: string | null;
serverPort?: number | null;
remoteSubnets?: string[];
aliases: Alias[];
}[] = [];
@@ -79,7 +79,6 @@ export async function buildSiteConfigurationForOlmClient(
)
);
if (jitMode) {
// Add site configuration to the array
siteConfigurations.push({
@@ -109,10 +108,9 @@ export async function buildSiteConfigurationForOlmClient(
continue;
}
if (!site.publicKey || site.publicKey == "") { // the site is not ready to accept new peers
logger.warn(
`Site ${site.siteId} has no public key, skipping`
);
if (!site.publicKey || site.publicKey == "") {
// the site is not ready to accept new peers
logger.warn(`Site ${site.siteId} has no public key, skipping`);
continue;
}

View File

@@ -17,7 +17,7 @@ import { initPeerAddHandshake } from "./peers";
export const handleOlmServerInitAddPeerHandshake: MessageHandler = async (
context
) => {
logger.info("Handling register olm message!");
logger.info("Handle Olm Server Init Add Peer Handshake Message");
const { message, client: c, sendToClient } = context;
const olm = c as Olm;

View File

@@ -9,16 +9,50 @@ import {
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
import { sendToClient } from "#dynamic/routers/ws";
import logger from "@server/logger";
import { eq, inArray } from "drizzle-orm";
import { count, eq, inArray } from "drizzle-orm";
import config from "@server/lib/config";
import { canCompress } from "@server/lib/clientVersionChecks";
import { build } from "@server/build";
export async function sendOlmSyncMessage(olm: Olm, client: Client) {
// Get all sites data
const sitesCountResult = await db
.select({ count: count() })
.from(sites)
.innerJoin(
clientSitesAssociationsCache,
eq(sites.siteId, clientSitesAssociationsCache.siteId)
)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
// Extract the count value from the result array
const sitesCount =
sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;
// Prepare an array to store site configurations
logger.debug(
`[handleOlmRegisterMessage] Found ${sitesCount} sites for client ${client.clientId}`,
{ orgId: client.orgId }
);
let jitMode = false;
if (sitesCount > 250 && build == "saas") {
// THIS IS THE MAX ON THE BUSINESS TIER
// we have too many sites
// If we have too many sites we need to drop into fully JIT mode by not sending any of the sites
logger.info(
`[handleOlmRegisterMessage] Too many sites (${sitesCount}), dropping into JIT mode`,
{ orgId: client.orgId }
);
jitMode = true;
}
// NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT
const siteConfigurations = await buildSiteConfigurationForOlmClient(
client,
client.pubKey,
false
false,
jitMode
);
// Get all exit nodes from sites where the client has peers
@@ -82,7 +116,6 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
exitNodes: exitNodesData
}
},
{
compress: canCompress(olm.version, "olm")
}

View File

@@ -88,11 +88,11 @@ export async function addUserRoleLegacy(
);
}
if (existingUser.isOwner) {
if (existingUser.isOwner && role.isAdmin !== true) {
return next(
createHttpError(
HttpCode.FORBIDDEN,
"Cannot change the role of the owner of the organization"
"The organization owner must retain an administrator role"
)
);
}

View File

@@ -47,10 +47,7 @@ export async function queryUser(orgId: string, userId: string) {
.from(userOrgRoles)
.leftJoin(roles, eq(userOrgRoles.roleId, roles.roleId))
.where(
and(
eq(userOrgRoles.userId, userId),
eq(userOrgRoles.orgId, orgId)
)
and(eq(userOrgRoles.userId, userId), eq(userOrgRoles.orgId, orgId))
);
const isAdmin = roleRows.some((r) => r.isAdmin);
@@ -61,7 +58,8 @@ export async function queryUser(orgId: string, userId: string) {
roleIds: roleRows.map((r) => r.roleId),
roles: roleRows.map((r) => ({
roleId: r.roleId,
name: r.roleName ?? ""
name: r.roleName ?? "",
isAdmin: r.isAdmin === true
}))
};
}

View File

@@ -24,6 +24,7 @@ import m15 from "./scriptsPg/1.16.0";
import m16 from "./scriptsPg/1.17.0";
import m17 from "./scriptsPg/1.18.0";
import m18 from "./scriptsPg/1.18.3";
import m19 from "./scriptsPg/1.18.4";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -47,7 +48,8 @@ const migrations = [
{ version: "1.16.0", run: m15 },
{ version: "1.17.0", run: m16 },
{ version: "1.18.0", run: m17 },
{ version: "1.18.3", run: m18 }
{ version: "1.18.3", run: m18 },
{ version: "1.18.4", run: m19 }
// Add new migrations here as they are created
] as {
version: string;

View File

@@ -42,6 +42,7 @@ import m36 from "./scriptsSqlite/1.16.0";
import m37 from "./scriptsSqlite/1.17.0";
import m38 from "./scriptsSqlite/1.18.0";
import m39 from "./scriptsSqlite/1.18.3";
import m40 from "./scriptsSqlite/1.18.4";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -81,7 +82,8 @@ const migrations = [
{ version: "1.16.0", run: m36 },
{ version: "1.17.0", run: m37 },
{ version: "1.18.0", run: m38 },
{ version: "1.18.3", run: m39 }
{ version: "1.18.3", run: m39 },
{ version: "1.18.4", run: m40 }
// Add new migrations here as they are created
] as const;

View File

@@ -0,0 +1,34 @@
import { db } from "@server/db/pg/driver";
import { sql } from "drizzle-orm";
const version = "1.18.4";
export default async function migration() {
console.log(`Running setup script ${version}...`);
try {
await db.execute(sql`BEGIN`);
await db.execute(sql`
ALTER TABLE "connectionAuditLog" ADD COLUMN "clientEndpoint" text;
`);
await db.execute(sql`
ALTER TABLE "eventStreamingDestinations" ADD COLUMN "lastError" text;
`);
await db.execute(sql`
ALTER TABLE "eventStreamingDestinations" ADD COLUMN "lastErrorAt" bigint;
`);
await db.execute(sql`COMMIT`);
console.log("Migrated database");
} catch (e) {
await db.execute(sql`ROLLBACK`);
console.log("Unable to migrate database");
console.log(e);
throw e;
}
console.log(`${version} migration complete`);
}

View File

@@ -0,0 +1,43 @@
import { APP_PATH } from "@server/lib/consts";
import Database from "better-sqlite3";
import path from "path";
const version = "1.18.4";
export default async function migration() {
console.log(`Running setup script ${version}...`);
const location = path.join(APP_PATH, "db", "db.sqlite");
const db = new Database(location);
try {
db.pragma("foreign_keys = OFF");
db.transaction(() => {
db.prepare(
`
ALTER TABLE 'connectionAuditLog' ADD 'clientEndpoint' text;
`
).run();
db.prepare(
`
ALTER TABLE 'eventStreamingDestinations' ADD 'lastError' text;
`
).run();
db.prepare(
`
ALTER TABLE 'eventStreamingDestinations' ADD 'lastErrorAt' integer;
`
).run();
})();
db.pragma("foreign_keys = ON");
console.log("Migrated database");
} catch (e) {
console.log("Failed to migrate db:", e);
throw e;
}
console.log(`${version} migration complete`);
}