import os
import sys
import asyncio
import multiprocessing as mp
from multiprocessing.synchronize import Lock as ProcessLock
from multiprocessing import Manager
import time
import logging
from typing import Any, Dict, List, Optional, Union, TypeVar, Generic


# Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, enable_output: bool = False, level: str = "DEBUG"):
    """
    Log a message directly to stderr to ensure visibility in all processes,
    including the Gunicorn master process.

    Args:
        message: The message to log
        level: Log level (default: "DEBUG")
        enable_output: Whether to actually output the log (default: False)
    """
    if not enable_output:
        return

    # Get the current logger level from the lightrag logger
    try:
        from lightrag.utils import logger

        current_level = logger.getEffectiveLevel()
    except ImportError:
        # Fallback if lightrag.utils is not available
        current_level = 20  # INFO

    # Convert string level to numeric level for comparison
    level_mapping = {
        "DEBUG": 10,  # DEBUG
        "INFO": 20,  # INFO
        "WARNING": 30,  # WARNING
        "ERROR": 40,  # ERROR
        "CRITICAL": 50,  # CRITICAL
    }
    message_level = level_mapping.get(level.upper(), logging.DEBUG)

    # print(f"direct_log: {level.upper()} {message_level} ? {current_level}", file=sys.stderr, flush=True)
    if message_level >= current_level:
        print(f"{level}: {message}", file=sys.stderr, flush=True)


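# Illustrative sketch (not used elsewhere in this module): direct_log() prints only
# when enable_output is True and the message level is at or above the effective
# level of the lightrag logger.  The message text below is an example.
def _example_direct_log_usage() -> None:
    direct_log("pipeline worker started", enable_output=True, level="INFO")
    direct_log("suppressed by default")  # enable_output defaults to False

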
T = TypeVar("T")
LockType = Union[ProcessLock, asyncio.Lock]

_is_multiprocess = None
_workers = None
_manager = None

# Global singleton data for multi-process keyed locks
_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
_lock_registry_count: Optional[Dict[str, int]] = None
_lock_cleanup_data: Optional[Dict[str, float]] = None
_registry_guard = None
# Timeout for keyed locks in seconds (Default 300)
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
# Cleanup pending list threshold for triggering cleanup (Default 500)
CLEANUP_THRESHOLD = 500
# Minimum interval between cleanup operations in seconds (Default 30)
MIN_CLEANUP_INTERVAL_SECONDS = 30

# Track the earliest cleanup time for efficient cleanup triggering (multiprocess locks only)
_earliest_mp_cleanup_time: Optional[float] = None

# Track the last cleanup time to enforce minimum interval (multiprocess locks only)
_last_mp_cleanup_time: Optional[float] = None

_initialized = None

# shared data for storage across processes
_shared_dicts: Optional[Dict[str, Any]] = None
_init_flags: Optional[Dict[str, bool]] = None  # namespace -> initialized
_update_flags: Optional[Dict[str, bool]] = None  # namespace -> updated

# locks for mutex access
_storage_lock: Optional[LockType] = None
_internal_lock: Optional[LockType] = None
_pipeline_status_lock: Optional[LockType] = None
_graph_db_lock: Optional[LockType] = None
_data_init_lock: Optional[LockType] = None
# Manager for all keyed locks
_storage_keyed_lock: Optional["KeyedUnifiedLock"] = None

# async locks for coroutine synchronization in multiprocess mode
_async_locks: Optional[Dict[str, asyncio.Lock]] = None


DEBUG_LOCKS = False
_debug_n_locks_acquired: int = 0


def inc_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        _debug_n_locks_acquired += 1
        print(f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}")


def dec_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        if _debug_n_locks_acquired > 0:
            _debug_n_locks_acquired -= 1
            print(f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}")
        else:
            raise RuntimeError("Attempting to release lock when no locks are acquired")


def get_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    return _debug_n_locks_acquired


class UnifiedLock(Generic[T]):
    """Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""

    def __init__(
        self,
        lock: Union[ProcessLock, asyncio.Lock],
        is_async: bool,
        name: str = "unnamed",
        enable_logging: bool = True,
        async_lock: Optional[asyncio.Lock] = None,
    ):
        self._lock = lock
        self._is_async = is_async
        self._pid = os.getpid()  # for debug only
        self._name = name  # for debug only
        self._enable_logging = enable_logging  # for debug only
        self._async_lock = async_lock  # auxiliary lock for coroutine synchronization

    async def __aenter__(self) -> "UnifiedLock[T]":
        try:
            # If in multiprocess mode and async lock exists, acquire it first
            if not self._is_async and self._async_lock is not None:
                await self._async_lock.acquire()
                direct_log(
                    f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
                    enable_output=self._enable_logging,
                )

            # Then acquire the main lock
            if self._is_async:
                await self._lock.acquire()
            else:
                self._lock.acquire()

            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
                enable_output=self._enable_logging,
            )
            return self
        except Exception as e:
            # If main lock acquisition fails, release the async lock if it was acquired
            if (
                not self._is_async
                and self._async_lock is not None
                and self._async_lock.locked()
            ):
                self._async_lock.release()

            direct_log(
                f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        main_lock_released = False
        try:
            # Release main lock first
            if self._is_async:
                self._lock.release()
            else:
                self._lock.release()
            main_lock_released = True

            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
                enable_output=self._enable_logging,
            )

            # Then release async lock if in multiprocess mode
            if not self._is_async and self._async_lock is not None:
                self._async_lock.release()
                direct_log(
                    f"== Lock == Process {self._pid}: Async lock '{self._name}' released",
                    enable_output=self._enable_logging,
                )

        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )

            # If main lock release failed but async lock hasn't been released, try to release it
            if (
                not main_lock_released
                and not self._is_async
                and self._async_lock is not None
            ):
                try:
                    direct_log(
                        f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
                        level="WARNING",
                        enable_output=self._enable_logging,
                    )
                    self._async_lock.release()
                    direct_log(
                        f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
                        enable_output=self._enable_logging,
                    )
                except Exception as inner_e:
                    direct_log(
                        f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
                        level="ERROR",
                        enable_output=self._enable_logging,
                    )

            raise

    def __enter__(self) -> "UnifiedLock[T]":
        """For backward compatibility"""
        try:
            if self._is_async:
                raise RuntimeError("Use 'async with' for shared_storage lock")
            direct_log(
                f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)",
                enable_output=self._enable_logging,
            )
            self._lock.acquire()
            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)",
                enable_output=self._enable_logging,
            )
            return self
        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """For backward compatibility"""
        try:
            if self._is_async:
                raise RuntimeError("Use 'async with' for shared_storage lock")
            direct_log(
                f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)",
                enable_output=self._enable_logging,
            )
            self._lock.release()
            direct_log(
                f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)",
                enable_output=self._enable_logging,
            )
        except Exception as e:
            direct_log(
                f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
                level="ERROR",
                enable_output=self._enable_logging,
            )
            raise

    def locked(self) -> bool:
        if self._is_async:
            return self._lock.locked()
        else:
            return self._lock.locked()


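# Illustrative sketch (not part of the public API): UnifiedLock exposes one async
# context manager over either an asyncio.Lock (is_async=True) or a multiprocessing
# manager lock (is_async=False).  The lock name below is an example.
async def _example_unified_lock_usage() -> None:
    unified = UnifiedLock(
        lock=asyncio.Lock(),
        is_async=True,
        name="example_lock",
        enable_logging=False,
    )
    async with unified:
        ...  # critical section guarded across coroutines in this process

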
def _get_combined_key(factory_name: str, key: str) -> str:
    """Return the combined key for the factory and key."""
    return f"{factory_name}:{key}"


def _perform_lock_cleanup(
    lock_type: str,
    cleanup_data: Dict[str, float],
    lock_registry: Optional[Dict[str, Any]],
    lock_count: Optional[Dict[str, int]],
    earliest_cleanup_time: Optional[float],
    last_cleanup_time: Optional[float],
    current_time: float,
    threshold_check: bool = True,
) -> tuple[int, Optional[float], Optional[float]]:
    """
    Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks.

    Args:
        lock_type: Lock type identifier ("mp" or "async")
        cleanup_data: Cleanup data dictionary
        lock_registry: Lock registry dictionary (can be None for async locks)
        lock_count: Lock count dictionary (can be None for async locks)
        earliest_cleanup_time: Earliest cleanup time
        last_cleanup_time: Last cleanup time
        current_time: Current time
        threshold_check: Whether to check threshold condition (default True, set to False in cleanup_expired_locks)

    Returns:
        tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time)
    """
    if len(cleanup_data) == 0:
        return 0, earliest_cleanup_time, last_cleanup_time

    # If threshold check is needed and threshold not reached, return directly
    if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD:
        return 0, earliest_cleanup_time, last_cleanup_time

    # Time rollback detection
    if last_cleanup_time is not None and current_time < last_cleanup_time:
        direct_log(
            f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
            level="WARNING",
            enable_output=False,
        )
        last_cleanup_time = None

    # Check cleanup conditions
    has_expired_locks = (
        earliest_cleanup_time is not None
        and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
    )

    interval_satisfied = (
        last_cleanup_time is None
        or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
    )

    if not (has_expired_locks and interval_satisfied):
        return 0, earliest_cleanup_time, last_cleanup_time

    try:
        cleaned_count = 0
        new_earliest_time = None

        # Calculate total count before cleanup
        total_cleanup_len = len(cleanup_data)

        # Perform cleanup operation
        for cleanup_key, cleanup_time in list(cleanup_data.items()):
            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                # Remove from cleanup data
                cleanup_data.pop(cleanup_key, None)

                # Remove from lock registry if exists
                if lock_registry is not None:
                    lock_registry.pop(cleanup_key, None)
                if lock_count is not None:
                    lock_count.pop(cleanup_key, None)

                cleaned_count += 1
            else:
                # Track the earliest time among remaining locks
                if new_earliest_time is None or cleanup_time < new_earliest_time:
                    new_earliest_time = cleanup_time

        # Update state only after successful cleanup
        if cleaned_count > 0:
            new_last_cleanup_time = current_time

            # Log cleanup results
            next_cleanup_in = max(
                (new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time)
                if new_earliest_time
                else float("inf"),
                MIN_CLEANUP_INTERVAL_SECONDS,
            )

            if lock_type == "async":
                direct_log(
                    f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_type} locks, "
                    f"next cleanup in {next_cleanup_in:.1f}s",
                    enable_output=False,
                    level="INFO",
                )
            else:
                direct_log(
                    f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
                    f"next cleanup in {next_cleanup_in:.1f}s",
                    enable_output=False,
                    level="INFO",
                )

            return cleaned_count, new_earliest_time, new_last_cleanup_time
        else:
            return 0, earliest_cleanup_time, last_cleanup_time

    except Exception as e:
        direct_log(
            f"== {lock_type} Lock == Cleanup failed: {e}",
            level="ERROR",
            enable_output=False,
        )
        return 0, earliest_cleanup_time, last_cleanup_time


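# Illustrative sketch (not used by the module): feeding _perform_lock_cleanup() a
# throwaway dictionary with one stale entry removes it and reports one cleaned
# lock.  The key name and timings are examples only.
def _example_perform_lock_cleanup() -> int:
    now = time.time()
    stale = {"demo:key": now - CLEANUP_KEYED_LOCKS_AFTER_SECONDS - 1}
    cleaned, _earliest, _last = _perform_lock_cleanup(
        lock_type="async",
        cleanup_data=stale,
        lock_registry=None,
        lock_count=None,
        earliest_cleanup_time=now - CLEANUP_KEYED_LOCKS_AFTER_SECONDS - 1,
        last_cleanup_time=None,
        current_time=now,
        threshold_check=False,  # skip the pending-list size threshold
    )
    return cleaned  # 1 in this sketch

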
def _get_or_create_shared_raw_mp_lock(
    factory_name: str, key: str
) -> Optional[mp.synchronize.Lock]:
    """Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed."""
    if not _is_multiprocess:
        return None

    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None:
            raw = _manager.Lock()
            _lock_registry[combined_key] = raw
            count = 0
        else:
            if count is None:
                raise RuntimeError(
                    f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
                )
            if (
                count == 0 and combined_key in _lock_cleanup_data
            ):  # Reusing a key waiting for cleanup, remove it from cleanup list
                _lock_cleanup_data.pop(combined_key)
        count += 1
        _lock_registry_count[combined_key] = count
        return raw


def _release_shared_raw_mp_lock(factory_name: str, key: str):
    """Release the *singleton* manager.Lock() proxy for *key*."""
    if not _is_multiprocess:
        return

    global _earliest_mp_cleanup_time, _last_mp_cleanup_time

    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None and count is None:
            return
        elif raw is None or count is None:
            raise RuntimeError(
                f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
            )

        count -= 1
        if count < 0:
            raise RuntimeError(
                f"Attempting to release lock for {key} more times than it was acquired"
            )

        _lock_registry_count[combined_key] = count

        current_time = time.time()
        if count == 0:
            _lock_cleanup_data[combined_key] = current_time

            # Update earliest multiprocess cleanup time (only when earlier)
            if (
                _earliest_mp_cleanup_time is None
                or current_time < _earliest_mp_cleanup_time
            ):
                _earliest_mp_cleanup_time = current_time

        # Use generic cleanup function
        cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
            lock_type="mp",
            cleanup_data=_lock_cleanup_data,
            lock_registry=_lock_registry,
            lock_count=_lock_registry_count,
            earliest_cleanup_time=_earliest_mp_cleanup_time,
            last_cleanup_time=_last_mp_cleanup_time,
            current_time=current_time,
            threshold_check=True,
        )

        # Update global state if cleanup was performed
        if cleaned_count > 0:
            _earliest_mp_cleanup_time = new_earliest_time
            _last_mp_cleanup_time = new_last_cleanup_time


class KeyedUnifiedLock:
    """
    Manager for unified keyed locks, supporting both single and multi-process modes

    • Keeps only a table of async keyed locks locally
    • Fetches the multi-process keyed lock on every acquire
    • Builds a fresh `UnifiedLock` each time, so `enable_logging`
      (or future options) can vary per call.
    • Supports dynamic namespaces specified at lock usage time
    """

    def __init__(self, *, default_enable_logging: bool = True) -> None:
        self._default_enable_logging = default_enable_logging
        self._async_lock: Dict[str, asyncio.Lock] = {}  # local keyed locks
        self._async_lock_count: Dict[
            str, int
        ] = {}  # local keyed locks referenced count
        self._async_lock_cleanup_data: Dict[
            str, float
        ] = {}  # local keyed locks timeout
        self._mp_locks: Dict[
            str, mp.synchronize.Lock
        ] = {}  # multi-process lock proxies
        self._earliest_async_cleanup_time: Optional[float] = (
            None  # track earliest async cleanup time
        )
        self._last_async_cleanup_time: Optional[float] = (
            None  # track last async cleanup time for minimum interval
        )

    def __call__(
        self, namespace: str, keys: list[str], *, enable_logging: Optional[bool] = None
    ):
        """
        Ergonomic helper so you can write:

            async with storage_keyed_lock("namespace", ["key1", "key2"]):
                ...
        """
        if enable_logging is None:
            enable_logging = self._default_enable_logging
        return _KeyedLockContext(
            self,
            namespace=namespace,
            keys=keys,
            enable_logging=enable_logging,
        )

    def _get_or_create_async_lock(self, combined_key: str) -> asyncio.Lock:
        async_lock = self._async_lock.get(combined_key)
        count = self._async_lock_count.get(combined_key, 0)
        if async_lock is None:
            async_lock = asyncio.Lock()
            self._async_lock[combined_key] = async_lock
        elif count == 0 and combined_key in self._async_lock_cleanup_data:
            self._async_lock_cleanup_data.pop(combined_key)
        count += 1
        self._async_lock_count[combined_key] = count
        return async_lock

    def _release_async_lock(self, combined_key: str):
        count = self._async_lock_count.get(combined_key, 0)
        count -= 1

        current_time = time.time()
        if count == 0:
            self._async_lock_cleanup_data[combined_key] = current_time

            # Update earliest async cleanup time (only when earlier)
            if (
                self._earliest_async_cleanup_time is None
                or current_time < self._earliest_async_cleanup_time
            ):
                self._earliest_async_cleanup_time = current_time
        self._async_lock_count[combined_key] = count

        # Use generic cleanup function
        cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
            lock_type="async",
            cleanup_data=self._async_lock_cleanup_data,
            lock_registry=self._async_lock,
            lock_count=self._async_lock_count,
            earliest_cleanup_time=self._earliest_async_cleanup_time,
            last_cleanup_time=self._last_async_cleanup_time,
            current_time=current_time,
            threshold_check=True,
        )

        # Update instance state if cleanup was performed
        if cleaned_count > 0:
            self._earliest_async_cleanup_time = new_earliest_time
            self._last_async_cleanup_time = new_last_cleanup_time

    def _get_lock_for_key(
        self, namespace: str, key: str, enable_logging: bool = False
    ) -> UnifiedLock:
        # 1. Create combined key for this namespace:key combination
        combined_key = _get_combined_key(namespace, key)

        # 2. get (or create) the per‑process async gate for this combined key
        # Is synchronous, so no need to acquire a lock
        async_lock = self._get_or_create_async_lock(combined_key)

        # 3. fetch the shared raw lock
        raw_lock = _get_or_create_shared_raw_mp_lock(namespace, key)
        is_multiprocess = raw_lock is not None
        if not is_multiprocess:
            raw_lock = async_lock

        # 4. build a *fresh* UnifiedLock with the chosen logging flag
        if is_multiprocess:
            return UnifiedLock(
                lock=raw_lock,
                is_async=False,  # manager.Lock is synchronous
                name=combined_key,
                enable_logging=enable_logging,
                async_lock=async_lock,  # prevents event‑loop blocking
            )
        else:
            return UnifiedLock(
                lock=raw_lock,
                is_async=True,
                name=combined_key,
                enable_logging=enable_logging,
                async_lock=None,  # No need for async lock in single process mode
            )

    def _release_lock_for_key(self, namespace: str, key: str):
        combined_key = _get_combined_key(namespace, key)
        self._release_async_lock(combined_key)
        _release_shared_raw_mp_lock(namespace, key)

    def cleanup_expired_locks(self) -> Dict[str, Any]:
        """
        Cleanup expired locks for both async and multiprocess locks following the same
        conditions as _release_shared_raw_mp_lock and _release_async_lock functions.

        Only performs cleanup when both has_expired_locks and interval_satisfied conditions are met
        to avoid too frequent cleanup operations.

        Since async and multiprocess locks work together, this method cleans up
        both types of expired locks and returns comprehensive statistics.

        Returns:
            Dict containing cleanup statistics and current status:
            {
                "process_id": 12345,
                "cleanup_performed": {
                    "mp_cleaned": 5,
                    "async_cleaned": 3
                },
                "current_status": {
                    "total_mp_locks": 10,
                    "pending_mp_cleanup": 2,
                    "total_async_locks": 8,
                    "pending_async_cleanup": 1
                }
            }
        """
        global _lock_registry, _lock_registry_count, _lock_cleanup_data
        global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time

        cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0}

        current_time = time.time()

        # 1. Cleanup multiprocess locks using generic function
        if (
            _is_multiprocess
            and _lock_registry is not None
            and _registry_guard is not None
        ):
            try:
                with _registry_guard:
                    if _lock_cleanup_data is not None:
                        # Use generic cleanup function without threshold check
                        cleaned_count, new_earliest_time, new_last_cleanup_time = (
                            _perform_lock_cleanup(
                                lock_type="mp",
                                cleanup_data=_lock_cleanup_data,
                                lock_registry=_lock_registry,
                                lock_count=_lock_registry_count,
                                earliest_cleanup_time=_earliest_mp_cleanup_time,
                                last_cleanup_time=_last_mp_cleanup_time,
                                current_time=current_time,
                                threshold_check=False,  # Force cleanup in cleanup_expired_locks
                            )
                        )

                        # Update global state if cleanup was performed
                        if cleaned_count > 0:
                            _earliest_mp_cleanup_time = new_earliest_time
                            _last_mp_cleanup_time = new_last_cleanup_time
                        cleanup_stats["mp_cleaned"] = cleaned_count

            except Exception as e:
                direct_log(
                    f"Error during multiprocess lock cleanup: {e}",
                    level="ERROR",
                    enable_output=False,
                )

        # 2. Cleanup async locks using generic function
        try:
            # Use generic cleanup function without threshold check
            cleaned_count, new_earliest_time, new_last_cleanup_time = (
                _perform_lock_cleanup(
                    lock_type="async",
                    cleanup_data=self._async_lock_cleanup_data,
                    lock_registry=self._async_lock,
                    lock_count=self._async_lock_count,
                    earliest_cleanup_time=self._earliest_async_cleanup_time,
                    last_cleanup_time=self._last_async_cleanup_time,
                    current_time=current_time,
                    threshold_check=False,  # Force cleanup in cleanup_expired_locks
                )
            )

            # Update instance state if cleanup was performed
            if cleaned_count > 0:
                self._earliest_async_cleanup_time = new_earliest_time
                self._last_async_cleanup_time = new_last_cleanup_time
            cleanup_stats["async_cleaned"] = cleaned_count

        except Exception as e:
            direct_log(
                f"Error during async lock cleanup: {e}",
                level="ERROR",
                enable_output=False,
            )

        # 3. Get current status after cleanup
        current_status = self.get_lock_status()

        return {
            "process_id": os.getpid(),
            "cleanup_performed": cleanup_stats,
            "current_status": current_status,
        }

    def get_lock_status(self) -> Dict[str, int]:
        """
        Get current status of both async and multiprocess locks.

        Returns comprehensive lock counts for both types of locks since
        they work together in the keyed lock system.

        Returns:
            Dict containing lock counts:
            {
                "total_mp_locks": 10,
                "pending_mp_cleanup": 2,
                "total_async_locks": 8,
                "pending_async_cleanup": 1
            }
        """
        global _lock_registry_count, _lock_cleanup_data, _registry_guard

        status = {
            "total_mp_locks": 0,
            "pending_mp_cleanup": 0,
            "total_async_locks": 0,
            "pending_async_cleanup": 0,
        }

        try:
            # Count multiprocess locks
            if _is_multiprocess and _lock_registry_count is not None:
                if _registry_guard is not None:
                    with _registry_guard:
                        status["total_mp_locks"] = len(_lock_registry_count)
                        if _lock_cleanup_data is not None:
                            status["pending_mp_cleanup"] = len(_lock_cleanup_data)

            # Count async locks
            status["total_async_locks"] = len(self._async_lock_count)
            status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)

        except Exception as e:
            direct_log(
                f"Error getting keyed lock status: {e}",
                level="ERROR",
                enable_output=False,
            )

        return status


class _KeyedLockContext:
    def __init__(
        self,
        parent: KeyedUnifiedLock,
        namespace: str,
        keys: list[str],
        enable_logging: bool,
    ) -> None:
        self._parent = parent
        self._namespace = namespace
        # The sorting is critical to ensure proper lock and release order
        # to avoid deadlocks
        self._keys = sorted(keys)
        self._enable_logging = (
            enable_logging
            if enable_logging is not None
            else parent._default_enable_logging
        )
        self._ul: Optional[List["UnifiedLock"]] = None  # set in __aenter__

    # ----- enter -----
    async def __aenter__(self):
        if self._ul is not None:
            raise RuntimeError("KeyedUnifiedLock already acquired in current context")

        # acquire locks for all keys in the namespace
        self._ul = []
        for key in self._keys:
            lock = self._parent._get_lock_for_key(
                self._namespace, key, enable_logging=self._enable_logging
            )
            await lock.__aenter__()
            inc_debug_n_locks_acquired()
            self._ul.append(lock)
        return self

    # ----- exit -----
    async def __aexit__(self, exc_type, exc, tb):
        # The UnifiedLock takes care of proper release order
        for ul, key in zip(reversed(self._ul), reversed(self._keys)):
            await ul.__aexit__(exc_type, exc, tb)
            self._parent._release_lock_for_key(self._namespace, key)
            dec_debug_n_locks_acquired()
        self._ul = None


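# Illustrative sketch (not part of the public API): a KeyedUnifiedLock instance is
# entered through the context object returned by calling it.  The namespace and
# key names below are examples.
async def _example_keyed_unified_lock() -> None:
    keyed = KeyedUnifiedLock(default_enable_logging=False)
    async with keyed("example_namespace", ["doc-1", "doc-2"]):
        ...  # both keys are held here; they are acquired in sorted order

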
def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
    """return unified internal lock for data consistency"""
    async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_internal_lock,
        is_async=not _is_multiprocess,
        name="internal_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
    """return unified storage lock for data consistency"""
    async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_storage_lock,
        is_async=not _is_multiprocess,
        name="storage_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
    """return unified pipeline status lock for data consistency"""
    async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_pipeline_status_lock,
        is_async=not _is_multiprocess,
        name="pipeline_status_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
    """return unified graph database lock for ensuring atomic operations"""
    async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_graph_db_lock,
        is_async=not _is_multiprocess,
        name="graph_db_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


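# Illustrative sketch: the module-level helpers return a fresh UnifiedLock per
# call, so the usual pattern is to create and enter it in one statement.  Assumes
# initialize_share_data() has already run; the body is a placeholder.
async def _example_graph_db_lock_usage() -> None:
    async with get_graph_db_lock(enable_logging=False):
        ...  # perform atomic graph database operations here

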
def get_storage_keyed_lock(
    keys: str | list[str], namespace: str = "default", enable_logging: bool = False
) -> _KeyedLockContext:
    """Return unified storage keyed lock for ensuring atomic operations across different namespaces"""
    global _storage_keyed_lock
    if _storage_keyed_lock is None:
        raise RuntimeError("Shared-Data is not initialized")
    if isinstance(keys, str):
        keys = [keys]
    return _storage_keyed_lock(namespace, keys, enable_logging=enable_logging)


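# Illustrative sketch mirroring the intended call pattern: a single key or a list
# of keys is accepted, and access is serialized per (namespace, key).  Assumes
# initialize_share_data() has already run; names are examples.
async def _example_storage_keyed_lock_usage(entity_id: str) -> None:
    async with get_storage_keyed_lock(entity_id, namespace="entities"):
        ...  # update this entity atomically across coroutines and workers

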
def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
    """return unified data initialization lock for ensuring atomic data initialization"""
    async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None
    return UnifiedLock(
        lock=_data_init_lock,
        is_async=not _is_multiprocess,
        name="data_init_lock",
        enable_logging=enable_logging,
        async_lock=async_lock,
    )


def cleanup_keyed_lock() -> Dict[str, Any]:
    """
    Force cleanup of expired keyed locks and return comprehensive status information.

    This function actively cleans up expired locks for both async and multiprocess locks,
    then returns detailed statistics about the cleanup operation and current lock status.

    Returns:
        Same as cleanup_expired_locks in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Check if shared storage is initialized
    if not _initialized or _storage_keyed_lock is None:
        return {
            "process_id": os.getpid(),
            "cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0},
            "current_status": {
                "total_mp_locks": 0,
                "pending_mp_cleanup": 0,
                "total_async_locks": 0,
                "pending_async_cleanup": 0,
            },
        }

    return _storage_keyed_lock.cleanup_expired_locks()


def get_keyed_lock_status() -> Dict[str, Any]:
    """
    Get current status of keyed locks without performing cleanup.

    This function provides a read-only view of the current lock counts
    for both multiprocess and async locks, including pending cleanup counts.

    Returns:
        Same as get_lock_status in KeyedUnifiedLock
    """
    global _storage_keyed_lock

    # Check if shared storage is initialized
    if not _initialized or _storage_keyed_lock is None:
        return {
            "process_id": os.getpid(),
            "total_mp_locks": 0,
            "pending_mp_cleanup": 0,
            "total_async_locks": 0,
            "pending_async_cleanup": 0,
        }

    status = _storage_keyed_lock.get_lock_status()
    status["process_id"] = os.getpid()
    return status


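# Illustrative sketch (not part of the module): a periodic maintenance task could
# force a cleanup pass and log the resulting counts.  The log text and cadence
# are examples only.
def _example_keyed_lock_maintenance() -> None:
    stats = cleanup_keyed_lock()
    status = get_keyed_lock_status()
    direct_log(
        f"keyed locks cleaned: {stats['cleanup_performed']}, status: {status}",
        enable_output=True,
        level="INFO",
    )

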
def initialize_share_data(workers: int = 1):
    """
    Initialize shared storage data for single or multi-process mode.

    When used with Gunicorn's preload feature, this function is called once in the
    master process before forking worker processes, allowing all workers to share
    the same initialized data.

    In single-process mode, this function is called in FASTAPI lifespan function.

    The function determines whether to use cross-process shared variables for data storage
    based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
    If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.

    Args:
        workers (int): Number of worker processes. If 1, single-process mode is used.
            If > 1, multi-process mode with shared memory is used.
    """
    global \
        _manager, \
        _workers, \
        _is_multiprocess, \
        _storage_lock, \
        _lock_registry, \
        _lock_registry_count, \
        _lock_cleanup_data, \
        _registry_guard, \
        _internal_lock, \
        _pipeline_status_lock, \
        _graph_db_lock, \
        _data_init_lock, \
        _shared_dicts, \
        _init_flags, \
        _initialized, \
        _update_flags, \
        _async_locks, \
        _storage_keyed_lock, \
        _earliest_mp_cleanup_time, \
        _last_mp_cleanup_time

    # Check if already initialized
    if _initialized:
        direct_log(
            f"Process {os.getpid()} Shared-Data already initialized (multiprocess={_is_multiprocess})"
        )
        return

    _workers = workers

    if workers > 1:
        _is_multiprocess = True
        _manager = Manager()
        _lock_registry = _manager.dict()
        _lock_registry_count = _manager.dict()
        _lock_cleanup_data = _manager.dict()
        _registry_guard = _manager.RLock()
        _internal_lock = _manager.Lock()
        _storage_lock = _manager.Lock()
        _pipeline_status_lock = _manager.Lock()
        _graph_db_lock = _manager.Lock()
        _data_init_lock = _manager.Lock()
        _shared_dicts = _manager.dict()
        _init_flags = _manager.dict()
        _update_flags = _manager.dict()

        _storage_keyed_lock = KeyedUnifiedLock()

        # Initialize async locks for multiprocess mode
        _async_locks = {
            "internal_lock": asyncio.Lock(),
            "storage_lock": asyncio.Lock(),
            "pipeline_status_lock": asyncio.Lock(),
            "graph_db_lock": asyncio.Lock(),
            "data_init_lock": asyncio.Lock(),
        }

        direct_log(
            f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
        )
    else:
        _is_multiprocess = False
        _internal_lock = asyncio.Lock()
        _storage_lock = asyncio.Lock()
        _pipeline_status_lock = asyncio.Lock()
        _graph_db_lock = asyncio.Lock()
        _data_init_lock = asyncio.Lock()
        _shared_dicts = {}
        _init_flags = {}
        _update_flags = {}
        _async_locks = None  # No need for async locks in single process mode

        _storage_keyed_lock = KeyedUnifiedLock()
        direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")

    # Initialize multiprocess cleanup times
    _earliest_mp_cleanup_time = None
    _last_mp_cleanup_time = None

    # Mark as initialized
    _initialized = True


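# Illustrative sketch of the startup order described in the docstring above; a
# FastAPI/Gunicorn server would normally drive this.  The worker count is an
# example value.
async def _example_startup() -> None:
    initialize_share_data(workers=1)    # once, before any shared data is touched
    await initialize_pipeline_status()  # once per worker, inside the event loop

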
async def initialize_pipeline_status():
    """
    Initialize pipeline namespace with default values.
    This function is called during FASTAPI lifespan for each worker.
    """
    pipeline_namespace = await get_namespace_data("pipeline_status")

    async with get_internal_lock():
        # Check if already initialized by checking for required fields
        if "busy" in pipeline_namespace:
            return

        # Create a shared list object for history_messages
        history_messages = _manager.list() if _is_multiprocess else []
        pipeline_namespace.update(
            {
                "autoscanned": False,  # Auto-scan started
                "busy": False,  # Control concurrent processes
                "job_name": "-",  # Current job name (indexing files/indexing texts)
                "job_start": None,  # Job start time
                "docs": 0,  # Total number of documents to be indexed
                "batchs": 0,  # Number of batches for processing documents
                "cur_batch": 0,  # Current processing batch
                "request_pending": False,  # Flag for pending request for processing
                "latest_message": "",  # Latest message from pipeline processing
                "history_messages": history_messages,  # Use the shared list object
            }
        )
        direct_log(f"Process {os.getpid()} Pipeline namespace initialized")


async def get_update_flag(namespace: str):
    """
    Create a namespace's update flag for a worker.
    Return the update flag to the caller for referencing or reset.
    """
    global _update_flags
    if _update_flags is None:
        raise ValueError("Try to create namespace before Shared-Data is initialized")

    async with get_internal_lock():
        if namespace not in _update_flags:
            if _is_multiprocess and _manager is not None:
                _update_flags[namespace] = _manager.list()
            else:
                _update_flags[namespace] = []
            direct_log(
                f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]"
            )

        if _is_multiprocess and _manager is not None:
            new_update_flag = _manager.Value("b", False)
        else:
            # Create a simple mutable object to store boolean value for compatibility with multiprocess
            class MutableBoolean:
                def __init__(self, initial_value=False):
                    self.value = initial_value

            new_update_flag = MutableBoolean(False)

        _update_flags[namespace].append(new_update_flag)
        return new_update_flag


async def set_all_update_flags(namespace: str):
|
|
|
|
|
|
"""Set all update flag of namespace indicating all workers need to reload data from files"""
|
2025-03-01 01:49:26 +08:00
|
|
|
|
global _update_flags
|
|
|
|
|
|
if _update_flags is None:
|
|
|
|
|
|
raise ValueError("Try to create namespace before Shared-Data is initialized")
|
2025-03-01 16:23:34 +08:00
|
|
|
|
|
2025-03-01 10:48:55 +08:00
|
|
|
|
async with get_internal_lock():
|
2025-03-01 02:22:35 +08:00
|
|
|
|
if namespace not in _update_flags:
|
|
|
|
|
|
raise ValueError(f"Namespace {namespace} not found in update flags")
|
|
|
|
|
|
# Update flags for both modes
|
|
|
|
|
|
for i in range(len(_update_flags[namespace])):
|
2025-03-25 10:48:15 +08:00
|
|
|
|
_update_flags[namespace][i].value = True
|
2025-03-01 01:49:26 +08:00
|
|
|
|
|
2025-03-10 02:07:19 +08:00
|
|
|
|
|
2025-03-10 01:17:25 +08:00
|
|
|
|
async def clear_all_update_flags(namespace: str):
|
|
|
|
|
|
"""Clear all update flag of namespace indicating all workers need to reload data from files"""
|
|
|
|
|
|
global _update_flags
|
|
|
|
|
|
if _update_flags is None:
|
|
|
|
|
|
raise ValueError("Try to create namespace before Shared-Data is initialized")
|
|
|
|
|
|
|
|
|
|
|
|
async with get_internal_lock():
|
|
|
|
|
|
if namespace not in _update_flags:
|
|
|
|
|
|
raise ValueError(f"Namespace {namespace} not found in update flags")
|
|
|
|
|
|
# Update flags for both modes
|
|
|
|
|
|
for i in range(len(_update_flags[namespace])):
|
2025-03-25 10:48:15 +08:00
|
|
|
|
_update_flags[namespace][i].value = False
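
# Illustrative sketch (not part of the module): a writer marks every worker's
# flag after persisting new data, and an administrative task can later reset
# them all at once. The namespace name is hypothetical.
async def _example_signal_and_reset(namespace: str = "example_namespace"):
    await set_all_update_flags(namespace)    # every worker's flag becomes True
    # ... workers detect the flag and reload their data ...
    await clear_all_update_flags(namespace)  # all flags back to False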
|
2025-03-01 01:49:26 +08:00
|
|
|
|
|
2025-03-10 02:07:19 +08:00
|
|
|
|
|
2025-03-01 14:58:26 +08:00
|
|
|
|
async def get_all_update_flags_status() -> Dict[str, list]:
|
|
|
|
|
|
"""
|
|
|
|
|
|
Get update flags status for all namespaces.
|
2025-03-01 16:23:34 +08:00
|
|
|
|
|
2025-03-01 14:58:26 +08:00
|
|
|
|
Returns:
|
|
|
|
|
|
Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
|
|
|
|
|
|
"""
|
|
|
|
|
|
if _update_flags is None:
|
|
|
|
|
|
return {}
|
2025-03-01 16:23:34 +08:00
|
|
|
|
|
2025-03-01 14:58:26 +08:00
|
|
|
|
result = {}
|
|
|
|
|
|
async with get_internal_lock():
|
|
|
|
|
|
for namespace, flags in _update_flags.items():
|
|
|
|
|
|
worker_statuses = []
|
|
|
|
|
|
for flag in flags:
|
2025-03-25 10:48:15 +08:00
|
|
|
|
if _is_multiprocess:
|
2025-03-01 14:58:26 +08:00
|
|
|
|
worker_statuses.append(flag.value)
|
|
|
|
|
|
else:
|
|
|
|
|
|
worker_statuses.append(flag)
|
|
|
|
|
|
result[namespace] = worker_statuses
|
2025-03-01 16:23:34 +08:00
|
|
|
|
|
2025-03-01 14:58:26 +08:00
|
|
|
|
return result
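
# Illustrative sketch (not part of the module): dumping the per-worker flag
# status, e.g. from a debug endpoint. The output shape follows the docstring
# above: {namespace: [flag_status, ...]}.
async def _example_report_flag_status():
    status = await get_all_update_flags_status()
    for namespace, worker_flags in status.items():
        direct_log(f"{namespace}: {worker_flags}", enable_output=True, level="INFO")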
|
|
|
|
|
|
|
|
|
|
|
|
|
2025-03-09 15:22:06 +08:00
|
|
|
|
async def try_initialize_namespace(namespace: str) -> bool:
|
2025-02-26 18:11:02 +08:00
|
|
|
|
"""
|
2025-03-02 01:00:27 +08:00
|
|
|
|
Returns True if the current worker (process) gets initialization permission for loading data later.
|
|
|
|
|
|
A worker that does not get permission is prohibited from loading data from files.
|
2025-02-26 18:11:02 +08:00
|
|
|
|
"""
|
|
|
|
|
|
global _init_flags, _manager
|
2025-02-26 18:11:16 +08:00
|
|
|
|
|
2025-02-27 19:03:53 +08:00
|
|
|
|
if _init_flags is None:
|
2025-02-28 01:14:25 +08:00
|
|
|
|
raise ValueError("Try to create nanmespace before Shared-Data is initialized")
|
2025-02-26 18:11:16 +08:00
|
|
|
|
|
2025-03-09 15:22:06 +08:00
|
|
|
|
async with get_internal_lock():
|
|
|
|
|
|
if namespace not in _init_flags:
|
|
|
|
|
|
_init_flags[namespace] = True
|
|
|
|
|
|
direct_log(
|
|
|
|
|
|
f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]"
|
|
|
|
|
|
)
|
|
|
|
|
|
return True
|
2025-02-28 01:25:59 +08:00
|
|
|
|
direct_log(
|
2025-03-09 15:22:06 +08:00
|
|
|
|
f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
|
2025-02-28 01:25:59 +08:00
|
|
|
|
)
|
2025-03-09 15:22:06 +08:00
|
|
|
|
|
2025-02-27 19:03:53 +08:00
|
|
|
|
return False
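
# Illustrative sketch (not part of the module): only the worker that wins the
# initialization permission populates the shared dictionary; other workers rely
# on the update-flag mechanism instead. The loader callable is hypothetical, and
# real callers would normally also hold the appropriate storage lock while writing.
async def _example_guarded_load(namespace: str, load_from_files):
    if await try_initialize_namespace(namespace):
        data = await get_namespace_data(namespace)  # defined below in this module
        data.update(load_from_files())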
|
2025-02-26 05:38:38 +08:00
|
|
|
|
|
2025-02-26 18:11:16 +08:00
|
|
|
|
|
2025-03-01 01:49:26 +08:00
|
|
|
|
async def get_namespace_data(namespace: str) -> Dict[str, Any]:
|
2025-03-02 01:00:27 +08:00
|
|
|
|
"""get the shared data reference for specific namespace"""
|
2025-02-27 19:03:53 +08:00
|
|
|
|
if _shared_dicts is None:
|
2025-02-27 19:05:51 +08:00
|
|
|
|
direct_log(
|
2025-03-02 01:00:27 +08:00
|
|
|
|
f"Error: try to getnanmespace before it is initialized, pid={os.getpid()}",
|
2025-02-27 19:05:51 +08:00
|
|
|
|
level="ERROR",
|
|
|
|
|
|
)
|
2025-02-27 19:03:53 +08:00
|
|
|
|
raise ValueError("Shared dictionaries not initialized")
|
|
|
|
|
|
|
2025-03-01 10:48:55 +08:00
|
|
|
|
async with get_internal_lock():
|
2025-03-01 02:22:35 +08:00
|
|
|
|
if namespace not in _shared_dicts:
|
2025-03-25 10:48:15 +08:00
|
|
|
|
if _is_multiprocess and _manager is not None:
|
2025-03-01 02:22:35 +08:00
|
|
|
|
_shared_dicts[namespace] = _manager.dict()
|
|
|
|
|
|
else:
|
2025-02-26 18:11:02 +08:00
|
|
|
|
_shared_dicts[namespace] = {}
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
2025-02-26 18:11:02 +08:00
|
|
|
|
return _shared_dicts[namespace]
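
# Illustrative sketch (not part of the module): reading and writing one entry of
# a namespace's shared dictionary. The key name is hypothetical; the internal
# lock is used here only for illustration, real callers would pick whichever
# lock in this module matches their data.
async def _example_touch_namespace(namespace: str):
    data = await get_namespace_data(namespace)
    async with get_internal_lock():
        data["last_access_pid"] = os.getpid()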
|
2025-02-26 05:38:38 +08:00
|
|
|
|
|
2025-02-26 18:11:16 +08:00
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
def finalize_share_data():
|
|
|
|
|
|
"""
|
|
|
|
|
|
Release shared resources and clean up.
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
This function should be called when the application is shutting down
|
|
|
|
|
|
to properly release shared resources and avoid memory leaks.
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
In multi-process mode, it shuts down the Manager and releases all shared objects.
|
|
|
|
|
|
In single-process mode, it simply resets the global variables.
|
|
|
|
|
|
"""
|
2025-02-27 19:05:51 +08:00
|
|
|
|
global \
|
|
|
|
|
|
_manager, \
|
2025-03-25 10:48:15 +08:00
|
|
|
|
_is_multiprocess, \
|
2025-03-02 01:00:27 +08:00
|
|
|
|
_storage_lock, \
|
|
|
|
|
|
_internal_lock, \
|
|
|
|
|
|
_pipeline_status_lock, \
|
2025-03-08 22:36:41 +08:00
|
|
|
|
_graph_db_lock, \
|
2025-03-09 17:33:15 +08:00
|
|
|
|
_data_init_lock, \
|
2025-02-27 19:05:51 +08:00
|
|
|
|
_shared_dicts, \
|
|
|
|
|
|
_init_flags, \
|
2025-03-02 01:00:27 +08:00
|
|
|
|
_initialized, \
|
2025-03-21 16:08:23 +08:00
|
|
|
|
_update_flags, \
|
|
|
|
|
|
_async_locks
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
# Check if already initialized
|
2025-02-27 15:36:12 +08:00
|
|
|
|
if not _initialized:
|
2025-02-27 19:05:51 +08:00
|
|
|
|
direct_log(
|
|
|
|
|
|
f"Process {os.getpid()} storage data not initialized, nothing to finalize"
|
|
|
|
|
|
)
|
2025-02-27 13:25:22 +08:00
|
|
|
|
return
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
|
|
|
|
|
direct_log(
|
2025-03-25 10:48:15 +08:00
|
|
|
|
f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
|
2025-02-27 19:05:51 +08:00
|
|
|
|
)
|
|
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
# In multi-process mode, shut down the Manager
|
2025-03-25 10:48:15 +08:00
|
|
|
|
if _is_multiprocess and _manager is not None:
|
2025-02-27 13:25:22 +08:00
|
|
|
|
try:
|
2025-03-02 01:00:27 +08:00
|
|
|
|
# Clear shared resources before shutting down Manager
|
2025-02-27 13:25:22 +08:00
|
|
|
|
if _shared_dicts is not None:
|
2025-03-02 01:00:27 +08:00
|
|
|
|
# Clear pipeline status history messages first, if they exist
|
|
|
|
|
|
try:
|
|
|
|
|
|
pipeline_status = _shared_dicts.get("pipeline_status", {})
|
|
|
|
|
|
if "history_messages" in pipeline_status:
|
|
|
|
|
|
pipeline_status["history_messages"].clear()
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass # Ignore any errors during history messages cleanup
|
2025-02-27 13:25:22 +08:00
|
|
|
|
_shared_dicts.clear()
|
|
|
|
|
|
if _init_flags is not None:
|
|
|
|
|
|
_init_flags.clear()
|
2025-03-02 01:00:27 +08:00
|
|
|
|
if _update_flags is not None:
|
|
|
|
|
|
# Clear each namespace's update flags list and Value objects
|
|
|
|
|
|
try:
|
|
|
|
|
|
for namespace in _update_flags:
|
|
|
|
|
|
flags_list = _update_flags[namespace]
|
|
|
|
|
|
if isinstance(flags_list, list):
|
|
|
|
|
|
# Clear Value objects in the list
|
|
|
|
|
|
for flag in flags_list:
|
|
|
|
|
|
if hasattr(
|
|
|
|
|
|
flag, "value"
|
|
|
|
|
|
): # Check if it's a Value object
|
|
|
|
|
|
flag.value = False
|
|
|
|
|
|
flags_list.clear()
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
pass # Ignore any errors during update flags cleanup
|
|
|
|
|
|
_update_flags.clear()
|
|
|
|
|
|
|
|
|
|
|
|
# Shut down the Manager - this will automatically clean up all shared resources
|
2025-02-27 13:25:22 +08:00
|
|
|
|
_manager.shutdown()
|
|
|
|
|
|
direct_log(f"Process {os.getpid()} Manager shutdown complete")
|
|
|
|
|
|
except Exception as e:
|
2025-02-27 19:05:51 +08:00
|
|
|
|
direct_log(
|
|
|
|
|
|
f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
|
|
|
|
|
|
)
|
|
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
# Reset global variables
|
|
|
|
|
|
_manager = None
|
|
|
|
|
|
_initialized = None
|
2025-03-25 10:48:15 +08:00
|
|
|
|
_is_multiprocess = None
|
2025-02-27 13:25:22 +08:00
|
|
|
|
_shared_dicts = None
|
|
|
|
|
|
_init_flags = None
|
2025-03-02 01:00:27 +08:00
|
|
|
|
_storage_lock = None
|
|
|
|
|
|
_internal_lock = None
|
|
|
|
|
|
_pipeline_status_lock = None
|
2025-03-08 22:36:41 +08:00
|
|
|
|
_graph_db_lock = None
|
2025-03-09 17:33:15 +08:00
|
|
|
|
_data_init_lock = None
|
2025-03-02 01:00:27 +08:00
|
|
|
|
_update_flags = None
|
2025-03-21 16:08:23 +08:00
|
|
|
|
_async_locks = None
|
2025-02-27 19:05:51 +08:00
|
|
|
|
|
2025-02-27 13:25:22 +08:00
|
|
|
|
direct_log(f"Process {os.getpid()} storage data finalization complete")
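
# Illustrative sketch (not part of the module): for a plain-script entry point,
# registering the cleanup with atexit is one way to make sure shared resources
# are released; a web server would instead call finalize_share_data() from its
# shutdown or lifespan hook.
def _example_register_cleanup():
    import atexit
    atexit.register(finalize_share_data)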
|