mirror of
https://github.com/fosrl/pangolin.git
synced 2026-06-26 17:19:09 +00:00
Implement non-redis lock
This commit is contained in:
@@ -1,4 +1,24 @@
|
||||
const instanceId = `local-${Math.random().toString(36).slice(2)}-${Date.now()}`;
|
||||
|
||||
type LocalLockRecord = {
|
||||
owner: string;
|
||||
expiresAt: number;
|
||||
};
|
||||
|
||||
const localLocks = new Map<string, LocalLockRecord>();
|
||||
|
||||
export class LockManager {
|
||||
private clearExpiredLocalLock(lockKey: string): void {
|
||||
const current = localLocks.get(lockKey);
|
||||
if (current && current.expiresAt <= Date.now()) {
|
||||
localLocks.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
private getLocalOwnerToken(): string {
|
||||
return `${instanceId}:`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire a distributed lock using Redis SET with NX and PX options
|
||||
* @param lockKey - Unique identifier for the lock
|
||||
@@ -7,22 +27,57 @@ export class LockManager {
|
||||
*/
|
||||
async acquireLock(
|
||||
lockKey: string,
|
||||
ttlMs: number = 30000
|
||||
ttlMs: number = 30000,
|
||||
maxRetries: number = 3,
|
||||
retryDelayMs: number = 100
|
||||
): Promise<boolean> {
|
||||
return true;
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
|
||||
const existing = localLocks.get(lockKey);
|
||||
if (!existing) {
|
||||
localLocks.set(lockKey, {
|
||||
owner: this.getLocalOwnerToken(),
|
||||
expiresAt: Date.now() + ttlMs
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
if (existing.owner === this.getLocalOwnerToken()) {
|
||||
existing.expiresAt = Date.now() + ttlMs;
|
||||
localLocks.set(lockKey, existing);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (attempt < maxRetries - 1) {
|
||||
const delay = retryDelayMs * Math.pow(2, attempt);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Release a lock using Lua script to ensure atomicity
|
||||
* @param lockKey - Unique identifier for the lock
|
||||
*/
|
||||
async releaseLock(lockKey: string): Promise<void> {}
|
||||
async releaseLock(lockKey: string): Promise<void> {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
|
||||
if (existing && existing.owner === this.getLocalOwnerToken()) {
|
||||
localLocks.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force release a lock regardless of owner (use with caution)
|
||||
* @param lockKey - Unique identifier for the lock
|
||||
*/
|
||||
async forceReleaseLock(lockKey: string): Promise<void> {}
|
||||
async forceReleaseLock(lockKey: string): Promise<void> {
|
||||
localLocks.delete(lockKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a lock exists and get its info
|
||||
@@ -35,7 +90,20 @@ export class LockManager {
|
||||
ttl: number;
|
||||
owner?: string;
|
||||
}> {
|
||||
return { exists: false, ownedByMe: false, ttl: 0 };
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
|
||||
if (!existing) {
|
||||
return { exists: false, ownedByMe: false, ttl: 0 };
|
||||
}
|
||||
|
||||
const ttl = Math.max(0, existing.expiresAt - Date.now());
|
||||
return {
|
||||
exists: true,
|
||||
ownedByMe: existing.owner === this.getLocalOwnerToken(),
|
||||
ttl,
|
||||
owner: existing.owner.split(":")[0]
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -45,6 +113,15 @@ export class LockManager {
|
||||
* @returns Promise<boolean> - true if extended successfully
|
||||
*/
|
||||
async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
|
||||
if (!existing || existing.owner !== this.getLocalOwnerToken()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
existing.expiresAt = Date.now() + ttlMs;
|
||||
localLocks.set(lockKey, existing);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -62,7 +139,26 @@ export class LockManager {
|
||||
maxRetries: number = 5,
|
||||
baseDelayMs: number = 100
|
||||
): Promise<boolean> {
|
||||
return true;
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
const acquired = await this.acquireLock(
|
||||
lockKey,
|
||||
ttlMs,
|
||||
1,
|
||||
baseDelayMs
|
||||
);
|
||||
|
||||
if (acquired) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (attempt < maxRetries) {
|
||||
const delay =
|
||||
baseDelayMs * Math.pow(2, attempt) + Math.random() * 100;
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -99,7 +195,21 @@ export class LockManager {
|
||||
activeLocksCount: number;
|
||||
locksOwnedByMe: number;
|
||||
}> {
|
||||
return { activeLocksCount: 0, locksOwnedByMe: 0 };
|
||||
const now = Date.now();
|
||||
for (const [key, value] of localLocks.entries()) {
|
||||
if (value.expiresAt <= now) {
|
||||
localLocks.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
let locksOwnedByMe = 0;
|
||||
for (const value of localLocks.values()) {
|
||||
if (value.owner === this.getLocalOwnerToken()) {
|
||||
locksOwnedByMe++;
|
||||
}
|
||||
}
|
||||
|
||||
return { activeLocksCount: localLocks.size, locksOwnedByMe };
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -11,14 +11,31 @@
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import { config } from "@server/lib/config";
|
||||
import logger from "@server/logger";
|
||||
import { redis } from "#private/lib/redis";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
|
||||
const instanceId = uuidv4();
|
||||
|
||||
type LocalLockRecord = {
|
||||
owner: string;
|
||||
expiresAt: number;
|
||||
};
|
||||
|
||||
const localLocks = new Map<string, LocalLockRecord>();
|
||||
|
||||
export class LockManager {
|
||||
private clearExpiredLocalLock(lockKey: string): void {
|
||||
const current = localLocks.get(lockKey);
|
||||
if (current && current.expiresAt <= Date.now()) {
|
||||
localLocks.delete(lockKey);
|
||||
}
|
||||
}
|
||||
|
||||
private getLocalOwnerToken(): string {
|
||||
return `${instanceId}:`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire a distributed lock using Redis SET with NX and PX options
|
||||
* @param lockKey - Unique identifier for the lock
|
||||
@@ -32,12 +49,34 @@ export class LockManager {
|
||||
retryDelayMs: number = 100
|
||||
): Promise<boolean> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
return true;
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
|
||||
const existing = localLocks.get(lockKey);
|
||||
if (!existing) {
|
||||
localLocks.set(lockKey, {
|
||||
owner: this.getLocalOwnerToken(),
|
||||
expiresAt: Date.now() + ttlMs
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
||||
if (existing.owner === this.getLocalOwnerToken()) {
|
||||
existing.expiresAt = Date.now() + ttlMs;
|
||||
localLocks.set(lockKey, existing);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (attempt < maxRetries - 1) {
|
||||
const delay = retryDelayMs * Math.pow(2, attempt);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
const lockValue = `${
|
||||
instanceId
|
||||
}:${Date.now()}`;
|
||||
const lockValue = `${instanceId}:${Date.now()}`;
|
||||
const redisKey = `lock:${lockKey}`;
|
||||
|
||||
for (let attempt = 0; attempt < maxRetries; attempt++) {
|
||||
@@ -53,11 +92,7 @@ export class LockManager {
|
||||
);
|
||||
|
||||
if (result === "OK") {
|
||||
logger.debug(
|
||||
`Lock acquired: ${lockKey} by ${
|
||||
instanceId
|
||||
}`
|
||||
);
|
||||
logger.debug(`Lock acquired: ${lockKey} by ${instanceId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -65,17 +100,11 @@ export class LockManager {
|
||||
const existingValue = await redis.get(redisKey);
|
||||
if (
|
||||
existingValue &&
|
||||
existingValue.startsWith(
|
||||
`${instanceId}:`
|
||||
)
|
||||
existingValue.startsWith(`${instanceId}:`)
|
||||
) {
|
||||
// Extend the lock TTL since it's the same worker
|
||||
await redis.pexpire(redisKey, ttlMs);
|
||||
logger.debug(
|
||||
`Lock extended: ${lockKey} by ${
|
||||
instanceId
|
||||
}`
|
||||
);
|
||||
logger.debug(`Lock extended: ${lockKey} by ${instanceId}`);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -88,7 +117,10 @@ export class LockManager {
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error(`Failed to acquire lock ${lockKey} (attempt ${attempt + 1}/${maxRetries}):`, error);
|
||||
logger.error(
|
||||
`Failed to acquire lock ${lockKey} (attempt ${attempt + 1}/${maxRetries}):`,
|
||||
error
|
||||
);
|
||||
// On error, still retry if we have attempts left
|
||||
if (attempt < maxRetries - 1) {
|
||||
const delay = retryDelayMs * Math.pow(2, attempt);
|
||||
@@ -109,6 +141,11 @@ export class LockManager {
|
||||
*/
|
||||
async releaseLock(lockKey: string): Promise<void> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
if (existing && existing.owner === this.getLocalOwnerToken()) {
|
||||
localLocks.delete(lockKey);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -136,11 +173,7 @@ export class LockManager {
|
||||
)) as number;
|
||||
|
||||
if (result === 1) {
|
||||
logger.debug(
|
||||
`Lock released: ${lockKey} by ${
|
||||
instanceId
|
||||
}`
|
||||
);
|
||||
logger.debug(`Lock released: ${lockKey} by ${instanceId}`);
|
||||
} else {
|
||||
logger.warn(
|
||||
`Lock not released - not owned by worker: ${lockKey} by ${
|
||||
@@ -159,6 +192,7 @@ export class LockManager {
|
||||
*/
|
||||
async forceReleaseLock(lockKey: string): Promise<void> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
localLocks.delete(lockKey);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -186,7 +220,20 @@ export class LockManager {
|
||||
owner?: string;
|
||||
}> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
return { exists: false, ownedByMe: true, ttl: 0 };
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
|
||||
if (!existing) {
|
||||
return { exists: false, ownedByMe: false, ttl: 0 };
|
||||
}
|
||||
|
||||
const ttl = Math.max(0, existing.expiresAt - Date.now());
|
||||
return {
|
||||
exists: true,
|
||||
ownedByMe: existing.owner === this.getLocalOwnerToken(),
|
||||
ttl,
|
||||
owner: existing.owner.split(":")[0]
|
||||
};
|
||||
}
|
||||
|
||||
const redisKey = `lock:${lockKey}`;
|
||||
@@ -198,11 +245,7 @@ export class LockManager {
|
||||
]);
|
||||
|
||||
const exists = value !== null;
|
||||
const ownedByMe =
|
||||
exists &&
|
||||
value!.startsWith(
|
||||
`${instanceId}:`
|
||||
);
|
||||
const ownedByMe = exists && value!.startsWith(`${instanceId}:`);
|
||||
const owner = exists ? value!.split(":")[0] : undefined;
|
||||
|
||||
return {
|
||||
@@ -225,6 +268,15 @@ export class LockManager {
|
||||
*/
|
||||
async extendLock(lockKey: string, ttlMs: number): Promise<boolean> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
this.clearExpiredLocalLock(lockKey);
|
||||
const existing = localLocks.get(lockKey);
|
||||
|
||||
if (!existing || existing.owner !== this.getLocalOwnerToken()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
existing.expiresAt = Date.now() + ttlMs;
|
||||
localLocks.set(lockKey, existing);
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -255,9 +307,7 @@ export class LockManager {
|
||||
|
||||
if (result === 1) {
|
||||
logger.debug(
|
||||
`Lock extended: ${lockKey} by ${
|
||||
instanceId
|
||||
} for ${ttlMs}ms`
|
||||
`Lock extended: ${lockKey} by ${instanceId} for ${ttlMs}ms`
|
||||
);
|
||||
return true;
|
||||
}
|
||||
@@ -282,12 +332,13 @@ export class LockManager {
|
||||
maxRetries: number = 5,
|
||||
baseDelayMs: number = 100
|
||||
): Promise<boolean> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
return true;
|
||||
}
|
||||
|
||||
for (let attempt = 0; attempt <= maxRetries; attempt++) {
|
||||
const acquired = await this.acquireLock(lockKey, ttlMs);
|
||||
const acquired = await this.acquireLock(
|
||||
lockKey,
|
||||
ttlMs,
|
||||
1,
|
||||
baseDelayMs
|
||||
);
|
||||
|
||||
if (acquired) {
|
||||
return true;
|
||||
@@ -319,10 +370,6 @@ export class LockManager {
|
||||
fn: () => Promise<T>,
|
||||
ttlMs: number = 30000
|
||||
): Promise<T> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
return await fn();
|
||||
}
|
||||
|
||||
const acquired = await this.acquireLock(lockKey, ttlMs);
|
||||
|
||||
if (!acquired) {
|
||||
@@ -346,7 +393,21 @@ export class LockManager {
|
||||
locksOwnedByMe: number;
|
||||
}> {
|
||||
if (!redis || !redis.status || redis.status !== "ready") {
|
||||
return { activeLocksCount: 0, locksOwnedByMe: 0 };
|
||||
const now = Date.now();
|
||||
for (const [key, value] of localLocks.entries()) {
|
||||
if (value.expiresAt <= now) {
|
||||
localLocks.delete(key);
|
||||
}
|
||||
}
|
||||
|
||||
let locksOwnedByMe = 0;
|
||||
for (const value of localLocks.values()) {
|
||||
if (value.owner === this.getLocalOwnerToken()) {
|
||||
locksOwnedByMe++;
|
||||
}
|
||||
}
|
||||
|
||||
return { activeLocksCount: localLocks.size, locksOwnedByMe };
|
||||
}
|
||||
|
||||
try {
|
||||
@@ -356,11 +417,7 @@ export class LockManager {
|
||||
if (keys.length > 0) {
|
||||
const values = await redis.mget(...keys);
|
||||
locksOwnedByMe = values.filter(
|
||||
(value) =>
|
||||
value &&
|
||||
value.startsWith(
|
||||
`${instanceId}:`
|
||||
)
|
||||
(value) => value && value.startsWith(`${instanceId}:`)
|
||||
).length;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user