Add optional compression

This commit is contained in:
Owen
2026-03-12 17:49:21 -07:00
parent 63fd63c65c
commit cccf236042
4 changed files with 139 additions and 101 deletions

View File

@@ -12,6 +12,7 @@
*/ */
import { Router, Request, Response } from "express"; import { Router, Request, Response } from "express";
import zlib from "zlib";
import { Server as HttpServer } from "http"; import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws"; import { WebSocket, WebSocketServer } from "ws";
import { Socket } from "net"; import { Socket } from "net";
@@ -57,11 +58,13 @@ const MAX_PENDING_MESSAGES = 50; // Maximum messages to queue during connection
const processMessage = async ( const processMessage = async (
ws: AuthenticatedWebSocket, ws: AuthenticatedWebSocket,
data: Buffer, data: Buffer,
isBinary: boolean,
clientId: string, clientId: string,
clientType: ClientType clientType: ClientType
): Promise<void> => { ): Promise<void> => {
try { try {
const message: WSMessage = JSON.parse(data.toString()); const messageBuffer = isBinary ? zlib.gunzipSync(data) : data;
const message: WSMessage = JSON.parse(messageBuffer.toString());
// logger.debug( // logger.debug(
// `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}` // `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}`
@@ -163,8 +166,10 @@ const processPendingMessages = async (
); );
const jobs = []; const jobs = [];
for (const messageData of ws.pendingMessages) { for (const pending of ws.pendingMessages) {
jobs.push(processMessage(ws, messageData, clientId, clientType)); jobs.push(
processMessage(ws, pending.data, pending.isBinary, clientId, clientType)
);
} }
await Promise.all(jobs); await Promise.all(jobs);
@@ -502,11 +507,20 @@ const sendToClientLocal = async (
}; };
const messageString = JSON.stringify(messageWithVersion); const messageString = JSON.stringify(messageWithVersion);
clients.forEach((client) => { if (options.compress) {
if (client.readyState === WebSocket.OPEN) { const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
client.send(messageString); clients.forEach((client) => {
} if (client.readyState === WebSocket.OPEN) {
}); client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
}
return true; return true;
}; };
@@ -532,11 +546,22 @@ const broadcastToAllExceptLocal = async (
configVersion configVersion
}; };
clients.forEach((client) => { if (options.compress) {
if (client.readyState === WebSocket.OPEN) { const compressed = zlib.gzipSync(
client.send(JSON.stringify(messageWithVersion)); Buffer.from(JSON.stringify(messageWithVersion), "utf8")
} );
}); clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(messageWithVersion));
}
});
}
} }
} }
}; };
@@ -762,7 +787,7 @@ const setupConnection = async (
} }
// Set up message handler FIRST to prevent race condition // Set up message handler FIRST to prevent race condition
ws.on("message", async (data) => { ws.on("message", async (data, isBinary) => {
if (!ws.isFullyConnected) { if (!ws.isFullyConnected) {
// Queue message for later processing with limits // Queue message for later processing with limits
ws.pendingMessages = ws.pendingMessages || []; ws.pendingMessages = ws.pendingMessages || [];
@@ -777,11 +802,11 @@ const setupConnection = async (
logger.debug( logger.debug(
`Queueing message from ${clientType.toUpperCase()} ID: ${clientId} (connection not fully established)` `Queueing message from ${clientType.toUpperCase()} ID: ${clientId} (connection not fully established)`
); );
ws.pendingMessages.push(data as Buffer); ws.pendingMessages.push({ data: data as Buffer, isBinary });
return; return;
} }
await processMessage(ws, data as Buffer, clientId, clientType); await processMessage(ws, data as Buffer, isBinary, clientId, clientType);
}); });
// Set up other event handlers before async operations // Set up other event handlers before async operations

View File

@@ -4,48 +4,29 @@ import { Alias, SubnetProxyTarget } from "@server/lib/ip";
import logger from "@server/logger"; import logger from "@server/logger";
import { eq } from "drizzle-orm"; import { eq } from "drizzle-orm";
const BATCH_SIZE = 50;
const BATCH_DELAY_MS = 50;
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) {
const batches = chunkArray(targets, BATCH_SIZE); await sendToClient(
for (let i = 0; i < batches.length; i++) { newtId,
if (i > 0) { {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/add`, type: `newt/wg/targets/add`,
data: batches[i] data: targets
}, { incrementConfigVersion: true }); },
} { incrementConfigVersion: true }
);
} }
export async function removeTargets( export async function removeTargets(
newtId: string, newtId: string,
targets: SubnetProxyTarget[] targets: SubnetProxyTarget[]
) { ) {
const batches = chunkArray(targets, BATCH_SIZE); await sendToClient(
for (let i = 0; i < batches.length; i++) { newtId,
if (i > 0) { {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/remove`, type: `newt/wg/targets/remove`,
data: batches[i] data: targets
},{ incrementConfigVersion: true }); },
} { incrementConfigVersion: true }
);
} }
export async function updateTargets( export async function updateTargets(
@@ -55,24 +36,19 @@ export async function updateTargets(
newTargets: SubnetProxyTarget[]; newTargets: SubnetProxyTarget[];
} }
) { ) {
const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE); await sendToClient(
const newBatches = chunkArray(targets.newTargets, BATCH_SIZE); newtId,
const maxBatches = Math.max(oldBatches.length, newBatches.length); {
for (let i = 0; i < maxBatches; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/update`, type: `newt/wg/targets/update`,
data: { data: {
oldTargets: oldBatches[i] || [], oldTargets: targets.oldTargets,
newTargets: newBatches[i] || [] newTargets: targets.newTargets
} }
}, { incrementConfigVersion: true }).catch((error) => { },
logger.warn(`Error sending message:`, error); { incrementConfigVersion: true }
}); ).catch((error) => {
} logger.warn(`Error sending message:`, error);
});
} }
export async function addPeerData( export async function addPeerData(
@@ -94,14 +70,18 @@ export async function addPeerData(
olmId = olm.olmId; olmId = olm.olmId;
} }
await sendToClient(olmId, { await sendToClient(
type: `olm/wg/peer/data/add`, olmId,
data: { {
siteId: siteId, type: `olm/wg/peer/data/add`,
remoteSubnets: remoteSubnets, data: {
aliases: aliases siteId: siteId,
} remoteSubnets: remoteSubnets,
}, { incrementConfigVersion: true }).catch((error) => { aliases: aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error); logger.warn(`Error sending message:`, error);
}); });
} }
@@ -125,14 +105,18 @@ export async function removePeerData(
olmId = olm.olmId; olmId = olm.olmId;
} }
await sendToClient(olmId, { await sendToClient(
type: `olm/wg/peer/data/remove`, olmId,
data: { {
siteId: siteId, type: `olm/wg/peer/data/remove`,
remoteSubnets: remoteSubnets, data: {
aliases: aliases siteId: siteId,
} remoteSubnets: remoteSubnets,
}, { incrementConfigVersion: true }).catch((error) => { aliases: aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error); logger.warn(`Error sending message:`, error);
}); });
} }
@@ -166,14 +150,18 @@ export async function updatePeerData(
olmId = olm.olmId; olmId = olm.olmId;
} }
await sendToClient(olmId, { await sendToClient(
type: `olm/wg/peer/data/update`, olmId,
data: { {
siteId: siteId, type: `olm/wg/peer/data/update`,
...remoteSubnets, data: {
...aliases siteId: siteId,
} ...remoteSubnets,
}, { incrementConfigVersion: true }).catch((error) => { ...aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error); logger.warn(`Error sending message:`, error);
}); });
} }

View File

@@ -24,7 +24,7 @@ export interface AuthenticatedWebSocket extends WebSocket {
clientType?: ClientType; clientType?: ClientType;
connectionId?: string; connectionId?: string;
isFullyConnected?: boolean; isFullyConnected?: boolean;
pendingMessages?: Buffer[]; pendingMessages?: { data: Buffer; isBinary: boolean }[];
configVersion?: number; configVersion?: number;
} }
@@ -73,6 +73,7 @@ export type MessageHandler = (
// Options for sending messages with config version tracking // Options for sending messages with config version tracking
export interface SendMessageOptions { export interface SendMessageOptions {
incrementConfigVersion?: boolean; incrementConfigVersion?: boolean;
compress?: boolean;
} }
// Redis message type for cross-node communication // Redis message type for cross-node communication

View File

@@ -1,4 +1,5 @@
import { Router, Request, Response } from "express"; import { Router, Request, Response } from "express";
import zlib from "zlib";
import { Server as HttpServer } from "http"; import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws"; import { WebSocket, WebSocketServer } from "ws";
import { Socket } from "net"; import { Socket } from "net";
@@ -116,11 +117,20 @@ const sendToClientLocal = async (
}; };
const messageString = JSON.stringify(messageWithVersion); const messageString = JSON.stringify(messageWithVersion);
clients.forEach((client) => { if (options.compress) {
if (client.readyState === WebSocket.OPEN) { const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
client.send(messageString); clients.forEach((client) => {
} if (client.readyState === WebSocket.OPEN) {
}); client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
}
return true; return true;
}; };
@@ -147,11 +157,22 @@ const broadcastToAllExceptLocal = async (
...message, ...message,
configVersion configVersion
}; };
clients.forEach((client) => { if (options.compress) {
if (client.readyState === WebSocket.OPEN) { const compressed = zlib.gzipSync(
client.send(JSON.stringify(messageWithVersion)); Buffer.from(JSON.stringify(messageWithVersion), "utf8")
} );
}); clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(messageWithVersion));
}
});
}
} }
}); });
}; };
@@ -286,9 +307,12 @@ const setupConnection = async (
clientType === "newt" ? (client as Newt).newtId : (client as Olm).olmId; clientType === "newt" ? (client as Newt).newtId : (client as Olm).olmId;
await addClient(clientType, clientId, ws); await addClient(clientType, clientId, ws);
ws.on("message", async (data) => { ws.on("message", async (data, isBinary) => {
try { try {
const message: WSMessage = JSON.parse(data.toString()); const messageBuffer = isBinary
? zlib.gunzipSync(data as Buffer)
: (data as Buffer);
const message: WSMessage = JSON.parse(messageBuffer.toString());
if (!message.type || typeof message.type !== "string") { if (!message.type || typeof message.type !== "string") {
throw new Error( throw new Error(