From b71f582329f4835e5a5589514c3488d722f95bc7 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 17 Feb 2026 15:09:42 -0800 Subject: [PATCH] Use the billing org id when updating and checking usage --- server/lib/billing/usageService.ts | 346 ++++++++++++++++------------- 1 file changed, 195 insertions(+), 151 deletions(-) diff --git a/server/lib/billing/usageService.ts b/server/lib/billing/usageService.ts index c4ae2925..03fa42a8 100644 --- a/server/lib/billing/usageService.ts +++ b/server/lib/billing/usageService.ts @@ -10,7 +10,8 @@ import { limits, Usage, Limit, - Transaction + Transaction, + orgs } from "@server/db"; import { FeatureId, getFeatureMeterId } from "./features"; import logger from "@server/logger"; @@ -37,10 +38,10 @@ export function noop() { } export class UsageService { - private bucketName: string | undefined; - private events: StripeEvent[] = []; - private lastUploadTime: number = Date.now(); - private isUploading: boolean = false; + // private bucketName: string | undefined; + // private events: StripeEvent[] = []; + // private lastUploadTime: number = Date.now(); + // private isUploading: boolean = false; constructor() { if (noop()) { @@ -91,6 +92,8 @@ export class UsageService { return null; } + let orgIdToUse = await this.getBillingOrg(orgId, transaction); + // Truncate value to 11 decimal places value = this.truncateValue(value); @@ -100,20 +103,20 @@ export class UsageService { while (attempt <= maxRetries) { try { - // Get subscription data for this org (with caching) - const customerId = await this.getCustomerId(orgId, featureId); + // // Get subscription data for this org (with caching) + // const customerId = await this.getCustomerId(orgIdToUse, featureId); - if (!customerId) { - logger.warn( - `No subscription data found for org ${orgId} and feature ${featureId}` - ); - return null; - } + // if (!customerId) { + // logger.warn( + // `No subscription data found for org ${orgIdToUse} and feature ${featureId}` + // ); + // return null; + // } let usage; if (transaction) { usage = await this.internalAddUsage( - orgId, + orgIdToUse, featureId, value, transaction @@ -121,7 +124,7 @@ export class UsageService { } else { await db.transaction(async (trx) => { usage = await this.internalAddUsage( - orgId, + orgIdToUse, featureId, value, trx @@ -150,7 +153,7 @@ export class UsageService { const delay = baseDelay + jitter; logger.warn( - `Deadlock detected for ${orgId}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms` + `Deadlock detected for ${orgIdToUse}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms` ); await new Promise((resolve) => setTimeout(resolve, delay)); @@ -158,7 +161,7 @@ export class UsageService { } logger.error( - `Failed to add usage for ${orgId}/${featureId} after ${attempt} attempts:`, + `Failed to add usage for ${orgIdToUse}/${featureId} after ${attempt} attempts:`, error ); break; @@ -169,7 +172,7 @@ export class UsageService { } private async internalAddUsage( - orgId: string, + orgId: string, // here the orgId is the billing org already resolved by getBillingOrg in updateCount featureId: FeatureId, value: number, trx: Transaction @@ -221,17 +224,20 @@ export class UsageService { if (noop()) { return; } + + let orgIdToUse = await this.getBillingOrg(orgId); + try { - if (!customerId) { - customerId = - (await this.getCustomerId(orgId, featureId)) || undefined; - if (!customerId) { - logger.warn( - `No subscription data found for org ${orgId} and feature ${featureId}` - ); - return; - } - } + // if (!customerId) { + // customerId = + // (await this.getCustomerId(orgIdToUse, featureId)) || undefined; + // if (!customerId) { + // logger.warn( + // `No subscription data found for org ${orgIdToUse} and feature ${featureId}` + // ); + // return; + // } + // } // Truncate value to 11 decimal places if provided if (value !== undefined && value !== null) { @@ -242,7 +248,7 @@ export class UsageService { await db.transaction(async (trx) => { // Get existing meter record - const usageId = `${orgId}-${featureId}`; + const usageId = `${orgIdToUse}-${featureId}`; // Get current usage record [currentUsage] = await trx .select() @@ -264,7 +270,7 @@ export class UsageService { await trx.insert(usage).values({ usageId, featureId, - orgId, + orgId: orgIdToUse, meterId, instantaneousValue: value || 0, latestValue: value || 0, @@ -278,7 +284,7 @@ export class UsageService { // } } catch (error) { logger.error( - `Failed to update count usage for ${orgId}/${featureId}:`, + `Failed to update count usage for ${orgIdToUse}/${featureId}:`, error ); } @@ -288,7 +294,9 @@ export class UsageService { orgId: string, featureId: FeatureId ): Promise { - const cacheKey = `customer_${orgId}_${featureId}`; + let orgIdToUse = await this.getBillingOrg(orgId); + + const cacheKey = `customer_${orgIdToUse}_${featureId}`; const cached = cache.get(cacheKey); if (cached) { @@ -302,7 +310,7 @@ export class UsageService { customerId: customers.customerId }) .from(customers) - .where(eq(customers.orgId, orgId)) + .where(eq(customers.orgId, orgIdToUse)) .limit(1); if (!customer) { @@ -317,112 +325,13 @@ export class UsageService { return customerId; } catch (error) { logger.error( - `Failed to get subscription data for ${orgId}/${featureId}:`, + `Failed to get subscription data for ${orgIdToUse}/${featureId}:`, error ); return null; } } - private async logStripeEvent( - featureId: FeatureId, - value: number, - customerId: string - ): Promise { - // Truncate value to 11 decimal places before sending to Stripe - const truncatedValue = this.truncateValue(value); - - const event: StripeEvent = { - identifier: uuidv4(), - timestamp: Math.floor(new Date().getTime() / 1000), - event_name: featureId, - payload: { - value: truncatedValue, - stripe_customer_id: customerId - } - }; - - this.addEventToMemory(event); - await this.checkAndUploadEvents(); - } - - private addEventToMemory(event: StripeEvent): void { - if (!this.bucketName) { - logger.warn( - "S3 bucket name is not configured, skipping event storage." - ); - return; - } - this.events.push(event); - } - - private async checkAndUploadEvents(): Promise { - const now = Date.now(); - const timeSinceLastUpload = now - this.lastUploadTime; - - // Check if at least 1 minute has passed since last upload - if (timeSinceLastUpload >= 60000 && this.events.length > 0) { - await this.uploadEventsToS3(); - } - } - - private async uploadEventsToS3(): Promise { - if (!this.bucketName) { - logger.warn( - "S3 bucket name is not configured, skipping S3 upload." - ); - return; - } - - if (this.events.length === 0) { - return; - } - - // Check if already uploading - if (this.isUploading) { - logger.debug("Already uploading events, skipping"); - return; - } - - this.isUploading = true; - - try { - // Take a snapshot of current events and clear the array - const eventsToUpload = [...this.events]; - this.events = []; - this.lastUploadTime = Date.now(); - - const fileName = this.generateEventFileName(); - const fileContent = JSON.stringify(eventsToUpload, null, 2); - - // Upload to S3 - const uploadCommand = new PutObjectCommand({ - Bucket: this.bucketName, - Key: fileName, - Body: fileContent, - ContentType: "application/json" - }); - - await s3Client.send(uploadCommand); - - logger.info( - `Uploaded ${fileName} to S3 with ${eventsToUpload.length} events` - ); - } catch (error) { - logger.error("Failed to upload events to S3:", error); - // Note: Events are lost if upload fails. In a production system, - // you might want to add the events back to the array or implement retry logic - } finally { - this.isUploading = false; - } - } - - private generateEventFileName(): string { - const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); - const uuid = uuidv4().substring(0, 8); - return `events-${timestamp}-${uuid}.json`; - } - public async getUsage( orgId: string, featureId: FeatureId, @@ -432,7 +341,9 @@ export class UsageService { return null; } - const usageId = `${orgId}-${featureId}`; + let orgIdToUse = await this.getBillingOrg(orgId, trx); + + const usageId = `${orgIdToUse}-${featureId}`; try { const [result] = await trx @@ -444,7 +355,7 @@ export class UsageService { if (!result) { // Lets create one if it doesn't exist using upsert to handle race conditions logger.info( - `Creating new usage record for ${orgId}/${featureId}` + `Creating new usage record for ${orgIdToUse}/${featureId}` ); const meterId = getFeatureMeterId(featureId); @@ -454,7 +365,7 @@ export class UsageService { .values({ usageId, featureId, - orgId, + orgId: orgIdToUse, meterId, latestValue: 0, updatedAt: Math.floor(Date.now() / 1000) @@ -476,7 +387,7 @@ export class UsageService { } catch (insertError) { // Fallback: try to fetch existing record in case of any insert issues logger.warn( - `Insert failed for ${orgId}/${featureId}, attempting to fetch existing record:`, + `Insert failed for ${orgIdToUse}/${featureId}, attempting to fetch existing record:`, insertError ); const [existingUsage] = await trx @@ -491,19 +402,41 @@ export class UsageService { return result; } catch (error) { logger.error( - `Failed to get usage for ${orgId}/${featureId}:`, + `Failed to get usage for ${orgIdToUse}/${featureId}:`, error ); throw error; } } - public async forceUpload(): Promise { - if (this.events.length > 0) { - // Force upload regardless of time - this.lastUploadTime = 0; // Reset to force upload - await this.uploadEventsToS3(); + public async getBillingOrg( + orgId: string, + trx: Transaction | typeof db = db + ): Promise { + let orgIdToUse = orgId; + + // get the org + const [org] = await trx + .select() + .from(orgs) + .where(eq(orgs.orgId, orgId)) + .limit(1); + + if (!org) { + throw new Error(`Organization with ID ${orgId} not found`); } + + if (!org.isBillingOrg) { + if (org.billingOrgId) { + orgIdToUse = org.billingOrgId; + } else { + throw new Error( + `Organization ${orgId} is not a billing org and does not have a billingOrgId set` + ); + } + } + + return orgIdToUse; } public async checkLimitSet( @@ -515,6 +448,9 @@ export class UsageService { if (noop()) { return false; } + + let orgIdToUse = await this.getBillingOrg(orgId, trx); + // This method should check the current usage against the limits set for the organization // and kick out all of the sites on the org let hasExceededLimits = false; @@ -528,7 +464,7 @@ export class UsageService { .from(limits) .where( and( - eq(limits.orgId, orgId), + eq(limits.orgId, orgIdToUse), eq(limits.featureId, featureId) ) ); @@ -537,11 +473,11 @@ export class UsageService { orgLimits = await trx .select() .from(limits) - .where(eq(limits.orgId, orgId)); + .where(eq(limits.orgId, orgIdToUse)); } if (orgLimits.length === 0) { - logger.debug(`No limits set for org ${orgId}`); + logger.debug(`No limits set for org ${orgIdToUse}`); return false; } @@ -552,7 +488,7 @@ export class UsageService { currentUsage = usage; } else { currentUsage = await this.getUsage( - orgId, + orgIdToUse, limit.featureId as FeatureId, trx ); @@ -563,10 +499,10 @@ export class UsageService { currentUsage?.latestValue || 0; logger.debug( - `Current usage for org ${orgId} on feature ${limit.featureId}: ${usageValue}` + `Current usage for org ${orgIdToUse} on feature ${limit.featureId}: ${usageValue}` ); logger.debug( - `Limit for org ${orgId} on feature ${limit.featureId}: ${limit.value}` + `Limit for org ${orgIdToUse} on feature ${limit.featureId}: ${limit.value}` ); if ( currentUsage && @@ -574,7 +510,7 @@ export class UsageService { usageValue > limit.value ) { logger.debug( - `Org ${orgId} has exceeded limit for ${limit.featureId}: ` + + `Org ${orgIdToUse} has exceeded limit for ${limit.featureId}: ` + `${usageValue} > ${limit.value}` ); hasExceededLimits = true; @@ -582,11 +518,119 @@ export class UsageService { } } } catch (error) { - logger.error(`Error checking limits for org ${orgId}:`, error); + logger.error(`Error checking limits for org ${orgIdToUse}:`, error); } return hasExceededLimits; } + + // private async logStripeEvent( + // featureId: FeatureId, + // value: number, + // customerId: string + // ): Promise { + // // Truncate value to 11 decimal places before sending to Stripe + // const truncatedValue = this.truncateValue(value); + + // const event: StripeEvent = { + // identifier: uuidv4(), + // timestamp: Math.floor(new Date().getTime() / 1000), + // event_name: featureId, + // payload: { + // value: truncatedValue, + // stripe_customer_id: customerId + // } + // }; + + // this.addEventToMemory(event); + // await this.checkAndUploadEvents(); + // } + + // private addEventToMemory(event: StripeEvent): void { + // if (!this.bucketName) { + // logger.warn( + // "S3 bucket name is not configured, skipping event storage." + // ); + // return; + // } + // this.events.push(event); + // } + + // private async checkAndUploadEvents(): Promise { + // const now = Date.now(); + // const timeSinceLastUpload = now - this.lastUploadTime; + + // // Check if at least 1 minute has passed since last upload + // if (timeSinceLastUpload >= 60000 && this.events.length > 0) { + // await this.uploadEventsToS3(); + // } + // } + + // private async uploadEventsToS3(): Promise { + // if (!this.bucketName) { + // logger.warn( + // "S3 bucket name is not configured, skipping S3 upload." + // ); + // return; + // } + + // if (this.events.length === 0) { + // return; + // } + + // // Check if already uploading + // if (this.isUploading) { + // logger.debug("Already uploading events, skipping"); + // return; + // } + + // this.isUploading = true; + + // try { + // // Take a snapshot of current events and clear the array + // const eventsToUpload = [...this.events]; + // this.events = []; + // this.lastUploadTime = Date.now(); + + // const fileName = this.generateEventFileName(); + // const fileContent = JSON.stringify(eventsToUpload, null, 2); + + // // Upload to S3 + // const uploadCommand = new PutObjectCommand({ + // Bucket: this.bucketName, + // Key: fileName, + // Body: fileContent, + // ContentType: "application/json" + // }); + + // await s3Client.send(uploadCommand); + + // logger.info( + // `Uploaded ${fileName} to S3 with ${eventsToUpload.length} events` + // ); + // } catch (error) { + // logger.error("Failed to upload events to S3:", error); + // // Note: Events are lost if upload fails. In a production system, + // // you might want to add the events back to the array or implement retry logic + // } finally { + // this.isUploading = false; + // } + // } + + // private generateEventFileName(): string { + // const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + // const uuid = uuidv4().substring(0, 8); + // return `events-${timestamp}-${uuid}.json`; + // } + + // public async forceUpload(): Promise { + // if (this.events.length > 0) { + // // Force upload regardless of time + // this.lastUploadTime = 0; // Reset to force upload + // await this.uploadEventsToS3(); + // } + // } + } export const usageService = new UsageService();