diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index 66556c32..b6205914 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -1,6 +1,6 @@ import { Request, Response, NextFunction } from "express"; -import { eq } from "drizzle-orm"; -import { sites, } from "@server/db"; +import { eq, and, lt, inArray } from "drizzle-orm"; +import { sites } from "@server/db"; import { db } from "@server/db"; import logger from "@server/logger"; import createHttpError from "http-errors"; @@ -25,46 +25,86 @@ export const receiveBandwidth = async ( throw new Error("Invalid bandwidth data"); } - await db.transaction(async (trx) => { - for (const peer of bandwidthData) { - const { publicKey, bytesIn, bytesOut } = peer; + const currentTime = new Date(); + const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago - const [site] = await trx + await db.transaction(async (trx) => { + // First, handle sites that are actively reporting bandwidth + const activePeers = bandwidthData.filter(peer => peer.bytesIn > 0 || peer.bytesOut > 0); + + if (activePeers.length > 0) { + // Get all active sites in one query + const activeSites = await trx .select() .from(sites) - .where(eq(sites.pubKey, publicKey)) - .limit(1); + .where(inArray(sites.pubKey, activePeers.map(p => p.publicKey))); - if (!site) { - continue; + // Create a map for quick lookup + const siteMap = new Map(); + activeSites.forEach(site => { + siteMap.set(site.pubKey, site); + }); + + // Update sites with actual bandwidth usage + for (const peer of activePeers) { + const site = siteMap.get(peer.publicKey); + if (!site) continue; + + await trx + .update(sites) + .set({ + megabytesOut: (site.megabytesOut || 0) + peer.bytesIn, + megabytesIn: (site.megabytesIn || 0) + peer.bytesOut, + lastBandwidthUpdate: currentTime.toISOString(), + online: true + }) + .where(eq(sites.siteId, site.siteId)); } - let online = site.online; + } - // if the bandwidth for the site is > 0 then set it to online. if it has been less than 0 (no update) for 5 minutes then set it to offline - if (bytesIn > 0 || bytesOut > 0) { - online = true; - } else if (site.lastBandwidthUpdate) { - const lastBandwidthUpdate = new Date( - site.lastBandwidthUpdate - ); - const currentTime = new Date(); - const diff = - currentTime.getTime() - lastBandwidthUpdate.getTime(); - if (diff < 300000) { - online = false; + // Handle sites that reported zero bandwidth but need online status updated + const zeroBandwidthPeers = bandwidthData.filter(peer => peer.bytesIn === 0 && peer.bytesOut === 0); + + if (zeroBandwidthPeers.length > 0) { + const zeroBandwidthSites = await trx + .select() + .from(sites) + .where(inArray(sites.pubKey, zeroBandwidthPeers.map(p => p.publicKey))); + + for (const site of zeroBandwidthSites) { + let newOnlineStatus = site.online; + + // Check if site should go offline based on last bandwidth update WITH DATA + if (site.lastBandwidthUpdate) { + const lastUpdateWithData = new Date(site.lastBandwidthUpdate); + if (lastUpdateWithData < oneMinuteAgo) { + newOnlineStatus = false; + } + } else { + // No previous data update recorded, set to offline + newOnlineStatus = false; + } + + // Always update lastBandwidthUpdate to show this instance is receiving reports + // Only update online status if it changed + if (site.online !== newOnlineStatus) { + await trx + .update(sites) + .set({ + lastBandwidthUpdate: currentTime.toISOString(), + online: newOnlineStatus + }) + .where(eq(sites.siteId, site.siteId)); + } else { + // Just update the heartbeat timestamp + await trx + .update(sites) + .set({ + lastBandwidthUpdate: currentTime.toISOString() + }) + .where(eq(sites.siteId, site.siteId)); } } - - // Update the site's bandwidth usage - await trx - .update(sites) - .set({ - megabytesOut: (site.megabytesOut || 0) + bytesIn, - megabytesIn: (site.megabytesIn || 0) + bytesOut, - lastBandwidthUpdate: new Date().toISOString(), - online - }) - .where(eq(sites.siteId, site.siteId)); } }); @@ -72,7 +112,7 @@ export const receiveBandwidth = async ( data: {}, success: true, error: false, - message: "Organization retrieved successfully", + message: "Bandwidth data updated successfully", status: HttpCode.OK }); } catch (error) {