From 446eba8bc94801345763c385d00d42f7b59818af Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 24 Dec 2025 10:03:56 -0500 Subject: [PATCH] Orging how we are going to make the sync --- .../olm/buildSiteConfigurationForOlmClient.ts | 145 ++++++++++++++++++ server/routers/olm/handleOlmPingMessage.ts | 25 +-- .../routers/olm/handleOlmRegisterMessage.ts | 140 +---------------- server/routers/olm/sync.ts | 7 + 4 files changed, 167 insertions(+), 150 deletions(-) create mode 100644 server/routers/olm/buildSiteConfigurationForOlmClient.ts create mode 100644 server/routers/olm/sync.ts diff --git a/server/routers/olm/buildSiteConfigurationForOlmClient.ts b/server/routers/olm/buildSiteConfigurationForOlmClient.ts new file mode 100644 index 00000000..0517e764 --- /dev/null +++ b/server/routers/olm/buildSiteConfigurationForOlmClient.ts @@ -0,0 +1,145 @@ +import { Client, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, exitNodes, siteResources, sites } from "@server/db"; +import { generateAliasConfig, generateRemoteSubnets } from "@server/lib/ip"; +import logger from "@server/logger"; +import { and, eq } from "drizzle-orm"; +import { addPeer, deletePeer } from "../newt/peers"; +import config from "@server/lib/config"; + +export async function buildSiteConfigurationForOlmClient( + client: Client, + publicKey: string, + relay: boolean +) { + const siteConfigurations = []; + + // Get all sites data + const sitesData = await db + .select() + .from(sites) + .innerJoin( + clientSitesAssociationsCache, + eq(sites.siteId, clientSitesAssociationsCache.siteId) + ) + .where(eq(clientSitesAssociationsCache.clientId, client.clientId)); + + // Process each site + for (const { + sites: site, + clientSitesAssociationsCache: association + } of sitesData) { + if (!site.exitNodeId) { + logger.warn( + `Site ${site.siteId} does not have exit node, skipping` + ); + continue; + } + + // Validate endpoint and hole punch status + if (!site.endpoint) { + logger.warn( + `In olm register: site ${site.siteId} has no endpoint, skipping` + ); + continue; + } + + // if (site.lastHolePunch && now - site.lastHolePunch > 6 && relay) { + // logger.warn( + // `Site ${site.siteId} last hole punch is too old, skipping` + // ); + // continue; + // } + + // If public key changed, delete old peer from this site + if (client.pubKey && client.pubKey != publicKey) { + logger.info( + `Public key mismatch. Deleting old peer from site ${site.siteId}...` + ); + await deletePeer(site.siteId, client.pubKey!); + } + + if (!site.subnet) { + logger.warn(`Site ${site.siteId} has no subnet, skipping`); + continue; + } + + const [clientSite] = await db + .select() + .from(clientSitesAssociationsCache) + .where( + and( + eq(clientSitesAssociationsCache.clientId, client.clientId), + eq(clientSitesAssociationsCache.siteId, site.siteId) + ) + ) + .limit(1); + + // Add the peer to the exit node for this site + if (clientSite.endpoint) { + logger.info( + `Adding peer ${publicKey} to site ${site.siteId} with endpoint ${clientSite.endpoint}` + ); + await addPeer(site.siteId, { + publicKey: publicKey, + allowedIps: [`${client.subnet.split("/")[0]}/32`], // we want to only allow from that client + endpoint: relay ? "" : clientSite.endpoint + }); + } else { + logger.warn( + `Client ${client.clientId} has no endpoint, skipping peer addition` + ); + } + + let relayEndpoint: string | undefined = undefined; + if (relay) { + const [exitNode] = await db + .select() + .from(exitNodes) + .where(eq(exitNodes.exitNodeId, site.exitNodeId)) + .limit(1); + if (!exitNode) { + logger.warn(`Exit node not found for site ${site.siteId}`); + continue; + } + relayEndpoint = `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`; + } + + const allSiteResources = await db // only get the site resources that this client has access to + .select() + .from(siteResources) + .innerJoin( + clientSiteResourcesAssociationsCache, + eq( + siteResources.siteResourceId, + clientSiteResourcesAssociationsCache.siteResourceId + ) + ) + .where( + and( + eq(siteResources.siteId, site.siteId), + eq( + clientSiteResourcesAssociationsCache.clientId, + client.clientId + ) + ) + ); + + // Add site configuration to the array + siteConfigurations.push({ + siteId: site.siteId, + name: site.name, + // relayEndpoint: relayEndpoint, // this can be undefined now if not relayed // lets not do this for now because it would conflict with the hole punch testing + endpoint: site.endpoint, + publicKey: site.publicKey, + serverIP: site.address, + serverPort: site.listenPort, + remoteSubnets: generateRemoteSubnets( + allSiteResources.map(({ siteResources }) => siteResources) + ), + aliases: generateAliasConfig( + allSiteResources.map(({ siteResources }) => siteResources) + ) + }); + } + + return siteConfigurations; +} diff --git a/server/routers/olm/handleOlmPingMessage.ts b/server/routers/olm/handleOlmPingMessage.ts index 46b071c9..b7c21c7a 100644 --- a/server/routers/olm/handleOlmPingMessage.ts +++ b/server/routers/olm/handleOlmPingMessage.ts @@ -9,6 +9,7 @@ import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy"; import { sendTerminateClient } from "../client/terminate"; import { encodeHexLowerCase } from "@oslojs/encoding"; import { sha256 } from "@oslojs/crypto/sha2"; +import { sendOlmSyncMessage } from "./sync"; // Track if the offline checker interval is running let offlineCheckerInterval: NodeJS.Timeout | null = null; @@ -108,15 +109,6 @@ export const handleOlmPingMessage: MessageHandler = async (context) => { return; } - // get the version - const configVersion = await getClientConfigVersion(olm.olmId); - - if (message.configVersion && configVersion != message.configVersion) { - logger.warn(`Olm ping with outdated config version: ${message.configVersion} (current: ${configVersion})`); - - // TODO: sync the client - } - if (olm.userId) { // we need to check a user token to make sure its still valid const { session: userSession, user } = @@ -172,13 +164,24 @@ export const handleOlmPingMessage: MessageHandler = async (context) => { try { // Update the client's last ping timestamp - await db + const [client] = await db .update(clients) .set({ lastPing: Math.floor(Date.now() / 1000), online: true }) - .where(eq(clients.clientId, olm.clientId)); + .where(eq(clients.clientId, olm.clientId)).returning(); + + + // get the version + const configVersion = await getClientConfigVersion(olm.olmId); + + if (message.configVersion && configVersion != message.configVersion) { + logger.warn(`Olm ping with outdated config version: ${message.configVersion} (current: ${configVersion})`); + await sendOlmSyncMessage(olm, client); + } + + } catch (error) { logger.error("Error handling ping message", { error }); } diff --git a/server/routers/olm/handleOlmRegisterMessage.ts b/server/routers/olm/handleOlmRegisterMessage.ts index a662383a..eb2a5678 100644 --- a/server/routers/olm/handleOlmRegisterMessage.ts +++ b/server/routers/olm/handleOlmRegisterMessage.ts @@ -24,6 +24,7 @@ import { validateSessionToken } from "@server/auth/sessions/app"; import config from "@server/lib/config"; import { encodeHexLowerCase } from "@oslojs/encoding"; import { sha256 } from "@oslojs/crypto/sha2"; +import { buildSiteConfigurationForOlmClient } from "./buildSiteConfigurationForOlmClient"; export const handleOlmRegisterMessage: MessageHandler = async (context) => { logger.info("Handling register olm message!"); @@ -195,142 +196,3 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => { excludeSender: false }; }; - -export async function buildSiteConfigurationForOlmClient( - client: Client, - publicKey: string, - relay: boolean -) { - const siteConfigurations = []; - - // Get all sites data - const sitesData = await db - .select() - .from(sites) - .innerJoin( - clientSitesAssociationsCache, - eq(sites.siteId, clientSitesAssociationsCache.siteId) - ) - .where(eq(clientSitesAssociationsCache.clientId, client.clientId)); - - // Process each site - for (const { - sites: site, - clientSitesAssociationsCache: association - } of sitesData) { - if (!site.exitNodeId) { - logger.warn( - `Site ${site.siteId} does not have exit node, skipping` - ); - continue; - } - - // Validate endpoint and hole punch status - if (!site.endpoint) { - logger.warn( - `In olm register: site ${site.siteId} has no endpoint, skipping` - ); - continue; - } - - // if (site.lastHolePunch && now - site.lastHolePunch > 6 && relay) { - // logger.warn( - // `Site ${site.siteId} last hole punch is too old, skipping` - // ); - // continue; - // } - - // If public key changed, delete old peer from this site - if (client.pubKey && client.pubKey != publicKey) { - logger.info( - `Public key mismatch. Deleting old peer from site ${site.siteId}...` - ); - await deletePeer(site.siteId, client.pubKey!); - } - - if (!site.subnet) { - logger.warn(`Site ${site.siteId} has no subnet, skipping`); - continue; - } - - const [clientSite] = await db - .select() - .from(clientSitesAssociationsCache) - .where( - and( - eq(clientSitesAssociationsCache.clientId, client.clientId), - eq(clientSitesAssociationsCache.siteId, site.siteId) - ) - ) - .limit(1); - - // Add the peer to the exit node for this site - if (clientSite.endpoint) { - logger.info( - `Adding peer ${publicKey} to site ${site.siteId} with endpoint ${clientSite.endpoint}` - ); - await addPeer(site.siteId, { - publicKey: publicKey, - allowedIps: [`${client.subnet.split("/")[0]}/32`], // we want to only allow from that client - endpoint: relay ? "" : clientSite.endpoint - }); - } else { - logger.warn( - `Client ${client.clientId} has no endpoint, skipping peer addition` - ); - } - - let relayEndpoint: string | undefined = undefined; - if (relay) { - const [exitNode] = await db - .select() - .from(exitNodes) - .where(eq(exitNodes.exitNodeId, site.exitNodeId)) - .limit(1); - if (!exitNode) { - logger.warn(`Exit node not found for site ${site.siteId}`); - continue; - } - relayEndpoint = `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`; - } - - const allSiteResources = await db // only get the site resources that this client has access to - .select() - .from(siteResources) - .innerJoin( - clientSiteResourcesAssociationsCache, - eq( - siteResources.siteResourceId, - clientSiteResourcesAssociationsCache.siteResourceId - ) - ) - .where( - and( - eq(siteResources.siteId, site.siteId), - eq( - clientSiteResourcesAssociationsCache.clientId, - client.clientId - ) - ) - ); - - // Add site configuration to the array - siteConfigurations.push({ - siteId: site.siteId, - name: site.name, - // relayEndpoint: relayEndpoint, // this can be undefined now if not relayed // lets not do this for now because it would conflict with the hole punch testing - endpoint: site.endpoint, - publicKey: site.publicKey, - serverIP: site.address, - serverPort: site.listenPort, - remoteSubnets: generateRemoteSubnets( - allSiteResources.map(({ siteResources }) => siteResources) - ), - aliases: generateAliasConfig( - allSiteResources.map(({ siteResources }) => siteResources) - ) - }); - } - - return siteConfigurations; -} diff --git a/server/routers/olm/sync.ts b/server/routers/olm/sync.ts new file mode 100644 index 00000000..ce6b8ab9 --- /dev/null +++ b/server/routers/olm/sync.ts @@ -0,0 +1,7 @@ +import { Client, Olm } from "@server/db"; +import { buildSiteConfigurationForOlmClient } from "./buildSiteConfigurationForOlmClient"; + +export async function sendOlmSyncMessage(olm: Olm, client: Client) { + const siteConfigurations = await buildSiteConfigurationForOlmClient(client, publicKey, relay); + +}