diff --git a/server/cleanup.ts b/server/cleanup.ts index e494fcdc9..137654827 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,6 +1,10 @@ +import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; async function cleanup() { + await flushBandwidthToDb(); + await flushSiteBandwidthToDb(); await wsCleanup(); process.exit(0); @@ -10,4 +14,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} +} \ No newline at end of file diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index e9b305270..0bd9822dd 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -13,8 +13,12 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; +import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; async function cleanup() { + await flushBandwidthToDb(); + await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); await wsCleanup(); @@ -25,4 +29,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} +} \ No newline at end of file diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index dbd687a15..098a1b558 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -1,5 +1,5 @@ import { Request, Response, NextFunction } from "express"; -import { eq, and, lt, inArray, sql } from "drizzle-orm"; +import { eq, sql } from "drizzle-orm"; import { sites } from "@server/db"; import { db } from "@server/db"; 
/** One peer's traffic counters as reported by a gerbil or remote exit node. */
interface PeerBandwidth {
    publicKey: string;
    bytesIn: number;
    bytesOut: number;
}

/** Traffic accumulated in memory for one site public key until the next flush. */
interface AccumulatorEntry {
    bytesIn: number;
    bytesOut: number;
    /** Present when the update came through a remote exit node. */
    exitNodeId?: number;
    /** Whether to record egress usage for billing purposes. */
    calcUsage: boolean;
}

// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds

// In-memory accumulator: publicKey -> AccumulatorEntry
let accumulator = new Map<string, AccumulatorEntry>();

/**
 * Normalise a reported counter to a finite, non-negative number.
 *
 * The values originate from a request body and are summed with `+=`, so a
 * string would concatenate and a NaN would stick to the running total for
 * good. Anything that is not a positive finite number counts as 0.
 */
function toByteCount(value: unknown): number {
    const count = typeof value === "number" ? value : Number(value);
    return Number.isFinite(count) && count > 0 ? count : 0;
}

/**
 * Put an entry whose database write failed back into the live accumulator so
 * the next flush retries it.
 *
 * If newer data for the same key arrived while the flush was running, the
 * byte counts are summed, `calcUsage` stays set when either side requested
 * it, and the newer `exitNodeId` wins (the failed entry's id is only used as
 * a fallback).
 */
function requeueSiteBandwidth(publicKey: string, failed: AccumulatorEntry): void {
    const newer = accumulator.get(publicKey);
    if (!newer) {
        accumulator.set(publicKey, { ...failed });
        return;
    }

    newer.bytesIn += failed.bytesIn;
    newer.bytesOut += failed.bytesOut;
    newer.calcUsage = newer.calcUsage || failed.calcUsage;
    if (newer.exitNodeId === undefined) {
        newer.exitNodeId = failed.exitNodeId;
    }
}

/**
 * Add one entry's byte counts to its site row.
 *
 * @returns the `orgId`/`siteId` of the updated row, or `undefined` when no
 *          site has this public key or when the write failed (in which case
 *          the entry has already been logged and re-queued).
 */
async function writeSiteBandwidth(
    publicKey: string,
    entry: AccumulatorEntry,
    timestamp: string
) {
    try {
        return await withDeadlockRetry(async () => {
            const [row] = await db
                .update(sites)
                .set({
                    // Peer-reported bytesIn is added to megabytesOut and
                    // bytesOut to megabytesIn.
                    // NOTE(review): presumably because the counters are
                    // reported from the peer's point of view, and the column
                    // names say megabytes while the values are named bytes —
                    // confirm direction and units.
                    megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${entry.bytesIn}`,
                    megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${entry.bytesOut}`,
                    lastBandwidthUpdate: timestamp
                })
                .where(eq(sites.pubKey, publicKey))
                .returning({
                    orgId: sites.orgId,
                    siteId: sites.siteId
                });
            return row;
        }, `flush bandwidth for site ${publicKey}`);
    } catch (error) {
        logger.error(
            `Failed to flush bandwidth for site ${publicKey}:`,
            error
        );
        // Nothing was written, so retrying on the next flush is safe.
        requeueSiteBandwidth(publicKey, entry);
        return undefined;
    }
}

/**
 * Record the aggregated egress usage per org and kick off limit checks.
 * A failure for one org is logged and does not stop the remaining orgs.
 */
async function recordOrgUsage(orgUsageMap: Map<string, number>): Promise<void> {
    // Sort org IDs for consistent lock ordering.
    const sortedOrgIds = [...orgUsageMap.keys()].sort();

    for (const orgId of sortedOrgIds) {
        const totalBandwidth = orgUsageMap.get(orgId) ?? 0;
        try {
            const bandwidthUsage = await usageService.add(
                orgId,
                FeatureId.EGRESS_DATA_MB,
                totalBandwidth
            );
            if (bandwidthUsage) {
                // Fire-and-forget — don't block the flush on limit checking.
                usageService
                    .checkLimitSet(
                        orgId,
                        FeatureId.EGRESS_DATA_MB,
                        bandwidthUsage
                    )
                    .catch((error: unknown) => {
                        logger.error(
                            `Error checking bandwidth limits for org ${orgId}:`,
                            error
                        );
                    });
            }
        } catch (error) {
            logger.error(`Error processing usage for org ${orgId}:`, error);
            // Continue with other orgs.
        }
    }
}

/**
 * Flush all accumulated site bandwidth data to the database.
 *
 * The accumulator is swapped for an empty one before any write, so data that
 * arrives during the flush lands in the new map. Entries whose database write
 * fails are re-queued and retried on the next flush. Failures that happen
 * after a row has been written (exit-node check, usage bookkeeping) are only
 * logged: re-queueing at that point would add the same bytes to the site a
 * second time.
 *
 * Exported so the graceful-shutdown cleanup handler can call it before the
 * process exits.
 */
export async function flushSiteBandwidthToDb(): Promise<void> {
    if (accumulator.size === 0) {
        return;
    }

    // Swap first, write afterwards: there is no await between these two
    // statements, so no update can slip in between them.
    const snapshot = accumulator;
    accumulator = new Map<string, AccumulatorEntry>();

    const currentTime = new Date().toISOString();

    // Sort by publicKey for consistent lock ordering across concurrent
    // writers — deadlock-prevention strategy.
    const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
        a.localeCompare(b)
    );

    logger.debug(
        `Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
    );

    // Billing usage per org, collected while the site rows are updated.
    const orgUsageMap = new Map<string, number>();

    for (const [publicKey, entry] of sortedEntries) {
        const updatedSite = await writeSiteBandwidth(
            publicKey,
            entry,
            currentTime
        );
        if (!updatedSite) {
            continue;
        }

        try {
            if (entry.exitNodeId) {
                const notAllowed = await checkExitNodeOrg(
                    entry.exitNodeId,
                    updatedSite.orgId
                );
                if (notAllowed) {
                    logger.warn(
                        `Exit node ${entry.exitNodeId} is not allowed for org ${updatedSite.orgId}`
                    );
                    // Skip usage tracking for this site but continue
                    // processing the rest.
                    continue;
                }
            }

            if (entry.calcUsage) {
                const totalBandwidth = entry.bytesIn + entry.bytesOut;
                const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
                orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
            }
        } catch (error) {
            // The site row is already updated; do NOT re-queue here.
            logger.error(
                `Failed to resolve billing usage for site ${publicKey}:`,
                error
            );
        }
    }

    // Usage updates run after the site-update loop to keep lock scope small.
    if (orgUsageMap.size > 0) {
        await recordOrgUsage(orgUsageMap);
    }
}

// ---------------------------------------------------------------------------
// Periodic flush timer
// ---------------------------------------------------------------------------

const flushTimer = setInterval(() => {
    void flushSiteBandwidthToDb().catch((error: unknown) => {
        logger.error(
            "Unexpected error during periodic site bandwidth flush:",
            error
        );
    });
}, FLUSH_INTERVAL_MS);

// Allow the process to exit normally even while the timer is pending.
// The graceful-shutdown path (see server/cleanup.ts) calls
// flushSiteBandwidthToDb() explicitly before process.exit().
flushTimer.unref();

// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------

/**
 * Accumulate bandwidth data reported by a gerbil or remote exit node.
 *
 * A peer is recorded when at least one of its counters is greater than zero;
 * peers with no traffic are ignored. Counters that are not positive finite
 * numbers are treated as 0 so a malformed report cannot corrupt the running
 * totals. Nothing is written to the database here — the periodic flush and
 * the shutdown hook do that.
 *
 * The function performs no I/O; it stays `async` only to keep the signature
 * callers already await.
 *
 * @param bandwidthData      counters reported per peer public key
 * @param calcUsageAndLimits whether this traffic counts towards billing usage
 * @param exitNodeId         set when the report came through a remote exit node
 */
export async function updateSiteBandwidth(
    bandwidthData: PeerBandwidth[],
    calcUsageAndLimits: boolean,
    exitNodeId?: number
): Promise<void> {
    for (const peer of bandwidthData) {
        const bytesIn = toByteCount(peer.bytesIn);
        const bytesOut = toByteCount(peer.bytesOut);

        // Skip peers that haven't transferred any data — writing zeros to the
        // database would be a no-op anyway.
        if (bytesIn === 0 && bytesOut === 0) {
            continue;
        }

        const existing = accumulator.get(peer.publicKey);
        if (!existing) {
            accumulator.set(peer.publicKey, {
                bytesIn,
                bytesOut,
                exitNodeId,
                calcUsage: calcUsageAndLimits
            });
            continue;
        }

        existing.bytesIn += bytesIn;
        existing.bytesOut += bytesOut;
        // Retain the most-recent exitNodeId for this peer.
        if (exitNodeId !== undefined) {
            existing.exitNodeId = exitNodeId;
        }
        // Once calcUsage has been requested for a peer, keep it set for
        // the lifetime of this flush window.
        if (calcUsageAndLimits) {
            existing.calcUsage = true;
        }
    }
}
and allows retries per-site - for (const peer of activePeers) { - try { - const updatedSite = await withDeadlockRetry(async () => { - const [result] = await db - .update(sites) - .set({ - megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, - megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`, - lastBandwidthUpdate: currentTime.toISOString(), - online: true - }) - .where(eq(sites.pubKey, peer.publicKey)) - .returning({ - online: sites.online, - orgId: sites.orgId, - siteId: sites.siteId, - lastBandwidthUpdate: sites.lastBandwidthUpdate - }); - return result; - }, `update active site ${peer.publicKey}`); - - if (updatedSite) { - if (exitNodeId) { - const notAllowed = await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId - ); - if (notAllowed) { - logger.warn( - `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` - ); - // Skip this site but continue processing others - continue; - } - } - - // Aggregate bandwidth usage for the org - const totalBandwidth = peer.bytesIn + peer.bytesOut; - const currentOrgUsage = - orgUsageMap.get(updatedSite.orgId) || 0; - orgUsageMap.set( - updatedSite.orgId, - currentOrgUsage + totalBandwidth - ); - } - } catch (error) { - logger.error( - `Failed to update bandwidth for site ${peer.publicKey}:`, - error - ); - // Continue with other sites - } - } - } - - // Process usage updates outside of site update transactions - // This separates the concerns and reduces lock contention - if (calcUsageAndLimits && orgUsageMap.size > 0) { - // Sort org IDs to ensure consistent lock ordering - const allOrgIds = [...new Set([...orgUsageMap.keys()])].sort(); - - for (const orgId of allOrgIds) { - try { - // Process bandwidth usage for this org - const totalBandwidth = orgUsageMap.get(orgId); - if (totalBandwidth) { - const bandwidthUsage = await usageService.add( - orgId, - FeatureId.EGRESS_DATA_MB, - totalBandwidth - ); - if (bandwidthUsage) { - // Fire and forget - don't block on limit checking - usageService - 
.checkLimitSet( - orgId, - FeatureId.EGRESS_DATA_MB, - bandwidthUsage - ) - .catch((error: any) => { - logger.error( - `Error checking bandwidth limits for org ${orgId}:`, - error - ); - }); - } - } - } catch (error) { - logger.error(`Error processing usage for org ${orgId}:`, error); - // Continue with other orgs - } - } - } - - // Handle sites that reported zero bandwidth but need online status updated - const zeroBandwidthPeers = sortedBandwidthData.filter( - (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) - ); - - if (zeroBandwidthPeers.length > 0) { - // Fetch all zero bandwidth sites in one query - const zeroBandwidthSites = await db - .select() - .from(sites) - .where( - inArray( - sites.pubKey, - zeroBandwidthPeers.map((p) => p.publicKey) - ) - ); - - // Sort by siteId to ensure consistent lock ordering - const sortedZeroBandwidthSites = zeroBandwidthSites.sort( - (a, b) => a.siteId - b.siteId - ); - - for (const site of sortedZeroBandwidthSites) { - let newOnlineStatus = site.online; - - // Check if site should go offline based on last bandwidth update WITH DATA - if (site.lastBandwidthUpdate) { - const lastUpdateWithData = new Date(site.lastBandwidthUpdate); - if (lastUpdateWithData < oneMinuteAgo) { - newOnlineStatus = false; - } - } else { - // No previous data update recorded, set to offline - newOnlineStatus = false; - } - - // Only update online status if it changed - if (site.online !== newOnlineStatus) { - try { - const updatedSite = await withDeadlockRetry(async () => { - const [result] = await db - .update(sites) - .set({ - online: newOnlineStatus - }) - .where(eq(sites.siteId, site.siteId)) - .returning(); - return result; - }, `update offline status for site ${site.siteId}`); - - if (updatedSite && exitNodeId) { - const notAllowed = await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId - ); - if (notAllowed) { - logger.warn( - `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` - ); - } - } - - // If site 
/** Traffic accumulated in memory for one client public key until the next flush. */
interface BandwidthAccumulator {
    bytesIn: number;
    bytesOut: number;
}

// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;

// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 120_000; // 120 seconds

// In-memory accumulator: publicKey -> { bytesIn, bytesOut }
let accumulator = new Map<string, BandwidthAccumulator>();

/**
 * Put totals whose database write failed back into the live accumulator,
 * adding them to any data that arrived for the same key during the flush.
 */
function requeueClientBandwidth(
    publicKey: string,
    failed: BandwidthAccumulator
): void {
    const newer = accumulator.get(publicKey);
    if (newer) {
        newer.bytesIn += failed.bytesIn;
        newer.bytesOut += failed.bytesOut;
    } else {
        accumulator.set(publicKey, {
            bytesIn: failed.bytesIn,
            bytesOut: failed.bytesOut
        });
    }
}

/**
 * Flush all accumulated client bandwidth data to the database.
 *
 * The accumulator is swapped for an empty one before any write, so messages
 * received during the flush land in the new map. Entries that fail to write
 * are re-queued and retried on the next flush. Entries whose totals are not
 * finite numbers are discarded with an error log: they would either corrupt
 * the stored counter or fail and be re-queued on every flush.
 *
 * Exported so the graceful-shutdown cleanup handler can call it before the
 * process exits.
 */
export async function flushBandwidthToDb(): Promise<void> {
    if (accumulator.size === 0) {
        return;
    }

    // Swap first, write afterwards: there is no await between these two
    // statements, so no message can slip in between them.
    const snapshot = accumulator;
    accumulator = new Map<string, BandwidthAccumulator>();

    const currentTime = new Date().toISOString();

    // Sort by publicKey for consistent lock ordering across concurrent
    // writers (deadlock prevention).
    const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
        a.localeCompare(b)
    );

    logger.debug(
        `Flushing accumulated bandwidth data for ${sortedEntries.length} client(s) to the database`
    );

    for (const [publicKey, totals] of sortedEntries) {
        const { bytesIn, bytesOut } = totals;

        if (!Number.isFinite(bytesIn) || !Number.isFinite(bytesOut)) {
            logger.error(
                `Discarding non-numeric bandwidth totals for client ${publicKey}`
            );
            continue;
        }

        try {
            await withDeadlockRetry(async () => {
                // Atomic SQL increment: no SELECT-then-UPDATE, so concurrent
                // writers cannot overwrite each other's totals.
                await db
                    .update(clients)
                    .set({
                        // Note: bytesIn from peer goes to megabytesOut (data
                        // sent to client) and bytesOut from peer goes to
                        // megabytesIn (data received from client).
                        megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
                        megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
                        lastBandwidthUpdate: currentTime
                    })
                    .where(eq(clients.pubKey, publicKey));
            }, `flush bandwidth for client ${publicKey}`);
        } catch (error) {
            logger.error(
                `Failed to flush bandwidth for client ${publicKey}:`,
                error
            );

            // Re-queue the failed entry so it is retried on the next flush
            // rather than silently dropped.
            requeueClientBandwidth(publicKey, totals);
        }
    }
}

const flushTimer = setInterval(() => {
    void flushBandwidthToDb().catch((error: unknown) => {
        logger.error("Unexpected error during periodic bandwidth flush:", error);
    });
}, FLUSH_INTERVAL_MS);

// unref() keeps this timer from holding the Node.js event loop open on its
// own. The graceful-shutdown path (see server/cleanup.ts) calls
// flushBandwidthToDb() explicitly before process.exit().
flushTimer.unref();
+ if (bytesIn <= 0 && bytesOut <= 0) { + continue; + } - const currentTime = new Date().toISOString(); - - // Update each client individually with retry logic - // This reduces transaction scope and allows retries per-client - for (const peer of sortedBandwidthData) { - const { publicKey, bytesIn, bytesOut } = peer; - - try { - await withDeadlockRetry(async () => { - // Use atomic SQL increment to avoid SELECT then UPDATE pattern - // This eliminates the need to read the current value first - await db - .update(clients) - .set({ - // Note: bytesIn from peer goes to megabytesOut (data sent to client) - // and bytesOut from peer goes to megabytesIn (data received from client) - megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`, - megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`, - lastBandwidthUpdate: currentTime - }) - .where(eq(clients.pubKey, publicKey)); - }, `update client bandwidth ${publicKey}`); - } catch (error) { - logger.error( - `Failed to update bandwidth for client ${publicKey}:`, - error - ); - // Continue with other clients even if one fails + const existing = accumulator.get(publicKey); + if (existing) { + existing.bytesIn += bytesIn; + existing.bytesOut += bytesOut; + } else { + accumulator.set(publicKey, { bytesIn, bytesOut }); } } };