From 06fb3685e4fe67518883b4acb79b6815f956c377 Mon Sep 17 00:00:00 2001 From: Owen Date: Thu, 11 Jun 2026 12:25:57 -0700 Subject: [PATCH] Add queue --- server/index.ts | 2 + server/lib/rebuildClientAssociations.ts | 104 +++++++++++++-- server/lib/rebuildQueue.ts | 23 ++++ server/private/lib/rebuildQueue.ts | 169 ++++++++++++++++++++++++ 4 files changed, 288 insertions(+), 10 deletions(-) create mode 100644 server/lib/rebuildQueue.ts create mode 100644 server/private/lib/rebuildQueue.ts diff --git a/server/index.ts b/server/index.ts index 99fd20156..53b3e9a69 100644 --- a/server/index.ts +++ b/server/index.ts @@ -24,6 +24,7 @@ import license from "#dynamic/license/license"; import { initLogCleanupInterval } from "@server/lib/cleanupLogs"; import { initAcmeCertSync } from "#dynamic/lib/acmeCertSync"; import { fetchServerIp } from "@server/lib/serverIpService"; +import { startRebuildQueueProcessor } from "@server/lib/rebuildClientAssociations"; async function startServers() { await setHostMeta(); @@ -41,6 +42,7 @@ async function startServers() { initLogCleanupInterval(); initAcmeCertSync(); + startRebuildQueueProcessor(); // Start all servers const apiServer = createApiServer(); diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 4efc72476..8b601ae71 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -8,6 +8,7 @@ import { exitNodes, newts, olms, + primaryDb, roleSiteResources, Site, SiteResource, @@ -40,6 +41,7 @@ import { removeTargets as removeSubnetProxyTargets } from "@server/routers/client/targets"; import { lockManager } from "#dynamic/lib/lock"; +import { rebuildQueue } from "#dynamic/lib/rebuildQueue"; // TTL for rebuild-association locks. These functions can fan out into many // peer/proxy updates, so give them a generous window. @@ -167,11 +169,32 @@ export async function rebuildClientAssociationsFromSiteResource( subnet: string | null; }[]; }> { - return await lockManager.withLock( - `rebuild-client-associations:site-resource:${siteResource.siteResourceId}`, - () => rebuildClientAssociationsFromSiteResourceImpl(siteResource, trx), - REBUILD_ASSOCIATIONS_LOCK_TTL_MS - ); + try { + return await lockManager.withLock( + `rebuild-client-associations:site-resource:${siteResource.siteResourceId}`, + () => + rebuildClientAssociationsFromSiteResourceImpl( + siteResource, + trx + ), + REBUILD_ASSOCIATIONS_LOCK_TTL_MS + ); + } catch (err: any) { + if ( + typeof err?.message === "string" && + err.message.startsWith("Failed to acquire lock") + ) { + logger.warn( + `rebuildClientAssociations: could not acquire lock for site resource ${siteResource.siteResourceId}, queuing for deferred processing` + ); + await rebuildQueue.enqueue({ + type: "site-resource", + id: siteResource.siteResourceId + }); + return { mergedAllClients: [] }; + } + throw err; + } } async function rebuildClientAssociationsFromSiteResourceImpl( @@ -956,11 +979,28 @@ export async function rebuildClientAssociationsFromClient( client: Client, trx: Transaction | typeof db = db ): Promise { - return await lockManager.withLock( - `rebuild-client-associations:client:${client.clientId}`, - () => rebuildClientAssociationsFromClientImpl(client, trx), - REBUILD_ASSOCIATIONS_LOCK_TTL_MS - ); + try { + return await lockManager.withLock( + `rebuild-client-associations:client:${client.clientId}`, + () => rebuildClientAssociationsFromClientImpl(client, trx), + REBUILD_ASSOCIATIONS_LOCK_TTL_MS + ); + } catch (err: any) { + if ( + typeof err?.message === "string" && + err.message.startsWith("Failed to acquire lock") + ) { + logger.warn( + `rebuildClientAssociations: could not acquire lock for client ${client.clientId}, queuing for deferred processing` + ); + await rebuildQueue.enqueue({ + type: "client", + id: client.clientId + }); + return; + } + throw err; + } } async function rebuildClientAssociationsFromClientImpl( @@ -1906,3 +1946,47 @@ export async function cleanupSiteAssociations( logger.debug(`cleanupSiteAssociations: DONE siteId=${siteId}`); } + +/** + * Start the background rebuild queue processor. This should be called once + * during server startup. Only one server instance at a time will actively + * consume the queue (enforced via a distributed Redis lock); all other + * instances will poll and wait until the lock becomes available. + */ +export function startRebuildQueueProcessor(): void { + rebuildQueue.startProcessing({ + onSiteResource: async (siteResourceId: number) => { + const [siteResource] = await primaryDb + .select() + .from(siteResources) + .where(eq(siteResources.siteResourceId, siteResourceId)); + + if (!siteResource) { + logger.warn( + `Rebuild queue: site resource ${siteResourceId} not found, skipping` + ); + return; + } + + await rebuildClientAssociationsFromSiteResource( + siteResource, + primaryDb + ); + }, + onClient: async (clientId: number) => { + const [client] = await primaryDb + .select() + .from(clients) + .where(eq(clients.clientId, clientId)); + + if (!client) { + logger.warn( + `Rebuild queue: client ${clientId} not found, skipping` + ); + return; + } + + await rebuildClientAssociationsFromClient(client, primaryDb); + } + }); +} diff --git a/server/lib/rebuildQueue.ts b/server/lib/rebuildQueue.ts new file mode 100644 index 000000000..475858108 --- /dev/null +++ b/server/lib/rebuildQueue.ts @@ -0,0 +1,23 @@ +export type RebuildJobType = "site-resource" | "client"; + +export interface RebuildJob { + type: RebuildJobType; + id: number; +} + +export interface RebuildJobHandlers { + onSiteResource(siteResourceId: number): Promise; + onClient(clientId: number): Promise; +} + +export interface RebuildQueueManager { + enqueue(job: RebuildJob): Promise; + startProcessing(handlers: RebuildJobHandlers): void; +} + +class NoopRebuildQueue implements RebuildQueueManager { + async enqueue(_job: RebuildJob): Promise {} + startProcessing(_handlers: RebuildJobHandlers): void {} +} + +export const rebuildQueue: RebuildQueueManager = new NoopRebuildQueue(); diff --git a/server/private/lib/rebuildQueue.ts b/server/private/lib/rebuildQueue.ts new file mode 100644 index 000000000..e5ee7e7cb --- /dev/null +++ b/server/private/lib/rebuildQueue.ts @@ -0,0 +1,169 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025-2026 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { redis } from "#private/lib/redis"; +import { lockManager } from "#dynamic/lib/lock"; +import logger from "@server/logger"; + +export type RebuildJobType = "site-resource" | "client"; + +export interface RebuildJob { + type: RebuildJobType; + id: number; +} + +export interface RebuildJobHandlers { + onSiteResource(siteResourceId: number): Promise; + onClient(clientId: number): Promise; +} + +// Redis list holding pending rebuild jobs (RPUSH to enqueue, LPOP to dequeue — FIFO order). +const QUEUE_KEY = "rebuild-client-associations:queue"; + +// Distributed lock that serialises queue consumption to a single server instance +// at a time. TTL is generous enough to cover a full batch of expensive rebuilds. +const PROCESSOR_LOCK_KEY = "rebuild-client-associations:processor"; + +// Each rebuild can take up to REBUILD_ASSOCIATIONS_LOCK_TTL_MS (120 s) per +// resource. Allow BATCH_SIZE resources per processor-lock acquisition, plus a +// small buffer. +const BATCH_SIZE = 5; +const PROCESSOR_LOCK_TTL_MS = 120000 * BATCH_SIZE + 30000; // ~630 s + +const POLL_INTERVAL_MS = 500; + +class RedisRebuildQueue { + private processingStarted = false; + + async enqueue(job: RebuildJob): Promise { + if (!redis || redis.status !== "ready") { + logger.warn( + `Rebuild queue: Redis not available — rebuild for ${job.type}:${job.id} will not be retried` + ); + return; + } + + try { + await redis.rpush(QUEUE_KEY, JSON.stringify(job)); + logger.debug( + `Rebuild queue: enqueued ${job.type}:${job.id} (queue position: tail)` + ); + } catch (err) { + logger.error( + `Rebuild queue: failed to enqueue ${job.type}:${job.id}:`, + err + ); + } + } + + startProcessing(handlers: RebuildJobHandlers): void { + if (this.processingStarted) return; + this.processingStarted = true; + + this.processLoop(handlers).catch((err) => { + logger.error("Rebuild queue processor loop crashed:", err); + }); + + logger.info("Rebuild queue processor started"); + } + + private async processLoop(handlers: RebuildJobHandlers): Promise { + while (true) { + try { + await this.tryProcessBatch(handlers); + } catch (err) { + logger.error( + "Rebuild queue: unhandled error in process loop:", + err + ); + } + await new Promise((resolve) => + setTimeout(resolve, POLL_INTERVAL_MS) + ); + } + } + + private async tryProcessBatch(handlers: RebuildJobHandlers): Promise { + if (!redis || redis.status !== "ready") return; + + // Peek before acquiring the processor lock to avoid unnecessary Redis + // round-trips and lock contention when the queue is idle. + const queueLength = await redis.llen(QUEUE_KEY).catch(() => 0); + if (queueLength === 0) return; + + try { + await lockManager.withLock( + PROCESSOR_LOCK_KEY, + async () => { + for (let i = 0; i < BATCH_SIZE; i++) { + if (!redis || redis.status !== "ready") break; + + const payload = await redis.lpop(QUEUE_KEY); + if (payload === null) break; // queue drained + + let job: RebuildJob; + try { + job = JSON.parse(payload) as RebuildJob; + } catch { + logger.error( + `Rebuild queue: could not parse job payload, discarding: ${payload}` + ); + continue; + } + + logger.debug( + `Rebuild queue: processing ${job.type}:${job.id}` + ); + + try { + if (job.type === "site-resource") { + await handlers.onSiteResource(job.id); + } else if (job.type === "client") { + await handlers.onClient(job.id); + } else { + logger.warn( + `Rebuild queue: unknown job type "${(job as any).type}", discarding` + ); + } + + logger.debug( + `Rebuild queue: completed ${job.type}:${job.id}` + ); + } catch (err) { + logger.error( + `Rebuild queue: job ${job.type}:${job.id} threw an error:`, + err + ); + } + } + }, + PROCESSOR_LOCK_TTL_MS + ); + } catch (err: any) { + if ( + typeof err?.message === "string" && + err.message.startsWith("Failed to acquire lock") + ) { + // Another server instance currently holds the processor lock and + // is consuming the queue — nothing to do this cycle. + logger.debug( + "Rebuild queue: processor lock held by another instance, skipping this cycle" + ); + } else { + throw err; + } + } + } +} + +export const rebuildQueue: RedisRebuildQueue = new RedisRebuildQueue();