diff --git a/server/routers/newt/handleReceiveBandwidthMessage.ts b/server/routers/newt/handleReceiveBandwidthMessage.ts index 3d060a0c..eb930e68 100644 --- a/server/routers/newt/handleReceiveBandwidthMessage.ts +++ b/server/routers/newt/handleReceiveBandwidthMessage.ts @@ -1,7 +1,7 @@ import { db } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; -import { clients, Newt } from "@server/db"; -import { eq } from "drizzle-orm"; +import { clients } from "@server/db"; +import { eq, sql } from "drizzle-orm"; import logger from "@server/logger"; interface PeerBandwidth { @@ -10,13 +10,57 @@ interface PeerBandwidth { bytesOut: number; } +// Retry configuration for deadlock handling +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +/** + * Check if an error is a deadlock error + */ +function isDeadlockError(error: any): boolean { + return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +/** + * Execute a function with retry logic for deadlock handling + */ +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + export const handleReceiveBandwidthMessage: MessageHandler = async ( context ) => { - const { message, client, sendToClient } = context; + const { message } = context; if (!message.data.bandwidthData) { logger.warn("No bandwidth data provided"); + return; } const bandwidthData: PeerBandwidth[] = message.data.bandwidthData; @@ -25,30 +69,40 @@ export const handleReceiveBandwidthMessage: MessageHandler = async ( throw new Error("Invalid bandwidth data"); } - await db.transaction(async (trx) => { - for (const peer of bandwidthData) { - const { publicKey, bytesIn, bytesOut } = peer; + // Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances + // This is critical for preventing deadlocks when multiple instances update the same clients + const sortedBandwidthData = [...bandwidthData].sort((a, b) => + a.publicKey.localeCompare(b.publicKey) + ); - // Find the client by public key - const [client] = await trx - .select() - .from(clients) - .where(eq(clients.pubKey, publicKey)) - .limit(1); + const currentTime = new Date().toISOString(); - if (!client) { - continue; - } + // Update each client individually with retry logic + // This reduces transaction scope and allows retries per-client + for (const peer of sortedBandwidthData) { + const { publicKey, bytesIn, bytesOut } = peer; - // Update the client's bandwidth usage - await trx - .update(clients) - .set({ - megabytesOut: (client.megabytesIn || 0) + bytesIn, - megabytesIn: (client.megabytesOut || 0) + bytesOut, - lastBandwidthUpdate: new Date().toISOString() - }) - .where(eq(clients.clientId, client.clientId)); + try { + await withDeadlockRetry(async () => { + // Use atomic SQL increment to avoid SELECT then UPDATE pattern + // This eliminates the need to read the current value first + await db + .update(clients) + .set({ + // Note: bytesIn from peer goes to megabytesOut (data sent to client) + // and bytesOut from peer goes to megabytesIn (data received from client) + megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`, + megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`, + lastBandwidthUpdate: currentTime + }) + .where(eq(clients.pubKey, publicKey)); + }, `update client bandwidth ${publicKey}`); + } catch (error) { + logger.error( + `Failed to update bandwidth for client ${publicKey}:`, + error + ); + // Continue with other clients even if one fails } - }); + } };