From cccf236042d677cc9a2e5bd9449e842eec94f1ac Mon Sep 17 00:00:00 2001 From: Owen Date: Thu, 12 Mar 2026 17:49:21 -0700 Subject: [PATCH] Add optional compression --- server/private/routers/ws/ws.ts | 57 +++++++++---- server/routers/client/targets.ts | 132 ++++++++++++++----------------- server/routers/ws/types.ts | 3 +- server/routers/ws/ws.ts | 48 ++++++++--- 4 files changed, 139 insertions(+), 101 deletions(-) diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 7d1769bca..467cedc5f 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -12,6 +12,7 @@ */ import { Router, Request, Response } from "express"; +import zlib from "zlib"; import { Server as HttpServer } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { Socket } from "net"; @@ -57,11 +58,13 @@ const MAX_PENDING_MESSAGES = 50; // Maximum messages to queue during connection const processMessage = async ( ws: AuthenticatedWebSocket, data: Buffer, + isBinary: boolean, clientId: string, clientType: ClientType ): Promise => { try { - const message: WSMessage = JSON.parse(data.toString()); + const messageBuffer = isBinary ? zlib.gunzipSync(data) : data; + const message: WSMessage = JSON.parse(messageBuffer.toString()); // logger.debug( // `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}` @@ -163,8 +166,10 @@ const processPendingMessages = async ( ); const jobs = []; - for (const messageData of ws.pendingMessages) { - jobs.push(processMessage(ws, messageData, clientId, clientType)); + for (const pending of ws.pendingMessages) { + jobs.push( + processMessage(ws, pending.data, pending.isBinary, clientId, clientType) + ); } await Promise.all(jobs); @@ -502,11 +507,20 @@ const sendToClientLocal = async ( }; const messageString = JSON.stringify(messageWithVersion); - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(messageString); - } - }); + if (options.compress) { + const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8")); + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(compressed); + } + }); + } else { + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(messageString); + } + }); + } return true; }; @@ -532,11 +546,22 @@ const broadcastToAllExceptLocal = async ( configVersion }; - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(messageWithVersion)); - } - }); + if (options.compress) { + const compressed = zlib.gzipSync( + Buffer.from(JSON.stringify(messageWithVersion), "utf8") + ); + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(compressed); + } + }); + } else { + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify(messageWithVersion)); + } + }); + } } } }; @@ -762,7 +787,7 @@ const setupConnection = async ( } // Set up message handler FIRST to prevent race condition - ws.on("message", async (data) => { + ws.on("message", async (data, isBinary) => { if (!ws.isFullyConnected) { // Queue message for later processing with limits ws.pendingMessages = ws.pendingMessages || []; @@ -777,11 +802,11 @@ const setupConnection = async ( logger.debug( `Queueing message from ${clientType.toUpperCase()} ID: ${clientId} (connection not fully established)` ); - ws.pendingMessages.push(data as Buffer); + ws.pendingMessages.push({ data: data as Buffer, isBinary }); return; } - await processMessage(ws, data as Buffer, clientId, clientType); + await processMessage(ws, data as Buffer, isBinary, clientId, clientType); }); // Set up other event handlers before async operations diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index bf612d352..8cac9e05d 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -4,48 +4,29 @@ import { Alias, SubnetProxyTarget } from "@server/lib/ip"; import logger from "@server/logger"; import { eq } from "drizzle-orm"; -const BATCH_SIZE = 50; -const BATCH_DELAY_MS = 50; - -function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -function chunkArray(array: T[], size: number): T[][] { - const chunks: T[][] = []; - for (let i = 0; i < array.length; i += size) { - chunks.push(array.slice(i, i + size)); - } - return chunks; -} - export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { - const batches = chunkArray(targets, BATCH_SIZE); - for (let i = 0; i < batches.length; i++) { - if (i > 0) { - await sleep(BATCH_DELAY_MS); - } - await sendToClient(newtId, { + await sendToClient( + newtId, + { type: `newt/wg/targets/add`, - data: batches[i] - }, { incrementConfigVersion: true }); - } + data: targets + }, + { incrementConfigVersion: true } + ); } export async function removeTargets( newtId: string, targets: SubnetProxyTarget[] ) { - const batches = chunkArray(targets, BATCH_SIZE); - for (let i = 0; i < batches.length; i++) { - if (i > 0) { - await sleep(BATCH_DELAY_MS); - } - await sendToClient(newtId, { + await sendToClient( + newtId, + { type: `newt/wg/targets/remove`, - data: batches[i] - },{ incrementConfigVersion: true }); - } + data: targets + }, + { incrementConfigVersion: true } + ); } export async function updateTargets( @@ -55,24 +36,19 @@ export async function updateTargets( newTargets: SubnetProxyTarget[]; } ) { - const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE); - const newBatches = chunkArray(targets.newTargets, BATCH_SIZE); - const maxBatches = Math.max(oldBatches.length, newBatches.length); - - for (let i = 0; i < maxBatches; i++) { - if (i > 0) { - await sleep(BATCH_DELAY_MS); - } - await sendToClient(newtId, { + await sendToClient( + newtId, + { type: `newt/wg/targets/update`, data: { - oldTargets: oldBatches[i] || [], - newTargets: newBatches[i] || [] + oldTargets: targets.oldTargets, + newTargets: targets.newTargets } - }, { incrementConfigVersion: true }).catch((error) => { - logger.warn(`Error sending message:`, error); - }); - } + }, + { incrementConfigVersion: true } + ).catch((error) => { + logger.warn(`Error sending message:`, error); + }); } export async function addPeerData( @@ -94,14 +70,18 @@ export async function addPeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/add`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/add`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -125,14 +105,18 @@ export async function removePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/remove`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/remove`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -166,14 +150,18 @@ export async function updatePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/update`, - data: { - siteId: siteId, - ...remoteSubnets, - ...aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/update`, + data: { + siteId: siteId, + ...remoteSubnets, + ...aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } diff --git a/server/routers/ws/types.ts b/server/routers/ws/types.ts index 4be68883e..e539954ce 100644 --- a/server/routers/ws/types.ts +++ b/server/routers/ws/types.ts @@ -24,7 +24,7 @@ export interface AuthenticatedWebSocket extends WebSocket { clientType?: ClientType; connectionId?: string; isFullyConnected?: boolean; - pendingMessages?: Buffer[]; + pendingMessages?: { data: Buffer; isBinary: boolean }[]; configVersion?: number; } @@ -73,6 +73,7 @@ export type MessageHandler = ( // Options for sending messages with config version tracking export interface SendMessageOptions { incrementConfigVersion?: boolean; + compress?: boolean; } // Redis message type for cross-node communication diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index 32432d997..c7085fba9 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -1,4 +1,5 @@ import { Router, Request, Response } from "express"; +import zlib from "zlib"; import { Server as HttpServer } from "http"; import { WebSocket, WebSocketServer } from "ws"; import { Socket } from "net"; @@ -116,11 +117,20 @@ const sendToClientLocal = async ( }; const messageString = JSON.stringify(messageWithVersion); - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(messageString); - } - }); + if (options.compress) { + const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8")); + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(compressed); + } + }); + } else { + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(messageString); + } + }); + } return true; }; @@ -147,11 +157,22 @@ const broadcastToAllExceptLocal = async ( ...message, configVersion }; - clients.forEach((client) => { - if (client.readyState === WebSocket.OPEN) { - client.send(JSON.stringify(messageWithVersion)); - } - }); + if (options.compress) { + const compressed = zlib.gzipSync( + Buffer.from(JSON.stringify(messageWithVersion), "utf8") + ); + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(compressed); + } + }); + } else { + clients.forEach((client) => { + if (client.readyState === WebSocket.OPEN) { + client.send(JSON.stringify(messageWithVersion)); + } + }); + } } }); }; @@ -286,9 +307,12 @@ const setupConnection = async ( clientType === "newt" ? (client as Newt).newtId : (client as Olm).olmId; await addClient(clientType, clientId, ws); - ws.on("message", async (data) => { + ws.on("message", async (data, isBinary) => { try { - const message: WSMessage = JSON.parse(data.toString()); + const messageBuffer = isBinary + ? zlib.gunzipSync(data as Buffer) + : (data as Buffer); + const message: WSMessage = JSON.parse(messageBuffer.toString()); if (!message.type || typeof message.type !== "string") { throw new Error(