From f2ba4b270f96bb49b0aa01e4e89b642abc186635 Mon Sep 17 00:00:00 2001 From: Owen Date: Thu, 29 Jan 2026 20:56:46 -0800 Subject: [PATCH] Dont write stripe to files anymore --- server/integrationApiServer.ts | 12 +- server/lib/billing/usageService.ts | 378 +++++---------------------- server/private/lib/config.ts | 5 +- server/private/lib/readConfigFile.ts | 2 +- 4 files changed, 71 insertions(+), 326 deletions(-) diff --git a/server/integrationApiServer.ts b/server/integrationApiServer.ts index 0ef0c0af..6d513cf6 100644 --- a/server/integrationApiServer.ts +++ b/server/integrationApiServer.ts @@ -105,11 +105,13 @@ function getOpenApiDocumentation() { servers: [{ url: "/v1" }] }); - // convert to yaml and save to file - const outputPath = path.join(APP_PATH, "openapi.yaml"); - const yamlOutput = yaml.dump(generated); - fs.writeFileSync(outputPath, yamlOutput, "utf8"); - logger.info(`OpenAPI documentation saved to ${outputPath}`); + if (!process.env.DISABLE_GEN_OPENAPI) { + // convert to yaml and save to file + const outputPath = path.join(APP_PATH, "openapi.yaml"); + const yamlOutput = yaml.dump(generated); + fs.writeFileSync(outputPath, yamlOutput, "utf8"); + logger.info(`OpenAPI documentation saved to ${outputPath}`); + } return generated; } diff --git a/server/lib/billing/usageService.ts b/server/lib/billing/usageService.ts index 0fde8eba..4d52ee69 100644 --- a/server/lib/billing/usageService.ts +++ b/server/lib/billing/usageService.ts @@ -1,8 +1,6 @@ import { eq, sql, and } from "drizzle-orm"; import { v4 as uuidv4 } from "uuid"; import { PutObjectCommand } from "@aws-sdk/client-s3"; -import * as fs from "fs/promises"; -import * as path from "path"; import { db, usage, @@ -34,8 +32,7 @@ interface StripeEvent { export function noop() { if ( build !== "saas" || - !process.env.S3_BUCKET || - !process.env.LOCAL_FILE_PATH + !process.env.S3_BUCKET ) { return true; } @@ -44,31 +41,37 @@ export function noop() { export class UsageService { private bucketName: string | undefined; - private currentEventFile: string | null = null; - private currentFileStartTime: number = 0; - private eventsDir: string | undefined; - private uploadingFiles: Set = new Set(); + private events: StripeEvent[] = []; + private lastUploadTime: number = Date.now(); + private isUploading: boolean = false; constructor() { if (noop()) { return; } - // this.bucketName = privateConfig.getRawPrivateConfig().stripe?.s3Bucket; - // this.eventsDir = privateConfig.getRawPrivateConfig().stripe?.localFilePath; this.bucketName = process.env.S3_BUCKET || undefined; - this.eventsDir = process.env.LOCAL_FILE_PATH || undefined; - // Ensure events directory exists - this.initializeEventsDirectory().then(() => { - this.uploadPendingEventFilesOnStartup(); - }); - - // Periodically check for old event files to upload + // Periodically check and upload events setInterval(() => { - this.uploadOldEventFiles().catch((err) => { - logger.error("Error in periodic event file upload:", err); + this.checkAndUploadEvents().catch((err) => { + logger.error("Error in periodic event upload:", err); }); }, 30000); // every 30 seconds + + // Handle graceful shutdown on SIGTERM + process.on("SIGTERM", async () => { + logger.info("SIGTERM received, uploading events before shutdown..."); + await this.forceUpload(); + logger.info("Events uploaded, proceeding with shutdown"); + }); + + // Handle SIGINT as well (Ctrl+C) + process.on("SIGINT", async () => { + logger.info("SIGINT received, uploading events before shutdown..."); + await this.forceUpload(); + logger.info("Events uploaded, proceeding with shutdown"); + process.exit(0); + }); } /** @@ -78,85 +81,6 @@ export class UsageService { return Math.round(value * 100000000000) / 100000000000; // 11 decimal places } - private async initializeEventsDirectory(): Promise { - if (!this.eventsDir) { - logger.warn( - "Stripe local file path is not configured, skipping events directory initialization." - ); - return; - } - try { - await fs.mkdir(this.eventsDir, { recursive: true }); - } catch (error) { - logger.error("Failed to create events directory:", error); - } - } - - private async uploadPendingEventFilesOnStartup(): Promise { - if (!this.eventsDir || !this.bucketName) { - logger.warn( - "Stripe local file path or bucket name is not configured, skipping leftover event file upload." - ); - return; - } - try { - const files = await fs.readdir(this.eventsDir); - for (const file of files) { - if (file.endsWith(".json")) { - const filePath = path.join(this.eventsDir, file); - try { - const fileContent = await fs.readFile( - filePath, - "utf-8" - ); - const events = JSON.parse(fileContent); - if (Array.isArray(events) && events.length > 0) { - // Upload to S3 - const uploadCommand = new PutObjectCommand({ - Bucket: this.bucketName, - Key: file, - Body: fileContent, - ContentType: "application/json" - }); - await s3Client.send(uploadCommand); - - // Check if file still exists before unlinking - try { - await fs.access(filePath); - await fs.unlink(filePath); - } catch (unlinkError) { - logger.debug( - `Startup file ${file} was already deleted` - ); - } - - logger.info( - `Uploaded leftover event file ${file} to S3 with ${events.length} events` - ); - } else { - // Remove empty file - try { - await fs.access(filePath); - await fs.unlink(filePath); - } catch (unlinkError) { - logger.debug( - `Empty startup file ${file} was already deleted` - ); - } - } - } catch (err) { - logger.error( - `Error processing leftover event file ${file}:`, - err - ); - } - } - } - } catch (error) { - logger.error("Failed to scan for leftover event files"); - } - } - public async add( orgId: string, featureId: FeatureId, @@ -450,121 +374,58 @@ export class UsageService { } }; - await this.writeEventToFile(event); - await this.checkAndUploadFile(); + this.addEventToMemory(event); + await this.checkAndUploadEvents(); } - private async writeEventToFile(event: StripeEvent): Promise { - if (!this.eventsDir || !this.bucketName) { + private addEventToMemory(event: StripeEvent): void { + if (!this.bucketName) { logger.warn( - "Stripe local file path or bucket name is not configured, skipping event file write." + "S3 bucket name is not configured, skipping event storage." ); return; } - if (!this.currentEventFile) { - this.currentEventFile = this.generateEventFileName(); - this.currentFileStartTime = Date.now(); - } - - const filePath = path.join(this.eventsDir, this.currentEventFile); - - try { - let events: StripeEvent[] = []; - - // Try to read existing file - try { - const fileContent = await fs.readFile(filePath, "utf-8"); - events = JSON.parse(fileContent); - } catch (error) { - // File doesn't exist or is empty, start with empty array - events = []; - } - - // Add new event - events.push(event); - - // Write back to file - await fs.writeFile(filePath, JSON.stringify(events, null, 2)); - } catch (error) { - logger.error("Failed to write event to file:", error); - } + this.events.push(event); } - private async checkAndUploadFile(): Promise { - if (!this.currentEventFile) { - return; - } - + private async checkAndUploadEvents(): Promise { const now = Date.now(); - const fileAge = now - this.currentFileStartTime; + const timeSinceLastUpload = now - this.lastUploadTime; - // Check if file is at least 1 minute old - if (fileAge >= 60000) { - // 60 seconds - await this.uploadFileToS3(); + // Check if at least 1 minute has passed since last upload + if (timeSinceLastUpload >= 60000 && this.events.length > 0) { + await this.uploadEventsToS3(); } } - private async uploadFileToS3(): Promise { - if (!this.bucketName || !this.eventsDir) { + private async uploadEventsToS3(): Promise { + if (!this.bucketName) { logger.warn( - "Stripe local file path or bucket name is not configured, skipping S3 upload." - ); - return; - } - if (!this.currentEventFile) { - return; - } - - const fileName = this.currentEventFile; - const filePath = path.join(this.eventsDir, fileName); - - // Check if this file is already being uploaded - if (this.uploadingFiles.has(fileName)) { - logger.debug( - `File ${fileName} is already being uploaded, skipping` + "S3 bucket name is not configured, skipping S3 upload." ); return; } - // Mark file as being uploaded - this.uploadingFiles.add(fileName); + if (this.events.length === 0) { + return; + } + + // Check if already uploading + if (this.isUploading) { + logger.debug("Already uploading events, skipping"); + return; + } + + this.isUploading = true; try { - // Check if file exists before trying to read it - try { - await fs.access(filePath); - } catch (error) { - logger.debug( - `File ${fileName} does not exist, may have been already processed` - ); - this.uploadingFiles.delete(fileName); - // Reset current file if it was this file - if (this.currentEventFile === fileName) { - this.currentEventFile = null; - this.currentFileStartTime = 0; - } - return; - } + // Take a snapshot of current events and clear the array + const eventsToUpload = [...this.events]; + this.events = []; + this.lastUploadTime = Date.now(); - // Check if file exists and has content - const fileContent = await fs.readFile(filePath, "utf-8"); - const events = JSON.parse(fileContent); - - if (events.length === 0) { - // No events to upload, just clean up - try { - await fs.unlink(filePath); - } catch (unlinkError) { - // File may have been already deleted - logger.debug( - `File ${fileName} was already deleted during cleanup` - ); - } - this.currentEventFile = null; - this.uploadingFiles.delete(fileName); - return; - } + const fileName = this.generateEventFileName(); + const fileContent = JSON.stringify(eventsToUpload, null, 2); // Upload to S3 const uploadCommand = new PutObjectCommand({ @@ -576,29 +437,15 @@ export class UsageService { await s3Client.send(uploadCommand); - // Clean up local file - check if it still exists before unlinking - try { - await fs.access(filePath); - await fs.unlink(filePath); - } catch (unlinkError) { - // File may have been already deleted by another process - logger.debug( - `File ${fileName} was already deleted during upload` - ); - } - logger.info( - `Uploaded ${fileName} to S3 with ${events.length} events` + `Uploaded ${fileName} to S3 with ${eventsToUpload.length} events` ); - - // Reset for next file - this.currentEventFile = null; - this.currentFileStartTime = 0; } catch (error) { - logger.error(`Failed to upload ${fileName} to S3:`, error); + logger.error("Failed to upload events to S3:", error); + // Note: Events are lost if upload fails. In a production system, + // you might want to add the events back to the array or implement retry logic } finally { - // Always remove from uploading set - this.uploadingFiles.delete(fileName); + this.isUploading = false; } } @@ -695,111 +542,10 @@ export class UsageService { } public async forceUpload(): Promise { - await this.uploadFileToS3(); - } - - /** - * Scan the events directory for files older than 1 minute and upload them if not empty. - */ - private async uploadOldEventFiles(): Promise { - if (!this.eventsDir || !this.bucketName) { - logger.warn( - "Stripe local file path or bucket name is not configured, skipping old event file upload." - ); - return; - } - try { - const files = await fs.readdir(this.eventsDir); - const now = Date.now(); - for (const file of files) { - if (!file.endsWith(".json")) continue; - - // Skip files that are already being uploaded - if (this.uploadingFiles.has(file)) { - logger.debug( - `Skipping file ${file} as it's already being uploaded` - ); - continue; - } - - const filePath = path.join(this.eventsDir, file); - - try { - // Check if file still exists before processing - try { - await fs.access(filePath); - } catch (accessError) { - logger.debug(`File ${file} does not exist, skipping`); - continue; - } - - const stat = await fs.stat(filePath); - const age = now - stat.mtimeMs; - if (age >= 90000) { - // 1.5 minutes - Mark as being uploaded - this.uploadingFiles.add(file); - - try { - const fileContent = await fs.readFile( - filePath, - "utf-8" - ); - const events = JSON.parse(fileContent); - if (Array.isArray(events) && events.length > 0) { - // Upload to S3 - const uploadCommand = new PutObjectCommand({ - Bucket: this.bucketName, - Key: file, - Body: fileContent, - ContentType: "application/json" - }); - await s3Client.send(uploadCommand); - - // Check if file still exists before unlinking - try { - await fs.access(filePath); - await fs.unlink(filePath); - } catch (unlinkError) { - logger.debug( - `File ${file} was already deleted during interval upload` - ); - } - - logger.info( - `Interval: Uploaded event file ${file} to S3 with ${events.length} events` - ); - // If this was the current event file, reset it - if (this.currentEventFile === file) { - this.currentEventFile = null; - this.currentFileStartTime = 0; - } - } else { - // Remove empty file - try { - await fs.access(filePath); - await fs.unlink(filePath); - } catch (unlinkError) { - logger.debug( - `Empty file ${file} was already deleted` - ); - } - } - } finally { - // Always remove from uploading set - this.uploadingFiles.delete(file); - } - } - } catch (err) { - logger.error( - `Interval: Error processing event file ${file}:`, - err - ); - // Remove from uploading set on error - this.uploadingFiles.delete(file); - } - } - } catch (err) { - logger.error("Interval: Failed to scan for event files:", err); + if (this.events.length > 0) { + // Force upload regardless of time + this.lastUploadTime = 0; // Reset to force upload + await this.uploadEventsToS3(); } } diff --git a/server/private/lib/config.ts b/server/private/lib/config.ts index ae9ca5c7..f37ba2c1 100644 --- a/server/private/lib/config.ts +++ b/server/private/lib/config.ts @@ -128,10 +128,7 @@ export class PrivateConfig { if (this.rawPrivateConfig.stripe?.s3Bucket) { process.env.S3_BUCKET = this.rawPrivateConfig.stripe.s3Bucket; } - if (this.rawPrivateConfig.stripe?.localFilePath) { - process.env.LOCAL_FILE_PATH = - this.rawPrivateConfig.stripe.localFilePath; - } + if (this.rawPrivateConfig.stripe?.s3Region) { process.env.S3_REGION = this.rawPrivateConfig.stripe.s3Region; } diff --git a/server/private/lib/readConfigFile.ts b/server/private/lib/readConfigFile.ts index 374dee7c..34eccf29 100644 --- a/server/private/lib/readConfigFile.ts +++ b/server/private/lib/readConfigFile.ts @@ -161,7 +161,7 @@ export const privateConfigSchema = z.object({ webhook_secret: z.string(), s3Bucket: z.string(), s3Region: z.string().default("us-east-1"), - localFilePath: z.string() + localFilePath: z.string().optional() }) .optional() });