diff --git a/server/lib/blueprints/applyBlueprint.ts b/server/lib/blueprints/applyBlueprint.ts index ac2f9508e..a304bb392 100644 --- a/server/lib/blueprints/applyBlueprint.ts +++ b/server/lib/blueprints/applyBlueprint.ts @@ -107,7 +107,7 @@ export async function applyBlueprint({ [target], matchingHealthcheck ? [matchingHealthcheck] : [], result.proxyResource.protocol, - result.proxyResource.proxyPort + site.newt.version ); } } diff --git a/server/lib/clientVersionChecks.ts b/server/lib/clientVersionChecks.ts new file mode 100644 index 000000000..330959e7c --- /dev/null +++ b/server/lib/clientVersionChecks.ts @@ -0,0 +1,20 @@ +import semver from "semver"; + +export function canCompress( + clientVersion: string | null | undefined, + type: "newt" | "olm" +): boolean { + try { + if (!clientVersion) return false; + // check if it is a valid semver + if (!semver.valid(clientVersion)) return false; + if (type === "newt") { + return semver.gte(clientVersion, "1.10.3"); + } else if (type === "olm") { + return semver.gte(clientVersion, "1.4.3"); + } + return false; + } catch { + return false; + } +} diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 46eb5c3ef..121e2c7f0 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -670,7 +670,11 @@ async function handleSubnetProxyTargetUpdates( `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` ); proxyJobs.push( - addSubnetProxyTargets(newt.newtId, targetsToAdd) + addSubnetProxyTargets( + newt.newtId, + targetsToAdd, + newt.version + ) ); } @@ -706,7 +710,11 @@ async function handleSubnetProxyTargetUpdates( `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` ); proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targetsToRemove) + removeSubnetProxyTargets( + newt.newtId, + targetsToRemove, + newt.version + ) ); } @@ -1148,7 +1156,7 @@ async function handleMessagesForClientResources( // Add subnet proxy targets for each site for (const [siteId, resources] of addedBySite.entries()) { const [newt] = await trx - .select({ newtId: newts.newtId }) + .select({ newtId: newts.newtId, version: newts.version }) .from(newts) .where(eq(newts.siteId, siteId)) .limit(1); @@ -1170,7 +1178,13 @@ async function handleMessagesForClientResources( ]); if (targets.length > 0) { - proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets)); + proxyJobs.push( + addSubnetProxyTargets( + newt.newtId, + targets, + newt.version + ) + ); } try { @@ -1219,7 +1233,7 @@ async function handleMessagesForClientResources( // Remove subnet proxy targets for each site for (const [siteId, resources] of removedBySite.entries()) { const [newt] = await trx - .select({ newtId: newts.newtId }) + .select({ newtId: newts.newtId, version: newts.version }) .from(newts) .where(eq(newts.siteId, siteId)) .limit(1); @@ -1242,7 +1256,11 @@ async function handleMessagesForClientResources( if (targets.length > 0) { proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targets) + removeSubnetProxyTargets( + newt.newtId, + targets, + newt.version + ) ); } diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 467cedc5f..f10df2863 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -168,7 +168,13 @@ const processPendingMessages = async ( const jobs = []; for (const pending of ws.pendingMessages) { jobs.push( - processMessage(ws, pending.data, pending.isBinary, clientId, clientType) + processMessage( + ws, + pending.data, + pending.isBinary, + clientId, + clientType + ) ); } @@ -330,7 +336,9 @@ const addClient = async ( // Check Redis first if enabled if (redisManager.isRedisEnabled()) { try { - const redisVersion = await redisManager.get(getConfigVersionKey(clientId)); + const redisVersion = await redisManager.get( + getConfigVersionKey(clientId) + ); if (redisVersion !== null) { configVersion = parseInt(redisVersion, 10); // Sync to local cache @@ -342,7 +350,10 @@ const addClient = async ( } else { // Use local cache version and sync to Redis configVersion = clientConfigVersions.get(clientId) || 0; - await redisManager.set(getConfigVersionKey(clientId), configVersion.toString()); + await redisManager.set( + getConfigVersionKey(clientId), + configVersion.toString() + ); } } catch (error) { logger.error("Failed to get/set config version in Redis:", error); @@ -437,7 +448,9 @@ const removeClient = async ( }; // Helper to get the current config version for a client -const getClientConfigVersion = async (clientId: string): Promise => { +const getClientConfigVersion = async ( + clientId: string +): Promise => { // Try Redis first if available if (redisManager.isRedisEnabled()) { try { @@ -508,7 +521,13 @@ const sendToClientLocal = async ( const messageString = JSON.stringify(messageWithVersion); if (options.compress) { + logger.debug( + `Message size before compression: ${messageString.length} bytes` + ); const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8")); + logger.debug( + `Message size after compression: ${compressed.length} bytes` + ); clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(compressed); @@ -806,7 +825,13 @@ const setupConnection = async ( return; } - await processMessage(ws, data as Buffer, isBinary, clientId, clientType); + await processMessage( + ws, + data as Buffer, + isBinary, + clientId, + clientType + ); }); // Set up other event handlers before async operations diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index 8cac9e05d..94d41a4d1 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -1,23 +1,29 @@ import { sendToClient } from "#dynamic/routers/ws"; import { db, olms, Transaction } from "@server/db"; +import { canCompress } from "@server/lib/clientVersionChecks"; import { Alias, SubnetProxyTarget } from "@server/lib/ip"; import logger from "@server/logger"; import { eq } from "drizzle-orm"; -export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { +export async function addTargets( + newtId: string, + targets: SubnetProxyTarget[], + version?: string | null +) { await sendToClient( newtId, { type: `newt/wg/targets/add`, data: targets }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "newt") } ); } export async function removeTargets( newtId: string, - targets: SubnetProxyTarget[] + targets: SubnetProxyTarget[], + version?: string | null ) { await sendToClient( newtId, @@ -25,7 +31,7 @@ export async function removeTargets( type: `newt/wg/targets/remove`, data: targets }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "newt") } ); } @@ -34,7 +40,8 @@ export async function updateTargets( targets: { oldTargets: SubnetProxyTarget[]; newTargets: SubnetProxyTarget[]; - } + }, + version?: string | null ) { await sendToClient( newtId, @@ -45,7 +52,7 @@ export async function updateTargets( newTargets: targets.newTargets } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "newt") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -56,7 +63,8 @@ export async function addPeerData( siteId: number, remoteSubnets: string[], aliases: Alias[], - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -68,6 +76,7 @@ export async function addPeerData( return; // ignore this because an olm might not be associated with the client anymore } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -80,7 +89,7 @@ export async function addPeerData( aliases: aliases } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -91,7 +100,8 @@ export async function removePeerData( siteId: number, remoteSubnets: string[], aliases: Alias[], - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -103,6 +113,7 @@ export async function removePeerData( return; } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -115,7 +126,7 @@ export async function removePeerData( aliases: aliases } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -136,7 +147,8 @@ export async function updatePeerData( newAliases: Alias[]; } | undefined, - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -148,6 +160,7 @@ export async function updatePeerData( return; } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -160,7 +173,7 @@ export async function updatePeerData( ...aliases } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); diff --git a/server/routers/newt/buildConfiguration.ts b/server/routers/newt/buildConfiguration.ts index 522b3b8dc..579316336 100644 --- a/server/routers/newt/buildConfiguration.ts +++ b/server/routers/newt/buildConfiguration.ts @@ -243,9 +243,9 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) { !target.hcInterval || !target.hcMethod ) { - logger.debug( - `Skipping adding target health check ${target.targetId} due to missing health check fields` - ); + // logger.debug( + // `Skipping adding target health check ${target.targetId} due to missing health check fields` + // ); return null; // Skip targets with missing health check fields } diff --git a/server/routers/newt/handleGetConfigMessage.ts b/server/routers/newt/handleGetConfigMessage.ts index 24cca17a2..d536e9828 100644 --- a/server/routers/newt/handleGetConfigMessage.ts +++ b/server/routers/newt/handleGetConfigMessage.ts @@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { sendToExitNode } from "#dynamic/lib/exitNodes"; import { buildClientConfigurationForNewtClient } from "./buildConfiguration"; +import { canCompress } from "@server/lib/clientVersionChecks"; const inputSchema = z.object({ publicKey: z.string(), @@ -135,6 +136,9 @@ export const handleGetConfigMessage: MessageHandler = async (context) => { targets } }, + options: { + compress: canCompress(newt.version, "newt") + }, broadcast: false, excludeSender: false }; diff --git a/server/routers/newt/handleNewtRegisterMessage.ts b/server/routers/newt/handleNewtRegisterMessage.ts index 595430df5..90034cfbf 100644 --- a/server/routers/newt/handleNewtRegisterMessage.ts +++ b/server/routers/newt/handleNewtRegisterMessage.ts @@ -5,9 +5,7 @@ import { eq } from "drizzle-orm"; import { addPeer, deletePeer } from "../gerbil/peers"; import logger from "@server/logger"; import config from "@server/lib/config"; -import { - findNextAvailableCidr, -} from "@server/lib/ip"; +import { findNextAvailableCidr } from "@server/lib/ip"; import { selectBestExitNode, verifyExitNodeOrgAccess @@ -15,6 +13,7 @@ import { import { fetchContainers } from "./dockerSocket"; import { lockManager } from "#dynamic/lib/lock"; import { buildTargetConfigurationForNewtClient } from "./buildConfiguration"; +import { canCompress } from "@server/lib/clientVersionChecks"; export type ExitNodePingResult = { exitNodeId: number; @@ -215,6 +214,9 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => { healthCheckTargets: validHealthCheckTargets } }, + options: { + compress: canCompress(newt.version, "newt") + }, broadcast: false, // Send to all clients excludeSender: false // Include sender in broadcast }; diff --git a/server/routers/newt/sync.ts b/server/routers/newt/sync.ts index e6f465e55..6fce13ff3 100644 --- a/server/routers/newt/sync.ts +++ b/server/routers/newt/sync.ts @@ -6,6 +6,7 @@ import { buildClientConfigurationForNewtClient, buildTargetConfigurationForNewtClient } from "./buildConfiguration"; +import { canCompress } from "@server/lib/clientVersionChecks"; export async function sendNewtSyncMessage(newt: Newt, site: Site) { const { tcpTargets, udpTargets, validHealthCheckTargets } = @@ -24,18 +25,24 @@ export async function sendNewtSyncMessage(newt: Newt, site: Site) { exitNode ); - await sendToClient(newt.newtId, { - type: "newt/sync", - data: { - proxyTargets: { - udp: udpTargets, - tcp: tcpTargets - }, - healthCheckTargets: validHealthCheckTargets, - peers: peers, - clientTargets: targets + await sendToClient( + newt.newtId, + { + type: "newt/sync", + data: { + proxyTargets: { + udp: udpTargets, + tcp: tcpTargets + }, + healthCheckTargets: validHealthCheckTargets, + peers: peers, + clientTargets: targets + } + }, + { + compress: canCompress(newt.version, "newt") } - }).catch((error) => { + ).catch((error) => { logger.warn(`Error sending newt sync message:`, error); }); } diff --git a/server/routers/newt/targets.ts b/server/routers/newt/targets.ts index 6318861e4..6a523ebe9 100644 --- a/server/routers/newt/targets.ts +++ b/server/routers/newt/targets.ts @@ -2,13 +2,14 @@ import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db"; import { sendToClient } from "#dynamic/routers/ws"; import logger from "@server/logger"; import { eq, inArray } from "drizzle-orm"; +import { canCompress } from "@server/lib/clientVersionChecks"; export async function addTargets( newtId: string, targets: Target[], healthCheckData: TargetHealthCheck[], protocol: string, - port: number | null = null + version?: string | null ) { //create a list of udp and tcp targets const payloadTargets = targets.map((target) => { @@ -22,7 +23,7 @@ export async function addTargets( data: { targets: payloadTargets } - }, { incrementConfigVersion: true }); + }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); // Create a map for quick lookup const healthCheckMap = new Map(); @@ -103,14 +104,14 @@ export async function addTargets( data: { targets: validHealthCheckTargets } - }, { incrementConfigVersion: true }); + }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); } export async function removeTargets( newtId: string, targets: Target[], protocol: string, - port: number | null = null + version?: string | null ) { //create a list of udp and tcp targets const payloadTargets = targets.map((target) => { @@ -135,5 +136,5 @@ export async function removeTargets( data: { ids: healthCheckTargets } - }, { incrementConfigVersion: true }); + }, { incrementConfigVersion: true, compress: canCompress(version, "newt") }); } diff --git a/server/routers/olm/handleOlmRegisterMessage.ts b/server/routers/olm/handleOlmRegisterMessage.ts index 68aa1b624..065aeeaa6 100644 --- a/server/routers/olm/handleOlmRegisterMessage.ts +++ b/server/routers/olm/handleOlmRegisterMessage.ts @@ -19,6 +19,7 @@ import { OlmErrorCodes, sendOlmError } from "./error"; import { handleFingerprintInsertion } from "./fingerprintingUtils"; import { Alias } from "@server/lib/ip"; import { build } from "@server/build"; +import { canCompress } from "@server/lib/clientVersionChecks"; export const handleOlmRegisterMessage: MessageHandler = async (context) => { logger.info("Handling register olm message!"); @@ -295,6 +296,9 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => { utilitySubnet: org.utilitySubnet } }, + options: { + compress: canCompress(olm.version, "olm") + }, broadcast: false, excludeSender: false }; diff --git a/server/routers/olm/peers.ts b/server/routers/olm/peers.ts index 06621cac9..05e153fea 100644 --- a/server/routers/olm/peers.ts +++ b/server/routers/olm/peers.ts @@ -1,5 +1,6 @@ import { sendToClient } from "#dynamic/routers/ws"; import { clientSitesAssociationsCache, db, olms } from "@server/db"; +import { canCompress } from "@server/lib/clientVersionChecks"; import config from "@server/lib/config"; import logger from "@server/logger"; import { and, eq } from "drizzle-orm"; @@ -18,7 +19,8 @@ export async function addPeer( remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access aliases: Alias[]; }, - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -30,6 +32,7 @@ export async function addPeer( return; // ignore this because an olm might not be associated with the client anymore } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -48,7 +51,7 @@ export async function addPeer( aliases: peer.aliases } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -60,7 +63,8 @@ export async function deletePeer( clientId: number, siteId: number, publicKey: string, - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -72,6 +76,7 @@ export async function deletePeer( return; } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -83,7 +88,7 @@ export async function deletePeer( siteId: siteId } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -103,7 +108,8 @@ export async function updatePeer( remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that aliases?: Alias[] | null; }, - olmId?: string + olmId?: string, + version?: string | null ) { if (!olmId) { const [olm] = await db @@ -115,6 +121,7 @@ export async function updatePeer( return; } olmId = olm.olmId; + version = olm.version; } await sendToClient( @@ -132,7 +139,7 @@ export async function updatePeer( aliases: peer.aliases } }, - { incrementConfigVersion: true } + { incrementConfigVersion: true, compress: canCompress(version, "olm") } ).catch((error) => { logger.warn(`Error sending message:`, error); }); diff --git a/server/routers/olm/sync.ts b/server/routers/olm/sync.ts index d4ecd22c1..c994b2c73 100644 --- a/server/routers/olm/sync.ts +++ b/server/routers/olm/sync.ts @@ -1,9 +1,17 @@ -import { Client, db, exitNodes, Olm, sites, clientSitesAssociationsCache } from "@server/db"; +import { + Client, + db, + exitNodes, + Olm, + sites, + clientSitesAssociationsCache +} from "@server/db"; import { buildSiteConfigurationForOlmClient } from "./buildConfiguration"; import { sendToClient } from "#dynamic/routers/ws"; import logger from "@server/logger"; import { eq, inArray } from "drizzle-orm"; import config from "@server/lib/config"; +import { canCompress } from "@server/lib/clientVersionChecks"; export async function sendOlmSyncMessage(olm: Olm, client: Client) { // NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT @@ -17,10 +25,7 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) { const clientSites = await db .select() .from(clientSitesAssociationsCache) - .innerJoin( - sites, - eq(sites.siteId, clientSitesAssociationsCache.siteId) - ) + .innerJoin(sites, eq(sites.siteId, clientSitesAssociationsCache.siteId)) .where(eq(clientSitesAssociationsCache.clientId, client.clientId)); // Extract unique exit node IDs @@ -68,13 +73,20 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) { logger.debug("sendOlmSyncMessage: sending sync message"); - await sendToClient(olm.olmId, { - type: "olm/sync", - data: { - sites: siteConfigurations, - exitNodes: exitNodesData + await sendToClient( + olm.olmId, + { + type: "olm/sync", + data: { + sites: siteConfigurations, + exitNodes: exitNodesData + } + }, + + { + compress: canCompress(olm.version, "olm") } - }).catch((error) => { + ).catch((error) => { logger.warn(`Error sending olm sync message:`, error); }); } diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index b748e26d3..596ed9a3f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -620,7 +620,7 @@ export async function handleMessagingForUpdatedSiteResource( await updateTargets(newt.newtId, { oldTargets: oldTargets, newTargets: newTargets - }); + }, newt.version); } const olmJobs: Promise[] = []; diff --git a/server/routers/target/createTarget.ts b/server/routers/target/createTarget.ts index 47495cbbc..ba52d85a1 100644 --- a/server/routers/target/createTarget.ts +++ b/server/routers/target/createTarget.ts @@ -264,7 +264,7 @@ export async function createTarget( newTarget, healthCheck, resource.protocol, - resource.proxyPort + newt.version ); } } diff --git a/server/routers/target/updateTarget.ts b/server/routers/target/updateTarget.ts index c5321e986..dd31f5f1b 100644 --- a/server/routers/target/updateTarget.ts +++ b/server/routers/target/updateTarget.ts @@ -262,7 +262,7 @@ export async function updateTarget( [updatedTarget], [updatedHc], resource.protocol, - resource.proxyPort + newt.version ); } }