diff --git a/server/private/lib/exitNodes/exitNodeComms.ts b/server/private/lib/exitNodes/exitNodeComms.ts index faf1153f..2145f32f 100644 --- a/server/private/lib/exitNodes/exitNodeComms.ts +++ b/server/private/lib/exitNodes/exitNodeComms.ts @@ -50,10 +50,14 @@ export async function sendToExitNode( ); } - return sendToClient(remoteExitNode.remoteExitNodeId, { - type: request.remoteType, - data: request.data - }); + return sendToClient( + remoteExitNode.remoteExitNodeId, + { + type: request.remoteType, + data: request.data + }, + { incrementConfigVersion: true } + ); } else { let hostname = exitNode.reachableAt; diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 4b9a3295..1c1f54f4 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -119,12 +119,21 @@ const processMessage = async ( if (response.broadcast) { await broadcastToAllExcept( response.message, - response.excludeSender ? clientId : undefined + response.excludeSender ? clientId : undefined, + response.options ); } else if (response.targetClientId) { - await sendToClient(response.targetClientId, response.message); + await sendToClient( + response.targetClientId, + response.message, + response.options + ); } else { - ws.send(JSON.stringify(response.message)); + await sendToClient( + clientId, + response.message, + response.options + ); } } } catch (error) { @@ -186,7 +195,8 @@ const getClientMapKey = (clientId: string) => clientId; const getConnectionsKey = (clientId: string) => `ws:connections:${clientId}`; const getNodeConnectionsKey = (nodeId: string, clientId: string) => `ws:node:${nodeId}:${clientId}`; -const getConfigVersionKey = (clientId: string) => `ws:configVersion:${clientId}`; +const getConfigVersionKey = (clientId: string) => + `ws:configVersion:${clientId}`; // Initialize Redis subscription for cross-node messaging const initializeRedisSubscription = async (): Promise => { @@ -387,7 +397,9 @@ const getClientConfigVersion = async (clientId: string): Promise => { // Try Redis first if available if (redisManager.isRedisEnabled()) { try { - const redisVersion = await redisManager.get(getConfigVersionKey(clientId)); + const redisVersion = await redisManager.get( + getConfigVersionKey(clientId) + ); if (redisVersion !== null) { const version = parseInt(redisVersion, 10); // Sync local cache with Redis @@ -398,15 +410,17 @@ const getClientConfigVersion = async (clientId: string): Promise => { logger.error("Failed to get config version from Redis:", error); } } - + // Fall back to local cache return clientConfigVersions.get(clientId) || 0; }; // Helper to increment and get the new config version for a client -const incrementClientConfigVersion = async (clientId: string): Promise => { +const incrementClientConfigVersion = async ( + clientId: string +): Promise => { let newVersion: number; - + if (redisManager.isRedisEnabled()) { try { // Use Redis INCR for atomic increment across nodes @@ -419,7 +433,7 @@ const incrementClientConfigVersion = async (clientId: string): Promise = // Fall through to local increment } } - + // Local increment const currentVersion = clientConfigVersions.get(clientId) || 0; newVersion = currentVersion + 1; @@ -438,19 +452,19 @@ const sendToClientLocal = async ( if (!clients || clients.length === 0) { return false; } - + // Handle config version let configVersion = await getClientConfigVersion(clientId); if (options.incrementConfigVersion) { configVersion = await incrementClientConfigVersion(clientId); } - + // Add config version to message const messageWithVersion = { ...message, configVersion }; - + const messageString = JSON.stringify(messageWithVersion); clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { @@ -462,7 +476,6 @@ const sendToClientLocal = async ( `sendToClient: Message type ${message.type} sent to clientId ${clientId} (configVersion: ${configVersion})` ); - return true; }; @@ -480,13 +493,13 @@ const broadcastToAllExceptLocal = async ( if (options.incrementConfigVersion) { configVersion = await incrementClientConfigVersion(clientId); } - + // Add config version to message const messageWithVersion = { ...message, configVersion }; - + clients.forEach((client) => { if (client.readyState === WebSocket.OPEN) { client.send(JSON.stringify(messageWithVersion)); @@ -514,7 +527,7 @@ const sendToClient = async ( if (options.incrementConfigVersion) { configVersion = await incrementClientConfigVersion(clientId); } - + const redisMessage: RedisMessage = { type: "direct", targetClientId: clientId, diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index 653a2578..bf612d35 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -28,7 +28,7 @@ export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { await sendToClient(newtId, { type: `newt/wg/targets/add`, data: batches[i] - }); + }, { incrementConfigVersion: true }); } } @@ -44,7 +44,7 @@ export async function removeTargets( await sendToClient(newtId, { type: `newt/wg/targets/remove`, data: batches[i] - }); + },{ incrementConfigVersion: true }); } } @@ -69,7 +69,7 @@ export async function updateTargets( oldTargets: oldBatches[i] || [], newTargets: newBatches[i] || [] } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -101,7 +101,7 @@ export async function addPeerData( remoteSubnets: remoteSubnets, aliases: aliases } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -132,7 +132,7 @@ export async function removePeerData( remoteSubnets: remoteSubnets, aliases: aliases } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -173,7 +173,7 @@ export async function updatePeerData( ...remoteSubnets, ...aliases } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); } diff --git a/server/routers/newt/handleGetConfigMessage.ts b/server/routers/newt/handleGetConfigMessage.ts index 02e3b864..30d55c73 100644 --- a/server/routers/newt/handleGetConfigMessage.ts +++ b/server/routers/newt/handleGetConfigMessage.ts @@ -153,6 +153,6 @@ export const handleGetConfigMessage: MessageHandler = async (context) => { } }, broadcast: false, - excludeSender: false, + excludeSender: false }; }; diff --git a/server/routers/newt/handleNewtPingMessage.ts b/server/routers/newt/handleNewtPingMessage.ts index e7dea7ce..4a0102e0 100644 --- a/server/routers/newt/handleNewtPingMessage.ts +++ b/server/routers/newt/handleNewtPingMessage.ts @@ -110,7 +110,9 @@ export const handleNewtPingMessage: MessageHandler = async (context) => { const configVersion = await getClientConfigVersion(newt.newtId); if (message.configVersion && configVersion != message.configVersion) { - logger.warn(`Newt ping with outdated config version: ${message.configVersion} (current: ${configVersion})`); + logger.warn( + `Newt ping with outdated config version: ${message.configVersion} (current: ${configVersion})` + ); // TODO: sync the client } diff --git a/server/routers/newt/peers.ts b/server/routers/newt/peers.ts index c7546ff0..4b74d863 100644 --- a/server/routers/newt/peers.ts +++ b/server/routers/newt/peers.ts @@ -39,7 +39,7 @@ export async function addPeer( await sendToClient(newtId, { type: "newt/wg/peer/add", data: peer - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -81,7 +81,7 @@ export async function deletePeer( data: { publicKey } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -128,7 +128,7 @@ export async function updatePeer( publicKey, ...peer } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); diff --git a/server/routers/newt/sync.ts b/server/routers/newt/sync.ts index c876bf7e..9a7ad21a 100644 --- a/server/routers/newt/sync.ts +++ b/server/routers/newt/sync.ts @@ -5,7 +5,11 @@ import logger from "@server/logger"; export async function sendOlmSyncMessage(olm: Olm, client: Client) { // NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT - const siteConfigurations = await buildSiteConfigurationForOlmClient(client, client.pubKey, false); + const siteConfigurations = await buildSiteConfigurationForOlmClient( + client, + client.pubKey, + false + ); await sendToClient(olm.olmId, { type: "olm/sync", diff --git a/server/routers/newt/targets.ts b/server/routers/newt/targets.ts index e97aed35..6318861e 100644 --- a/server/routers/newt/targets.ts +++ b/server/routers/newt/targets.ts @@ -22,7 +22,7 @@ export async function addTargets( data: { targets: payloadTargets } - }); + }, { incrementConfigVersion: true }); // Create a map for quick lookup const healthCheckMap = new Map(); @@ -103,7 +103,7 @@ export async function addTargets( data: { targets: validHealthCheckTargets } - }); + }, { incrementConfigVersion: true }); } export async function removeTargets( @@ -124,7 +124,7 @@ export async function removeTargets( data: { targets: payloadTargets } - }); + }, { incrementConfigVersion: true }); const healthCheckTargets = targets.map((target) => { return target.targetId; @@ -135,5 +135,5 @@ export async function removeTargets( data: { ids: healthCheckTargets } - }); + }, { incrementConfigVersion: true }); } diff --git a/server/routers/olm/handleOlmPingMessage.ts b/server/routers/olm/handleOlmPingMessage.ts index b7c21c7a..6cfb5216 100644 --- a/server/routers/olm/handleOlmPingMessage.ts +++ b/server/routers/olm/handleOlmPingMessage.ts @@ -170,18 +170,18 @@ export const handleOlmPingMessage: MessageHandler = async (context) => { lastPing: Math.floor(Date.now() / 1000), online: true }) - .where(eq(clients.clientId, olm.clientId)).returning(); - + .where(eq(clients.clientId, olm.clientId)) + .returning(); // get the version const configVersion = await getClientConfigVersion(olm.olmId); if (message.configVersion && configVersion != message.configVersion) { - logger.warn(`Olm ping with outdated config version: ${message.configVersion} (current: ${configVersion})`); + logger.warn( + `Olm ping with outdated config version: ${message.configVersion} (current: ${configVersion})` + ); await sendOlmSyncMessage(olm, client); } - - } catch (error) { logger.error("Error handling ping message", { error }); } diff --git a/server/routers/olm/handleOlmRegisterMessage.ts b/server/routers/olm/handleOlmRegisterMessage.ts index a6b30a4e..04eaa415 100644 --- a/server/routers/olm/handleOlmRegisterMessage.ts +++ b/server/routers/olm/handleOlmRegisterMessage.ts @@ -157,12 +157,11 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => { .where(eq(clientSitesAssociationsCache.clientId, client.clientId)); // Extract the count value from the result array - const sitesCount = sitesCountResult.length > 0 ? sitesCountResult[0].count : 0; + const sitesCount = + sitesCountResult.length > 0 ? sitesCountResult[0].count : 0; // Prepare an array to store site configurations - logger.debug( - `Found ${sitesCount} sites for client ${client.clientId}` - ); + logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`); // this prevents us from accepting a register from an olm that has not hole punched yet. // the olm will pump the register so we can keep checking @@ -175,7 +174,11 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => { } // NOTE: its important that the client here is the old client and the public key is the new key - const siteConfigurations = await buildSiteConfigurationForOlmClient(client, publicKey, relay); + const siteConfigurations = await buildSiteConfigurationForOlmClient( + client, + publicKey, + relay + ); // REMOVED THIS SO IT CREATES THE INTERFACE AND JUST WAITS FOR THE SITES // if (siteConfigurations.length === 0) { diff --git a/server/routers/olm/peers.ts b/server/routers/olm/peers.ts index e164b257..d18e1760 100644 --- a/server/routers/olm/peers.ts +++ b/server/routers/olm/peers.ts @@ -45,7 +45,7 @@ export async function addPeer( remoteSubnets: peer.remoteSubnets, // optional, comma-separated list of subnets that this site can access aliases: peer.aliases } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -76,7 +76,7 @@ export async function deletePeer( publicKey, siteId: siteId } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -121,7 +121,7 @@ export async function updatePeer( remoteSubnets: peer.remoteSubnets, aliases: peer.aliases } - }).catch((error) => { + }, { incrementConfigVersion: true }).catch((error) => { logger.warn(`Error sending message:`, error); }); @@ -161,6 +161,8 @@ export async function initPeerAddHandshake( endpoint: peer.exitNode.endpoint } } + // }, { incrementConfigVersion: true }).catch((error) => { + // TODO: DOES THIS NEED TO BE A INCREMENT VERSION? I AM NOT SURE BECAUSE IT WOULD BE TRIGGERED BY THE SYNC? }).catch((error) => { logger.warn(`Error sending message:`, error); }); diff --git a/server/routers/olm/sync.ts b/server/routers/olm/sync.ts index c876bf7e..9a7ad21a 100644 --- a/server/routers/olm/sync.ts +++ b/server/routers/olm/sync.ts @@ -5,7 +5,11 @@ import logger from "@server/logger"; export async function sendOlmSyncMessage(olm: Olm, client: Client) { // NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT - const siteConfigurations = await buildSiteConfigurationForOlmClient(client, client.pubKey, false); + const siteConfigurations = await buildSiteConfigurationForOlmClient( + client, + client.pubKey, + false + ); await sendToClient(olm.olmId, { type: "olm/sync", diff --git a/server/routers/ws/types.ts b/server/routers/ws/types.ts index 81d3bd49..4be68883 100644 --- a/server/routers/ws/types.ts +++ b/server/routers/ws/types.ts @@ -45,6 +45,7 @@ export interface HandlerResponse { broadcast?: boolean; excludeSender?: boolean; targetClientId?: string; + options?: SendMessageOptions; } export interface HandlerContext { diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index 063202db..7f396ea7 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -306,15 +306,21 @@ const setupConnection = async ( if (response.broadcast) { await broadcastToAllExcept( response.message, - response.excludeSender ? clientId : undefined + response.excludeSender ? clientId : undefined, + response.options ); } else if (response.targetClientId) { await sendToClient( response.targetClientId, - response.message + response.message, + response.options ); } else { - ws.send(JSON.stringify(response.message)); + await sendToClient( + clientId, + response.message, + response.options + ); } } } catch (error) {