From 50cf28427391ee7128e781022d5135a9933c9aa5 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 13 Aug 2025 21:45:44 -0700 Subject: [PATCH] Break out bandwidth --- server/routers/gerbil/receiveBandwidth.ts | 207 ++++++++++++---------- 1 file changed, 109 insertions(+), 98 deletions(-) diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index caadf7bb..350228ec 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -28,103 +28,7 @@ export const receiveBandwidth = async ( throw new Error("Invalid bandwidth data"); } - const currentTime = new Date(); - const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago - - // logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`); - - await db.transaction(async (trx) => { - // First, handle sites that are actively reporting bandwidth - const activePeers = bandwidthData.filter(peer => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages - - if (activePeers.length > 0) { - // Remove any active peers from offline tracking since they're sending data - activePeers.forEach(peer => offlineSites.delete(peer.publicKey)); - - // Aggregate usage data by organization - const orgUsageMap = new Map(); - const orgUptimeMap = new Map(); - - // Update all active sites with bandwidth data and get the site data in one operation - const updatedSites = []; - for (const peer of activePeers) { - const updatedSite = await trx - .update(sites) - .set({ - megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, - megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`, - lastBandwidthUpdate: currentTime.toISOString(), - online: true - }) - .where(eq(sites.pubKey, peer.publicKey)) - .returning({ - online: sites.online, - orgId: sites.orgId, - siteId: sites.siteId, - lastBandwidthUpdate: sites.lastBandwidthUpdate, - }); - - if (updatedSite.length > 0) { - updatedSites.push({ ...updatedSite[0], peer }); - } - } - - // Calculate org usage aggregations using the updated site data - for (const { peer, ...site } of updatedSites) { - // Aggregate bandwidth usage for the org - const totalBandwidth = peer.bytesIn + peer.bytesOut; - const currentOrgUsage = orgUsageMap.get(site.orgId) || 0; - orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth); - - // Add 10 seconds of uptime for each active site - const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0; - orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and jut add 10 seconds - } - } - - // Handle sites that reported zero bandwidth but need online status updated - const zeroBandwidthPeers = bandwidthData.filter(peer => - peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages - ); - - if (zeroBandwidthPeers.length > 0) { - const zeroBandwidthSites = await trx - .select() - .from(sites) - .where(inArray(sites.pubKey, zeroBandwidthPeers.map(p => p.publicKey))); - - for (const site of zeroBandwidthSites) { - let newOnlineStatus = site.online; - - // Check if site should go offline based on last bandwidth update WITH DATA - if (site.lastBandwidthUpdate) { - const lastUpdateWithData = new Date(site.lastBandwidthUpdate); - if (lastUpdateWithData < oneMinuteAgo) { - newOnlineStatus = false; - } - } else { - // No previous data update recorded, set to offline - newOnlineStatus = false; - } - - // Always update lastBandwidthUpdate to show this instance is receiving reports - // Only update online status if it changed - if (site.online !== newOnlineStatus) { - await trx - .update(sites) - .set({ - online: newOnlineStatus - }) - .where(eq(sites.siteId, site.siteId)); - - // If site went offline, add it to our tracking set - if (!newOnlineStatus && site.pubKey) { - offlineSites.add(site.pubKey); - } - } - } - } - }); + await updateSiteBandwidth(bandwidthData); return response(res, { data: {}, @@ -142,4 +46,111 @@ export const receiveBandwidth = async ( ) ); } -}; \ No newline at end of file +}; + +export async function updateSiteBandwidth(bandwidthData: PeerBandwidth[]) { + const currentTime = new Date(); + const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago + + // logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`); + + await db.transaction(async (trx) => { + // First, handle sites that are actively reporting bandwidth + const activePeers = bandwidthData.filter((peer) => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages + + if (activePeers.length > 0) { + // Remove any active peers from offline tracking since they're sending data + activePeers.forEach((peer) => offlineSites.delete(peer.publicKey)); + + // Aggregate usage data by organization + const orgUsageMap = new Map(); + const orgUptimeMap = new Map(); + + // Update all active sites with bandwidth data and get the site data in one operation + const updatedSites = []; + for (const peer of activePeers) { + const updatedSite = await trx + .update(sites) + .set({ + megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, + megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`, + lastBandwidthUpdate: currentTime.toISOString(), + online: true + }) + .where(eq(sites.pubKey, peer.publicKey)) + .returning({ + online: sites.online, + orgId: sites.orgId, + siteId: sites.siteId, + lastBandwidthUpdate: sites.lastBandwidthUpdate + }); + + if (updatedSite.length > 0) { + updatedSites.push({ ...updatedSite[0], peer }); + } + } + + // Calculate org usage aggregations using the updated site data + for (const { peer, ...site } of updatedSites) { + // Aggregate bandwidth usage for the org + const totalBandwidth = peer.bytesIn + peer.bytesOut; + const currentOrgUsage = orgUsageMap.get(site.orgId) || 0; + orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth); + + // Add 10 seconds of uptime for each active site + const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0; + orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and jut add 10 seconds + } + } + + // Handle sites that reported zero bandwidth but need online status updated + const zeroBandwidthPeers = bandwidthData.filter( + (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages + ); + + if (zeroBandwidthPeers.length > 0) { + const zeroBandwidthSites = await trx + .select() + .from(sites) + .where( + inArray( + sites.pubKey, + zeroBandwidthPeers.map((p) => p.publicKey) + ) + ); + + for (const site of zeroBandwidthSites) { + let newOnlineStatus = site.online; + + // Check if site should go offline based on last bandwidth update WITH DATA + if (site.lastBandwidthUpdate) { + const lastUpdateWithData = new Date( + site.lastBandwidthUpdate + ); + if (lastUpdateWithData < oneMinuteAgo) { + newOnlineStatus = false; + } + } else { + // No previous data update recorded, set to offline + newOnlineStatus = false; + } + + // Always update lastBandwidthUpdate to show this instance is receiving reports + // Only update online status if it changed + if (site.online !== newOnlineStatus) { + await trx + .update(sites) + .set({ + online: newOnlineStatus + }) + .where(eq(sites.siteId, site.siteId)); + + // If site went offline, add it to our tracking set + if (!newOnlineStatus && site.pubKey) { + offlineSites.add(site.pubKey); + } + } + } + } + }); +}