diff --git a/server/private/routers/gerbil/receiveBandwidth.ts b/server/private/routers/gerbil/receiveBandwidth.ts deleted file mode 100644 index de0b2d2b..00000000 --- a/server/private/routers/gerbil/receiveBandwidth.ts +++ /dev/null @@ -1,13 +0,0 @@ -/* - * This file is part of a proprietary work. - * - * Copyright (c) 2025 Fossorial, Inc. - * All rights reserved. - * - * This file is licensed under the Fossorial Commercial License. - * You may not use this file except in compliance with the License. - * Unauthorized use, copying, modification, or distribution is strictly prohibited. - * - * This file is not licensed under the AGPLv3. - */ - diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index ffbd05c1..297e7c02 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -14,12 +14,55 @@ import { build } from "@server/build"; // Track sites that are already offline to avoid unnecessary queries const offlineSites = new Set(); +// Retry configuration for deadlock handling +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + interface PeerBandwidth { publicKey: string; bytesIn: number; bytesOut: number; } +/** + * Check if an error is a deadlock error + */ +function isDeadlockError(error: any): boolean { + return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +/** + * Execute a function with retry logic for deadlock handling + */ +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + export const receiveBandwidth = async ( req: Request, res: Response, @@ -60,201 +103,208 @@ export async function updateSiteBandwidth( const currentTime = new Date(); const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago - // logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`); + // Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances + // This is critical for preventing deadlocks when multiple instances update the same sites + const sortedBandwidthData = [...bandwidthData].sort((a, b) => + a.publicKey.localeCompare(b.publicKey) + ); - await db.transaction(async (trx) => { - // First, handle sites that are actively reporting bandwidth - const activePeers = bandwidthData.filter((peer) => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages + // First, handle sites that are actively reporting bandwidth + const activePeers = sortedBandwidthData.filter((peer) => peer.bytesIn > 0); - if (activePeers.length > 0) { - // Remove any active peers from offline tracking since they're sending data - activePeers.forEach((peer) => offlineSites.delete(peer.publicKey)); + // Aggregate usage data by organization (collected outside transaction) + const orgUsageMap = new Map(); + const orgUptimeMap = new Map(); - // Aggregate usage data by organization - const orgUsageMap = new Map(); - const orgUptimeMap = new Map(); + if (activePeers.length > 0) { + // Remove any active peers from offline tracking since they're sending data + activePeers.forEach((peer) => offlineSites.delete(peer.publicKey)); - // Update all active sites with bandwidth data and get the site data in one operation - const updatedSites = []; - for (const peer of activePeers) { - const [updatedSite] = await trx - .update(sites) - .set({ - megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, - megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`, - lastBandwidthUpdate: currentTime.toISOString(), - online: true - }) - .where(eq(sites.pubKey, peer.publicKey)) - .returning({ - online: sites.online, - orgId: sites.orgId, - siteId: sites.siteId, - lastBandwidthUpdate: sites.lastBandwidthUpdate - }); + // Update each active site individually with retry logic + // This reduces transaction scope and allows retries per-site + for (const peer of activePeers) { + try { + const updatedSite = await withDeadlockRetry(async () => { + const [result] = await db + .update(sites) + .set({ + megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, + megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`, + lastBandwidthUpdate: currentTime.toISOString(), + online: true + }) + .where(eq(sites.pubKey, peer.publicKey)) + .returning({ + online: sites.online, + orgId: sites.orgId, + siteId: sites.siteId, + lastBandwidthUpdate: sites.lastBandwidthUpdate + }); + return result; + }, `update active site ${peer.publicKey}`); if (updatedSite) { if (exitNodeId) { - if ( - await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId, - trx - ) - ) { - // not allowed + const notAllowed = await checkExitNodeOrg( + exitNodeId, + updatedSite.orgId + ); + if (notAllowed) { logger.warn( `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` ); - // THIS SHOULD TRIGGER THE TRANSACTION TO FAIL? - throw new Error("Exit node not allowed"); + // Skip this site but continue processing others + continue; } } - updatedSites.push({ ...updatedSite, peer }); - } - } - - // Calculate org usage aggregations using the updated site data - for (const { peer, ...site } of updatedSites) { - // Aggregate bandwidth usage for the org - const totalBandwidth = peer.bytesIn + peer.bytesOut; - const currentOrgUsage = orgUsageMap.get(site.orgId) || 0; - orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth); - - // Add 10 seconds of uptime for each active site - const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0; - orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and jut add 10 seconds - } - - if (calcUsageAndLimits) { - // REMOTE EXIT NODES DO NOT COUNT TOWARDS USAGE - // Process all usage updates sequentially by organization to reduce deadlock risk - const allOrgIds = new Set([...orgUsageMap.keys(), ...orgUptimeMap.keys()]); - - for (const orgId of allOrgIds) { - try { - // Process bandwidth usage for this org - const totalBandwidth = orgUsageMap.get(orgId); - if (totalBandwidth) { - const bandwidthUsage = await usageService.add( - orgId, - FeatureId.EGRESS_DATA_MB, - totalBandwidth, - trx - ); - if (bandwidthUsage) { - usageService - .checkLimitSet( - orgId, - true, - FeatureId.EGRESS_DATA_MB, - bandwidthUsage, - trx - ) - .catch((error: any) => { - logger.error( - `Error checking bandwidth limits for org ${orgId}:`, - error - ); - }); - } - } - - // Process uptime usage for this org - const totalUptime = orgUptimeMap.get(orgId); - if (totalUptime) { - const uptimeUsage = await usageService.add( - orgId, - FeatureId.SITE_UPTIME, - totalUptime, - trx - ); - if (uptimeUsage) { - usageService - .checkLimitSet( - orgId, - true, - FeatureId.SITE_UPTIME, - uptimeUsage, - trx - ) - .catch((error: any) => { - logger.error( - `Error checking uptime limits for org ${orgId}:`, - error - ); - }); - } - } - } catch (error) { - logger.error( - `Error processing usage for org ${orgId}:`, - error - ); - // Don't break the loop, continue with other orgs - } + // Aggregate bandwidth usage for the org + const totalBandwidth = peer.bytesIn + peer.bytesOut; + const currentOrgUsage = orgUsageMap.get(updatedSite.orgId) || 0; + orgUsageMap.set(updatedSite.orgId, currentOrgUsage + totalBandwidth); + + // Add 10 seconds of uptime for each active site + const currentOrgUptime = orgUptimeMap.get(updatedSite.orgId) || 0; + orgUptimeMap.set(updatedSite.orgId, currentOrgUptime + 10 / 60); } + } catch (error) { + logger.error( + `Failed to update bandwidth for site ${peer.publicKey}:`, + error + ); + // Continue with other sites } } + } - // Handle sites that reported zero bandwidth but need online status updated - const zeroBandwidthPeers = bandwidthData.filter( - (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages - ); + // Process usage updates outside of site update transactions + // This separates the concerns and reduces lock contention + if (calcUsageAndLimits && (orgUsageMap.size > 0 || orgUptimeMap.size > 0)) { + // Sort org IDs to ensure consistent lock ordering + const allOrgIds = [...new Set([...orgUsageMap.keys(), ...orgUptimeMap.keys()])].sort(); - if (zeroBandwidthPeers.length > 0) { - const zeroBandwidthSites = await trx - .select() - .from(sites) - .where( - inArray( - sites.pubKey, - zeroBandwidthPeers.map((p) => p.publicKey) - ) - ); - - for (const site of zeroBandwidthSites) { - let newOnlineStatus = site.online; - - // Check if site should go offline based on last bandwidth update WITH DATA - if (site.lastBandwidthUpdate) { - const lastUpdateWithData = new Date( - site.lastBandwidthUpdate + for (const orgId of allOrgIds) { + try { + // Process bandwidth usage for this org + const totalBandwidth = orgUsageMap.get(orgId); + if (totalBandwidth) { + const bandwidthUsage = await usageService.add( + orgId, + FeatureId.EGRESS_DATA_MB, + totalBandwidth ); - if (lastUpdateWithData < oneMinuteAgo) { - newOnlineStatus = false; + if (bandwidthUsage) { + // Fire and forget - don't block on limit checking + usageService + .checkLimitSet( + orgId, + true, + FeatureId.EGRESS_DATA_MB, + bandwidthUsage + ) + .catch((error: any) => { + logger.error( + `Error checking bandwidth limits for org ${orgId}:`, + error + ); + }); } - } else { - // No previous data update recorded, set to offline - newOnlineStatus = false; } - // Always update lastBandwidthUpdate to show this instance is receiving reports - // Only update online status if it changed - if (site.online !== newOnlineStatus) { - const [updatedSite] = await trx - .update(sites) - .set({ - online: newOnlineStatus - }) - .where(eq(sites.siteId, site.siteId)) - .returning(); + // Process uptime usage for this org + const totalUptime = orgUptimeMap.get(orgId); + if (totalUptime) { + const uptimeUsage = await usageService.add( + orgId, + FeatureId.SITE_UPTIME, + totalUptime + ); + if (uptimeUsage) { + // Fire and forget - don't block on limit checking + usageService + .checkLimitSet( + orgId, + true, + FeatureId.SITE_UPTIME, + uptimeUsage + ) + .catch((error: any) => { + logger.error( + `Error checking uptime limits for org ${orgId}:`, + error + ); + }); + } + } + } catch (error) { + logger.error( + `Error processing usage for org ${orgId}:`, + error + ); + // Continue with other orgs + } + } + } + + // Handle sites that reported zero bandwidth but need online status updated + const zeroBandwidthPeers = sortedBandwidthData.filter( + (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) + ); + + if (zeroBandwidthPeers.length > 0) { + // Fetch all zero bandwidth sites in one query + const zeroBandwidthSites = await db + .select() + .from(sites) + .where( + inArray( + sites.pubKey, + zeroBandwidthPeers.map((p) => p.publicKey) + ) + ); + + // Sort by siteId to ensure consistent lock ordering + const sortedZeroBandwidthSites = zeroBandwidthSites.sort( + (a, b) => a.siteId - b.siteId + ); + + for (const site of sortedZeroBandwidthSites) { + let newOnlineStatus = site.online; + + // Check if site should go offline based on last bandwidth update WITH DATA + if (site.lastBandwidthUpdate) { + const lastUpdateWithData = new Date(site.lastBandwidthUpdate); + if (lastUpdateWithData < oneMinuteAgo) { + newOnlineStatus = false; + } + } else { + // No previous data update recorded, set to offline + newOnlineStatus = false; + } + + // Only update online status if it changed + if (site.online !== newOnlineStatus) { + try { + const updatedSite = await withDeadlockRetry(async () => { + const [result] = await db + .update(sites) + .set({ + online: newOnlineStatus + }) + .where(eq(sites.siteId, site.siteId)) + .returning(); + return result; + }, `update offline status for site ${site.siteId}`); if (updatedSite && exitNodeId) { - if ( - await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId, - trx - ) - ) { - // not allowed + const notAllowed = await checkExitNodeOrg( + exitNodeId, + updatedSite.orgId + ); + if (notAllowed) { logger.warn( `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` ); - // THIS SHOULD TRIGGER THE TRANSACTION TO FAIL? - throw new Error("Exit node not allowed"); } } @@ -262,8 +312,14 @@ export async function updateSiteBandwidth( if (!newOnlineStatus && site.pubKey) { offlineSites.add(site.pubKey); } + } catch (error) { + logger.error( + `Failed to update offline status for site ${site.siteId}:`, + error + ); + // Continue with other sites } } } - }); + } }