const instanceId = `local-${Math.random().toString(36).slice(2)}-${Date.now()}`; type LocalLockRecord = { owner: string; expiresAt: number; }; const localLocks = new Map(); export class LockManager { private clearExpiredLocalLock(lockKey: string): void { const current = localLocks.get(lockKey); if (current && current.expiresAt <= Date.now()) { localLocks.delete(lockKey); } } private getLocalOwnerToken(): string { return `${instanceId}:`; } /** * Acquire a distributed lock using Redis SET with NX and PX options * @param lockKey - Unique identifier for the lock * @param ttlMs - Time to live in milliseconds * @returns Promise - true if lock acquired, false otherwise */ async acquireLock( lockKey: string, ttlMs: number = 30000, maxRetries: number = 3, retryDelayMs: number = 100 ): Promise { for (let attempt = 0; attempt < maxRetries; attempt++) { this.clearExpiredLocalLock(lockKey); const existing = localLocks.get(lockKey); if (!existing) { localLocks.set(lockKey, { owner: this.getLocalOwnerToken(), expiresAt: Date.now() + ttlMs }); return true; } if (existing.owner === this.getLocalOwnerToken()) { existing.expiresAt = Date.now() + ttlMs; localLocks.set(lockKey, existing); return true; } if (attempt < maxRetries - 1) { const delay = retryDelayMs * Math.pow(2, attempt); await new Promise((resolve) => setTimeout(resolve, delay)); } } return false; } /** * Release a lock using Lua script to ensure atomicity * @param lockKey - Unique identifier for the lock */ async releaseLock(lockKey: string): Promise { this.clearExpiredLocalLock(lockKey); const existing = localLocks.get(lockKey); if (existing && existing.owner === this.getLocalOwnerToken()) { localLocks.delete(lockKey); } } /** * Force release a lock regardless of owner (use with caution) * @param lockKey - Unique identifier for the lock */ async forceReleaseLock(lockKey: string): Promise { localLocks.delete(lockKey); } /** * Check if a lock exists and get its info * @param lockKey - Unique identifier for the lock * @returns Promise<{exists: boolean, ownedByMe: boolean, ttl: number}> */ async getLockInfo(lockKey: string): Promise<{ exists: boolean; ownedByMe: boolean; ttl: number; owner?: string; }> { this.clearExpiredLocalLock(lockKey); const existing = localLocks.get(lockKey); if (!existing) { return { exists: false, ownedByMe: false, ttl: 0 }; } const ttl = Math.max(0, existing.expiresAt - Date.now()); return { exists: true, ownedByMe: existing.owner === this.getLocalOwnerToken(), ttl, owner: existing.owner.split(":")[0] }; } /** * Extend the TTL of an existing lock owned by this worker * @param lockKey - Unique identifier for the lock * @param ttlMs - New TTL in milliseconds * @returns Promise - true if extended successfully */ async extendLock(lockKey: string, ttlMs: number): Promise { this.clearExpiredLocalLock(lockKey); const existing = localLocks.get(lockKey); if (!existing || existing.owner !== this.getLocalOwnerToken()) { return false; } existing.expiresAt = Date.now() + ttlMs; localLocks.set(lockKey, existing); return true; } /** * Attempt to acquire lock with retries and exponential backoff * @param lockKey - Unique identifier for the lock * @param ttlMs - Time to live in milliseconds * @param maxRetries - Maximum number of retry attempts * @param baseDelayMs - Base delay between retries in milliseconds * @returns Promise - true if lock acquired */ async acquireLockWithRetry( lockKey: string, ttlMs: number = 30000, maxRetries: number = 5, baseDelayMs: number = 100 ): Promise { for (let attempt = 0; attempt <= maxRetries; attempt++) { const acquired = await this.acquireLock( lockKey, ttlMs, 1, baseDelayMs ); if (acquired) { return true; } if (attempt < maxRetries) { const delay = baseDelayMs * Math.pow(2, attempt) + Math.random() * 100; await new Promise((resolve) => setTimeout(resolve, delay)); } } return false; } /** * Execute a function while holding a lock * @param lockKey - Unique identifier for the lock * @param fn - Function to execute while holding the lock * @param ttlMs - Lock TTL in milliseconds * @returns Promise - Result of the executed function */ async withLock( lockKey: string, fn: () => Promise, ttlMs: number = 30000 ): Promise { const acquired = await this.acquireLock(lockKey, ttlMs); if (!acquired) { throw new Error(`Failed to acquire lock: ${lockKey}`); } try { return await fn(); } finally { await this.releaseLock(lockKey); } } /** * Clean up expired locks - Redis handles this automatically, but this method * can be used to get statistics about locks * @returns Promise<{activeLocksCount: number, locksOwnedByMe: number}> */ async getLockStatistics(): Promise<{ activeLocksCount: number; locksOwnedByMe: number; }> { const now = Date.now(); for (const [key, value] of localLocks.entries()) { if (value.expiresAt <= now) { localLocks.delete(key); } } let locksOwnedByMe = 0; for (const value of localLocks.values()) { if (value.owner === this.getLocalOwnerToken()) { locksOwnedByMe++; } } return { activeLocksCount: localLocks.size, locksOwnedByMe }; } /** * Close the Redis connection */ async disconnect(): Promise {} } export const lockManager = new LockManager();