From ad99d9ba5ab1bcc00f61cf42caa50b08f99adae9 Mon Sep 17 00:00:00 2001 From: yangdx Date: Fri, 11 Jul 2025 22:13:02 +0800 Subject: [PATCH] Improve code organization and comments --- lightrag/kg/shared_storage.py | 67 +++++++++++++++-------------------- 1 file changed, 29 insertions(+), 38 deletions(-) diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py index fdd4adcd..9330ac6a 100644 --- a/lightrag/kg/shared_storage.py +++ b/lightrag/kg/shared_storage.py @@ -29,14 +29,17 @@ LockType = Union[ProcessLock, asyncio.Lock] _is_multiprocess = None _workers = None _manager = None + +# Global singleton data for multi-process keyed locks _lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None _lock_registry_count: Optional[Dict[str, int]] = None _lock_cleanup_data: Optional[Dict[str, time.time]] = None _registry_guard = None -_initialized = None - +# Timeout for keyed locks in seconds CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300 +_initialized = None + # shared data for storage across processes _shared_dicts: Optional[Dict[str, Any]] = None _init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized @@ -48,6 +51,7 @@ _internal_lock: Optional[LockType] = None _pipeline_status_lock: Optional[LockType] = None _graph_db_lock: Optional[LockType] = None _data_init_lock: Optional[LockType] = None +# Manager for all keyed locks _graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None # async locks for coroutine synchronization in multiprocess mode @@ -56,7 +60,6 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None DEBUG_LOCKS = False _debug_n_locks_acquired: int = 0 - def inc_debug_n_locks_acquired(): global _debug_n_locks_acquired if DEBUG_LOCKS: @@ -259,9 +262,6 @@ class UnifiedLock(Generic[T]): return self._lock.locked() -# ───────────────────────────────────────────────────────────────────────────── -# 2. CROSS‑PROCESS FACTORY (one manager.Lock shared by *all* processes) -# ───────────────────────────────────────────────────────────────────────────── def _get_combined_key(factory_name: str, key: str) -> str: """Return the combined key for the factory and key.""" return f"{factory_name}:{key}" @@ -270,7 +270,7 @@ def _get_combined_key(factory_name: str, key: str) -> str: def _get_or_create_shared_raw_mp_lock( factory_name: str, key: str ) -> Optional[mp.synchronize.Lock]: - """Return the *singleton* manager.Lock() proxy for *key*, creating if needed.""" + """Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed.""" if not _is_multiprocess: return None @@ -281,20 +281,18 @@ def _get_or_create_shared_raw_mp_lock( if raw is None: raw = _manager.Lock() _lock_registry[combined_key] = raw - _lock_registry_count[combined_key] = ( - 1 # 修复:新锁初始化为1,与释放逻辑保持一致 - ) + count = 0 else: if count is None: raise RuntimeError( f"Shared-Data lock registry for {factory_name} is corrupted for key {key}" ) - count += 1 - _lock_registry_count[combined_key] = count if ( count == 1 and combined_key in _lock_cleanup_data - ): # 把再次使用的锁添剔除出待清理字典 + ): # Reusing an key waiting for cleanup, remove it from cleanup list _lock_cleanup_data.pop(combined_key) + count += 1 + _lock_registry_count[combined_key] = count return raw @@ -326,7 +324,6 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): if count == 0: _lock_cleanup_data[combined_key] = current_time - # 清理过期的锁 for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: _lock_registry.pop(cleanup_key, None) @@ -334,31 +331,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str): _lock_cleanup_data.pop(cleanup_key, None) -# ───────────────────────────────────────────────────────────────────────────── -# 3. PARAMETER‑KEYED WRAPPER (unchanged except it *accepts a factory*) -# ───────────────────────────────────────────────────────────────────────────── class KeyedUnifiedLock: """ - Parameter‑keyed wrapper around `UnifiedLock`. + Manager for unified keyed locks, supporting both single and multi-process - • Keeps only a table of per‑key *asyncio* gates locally - • Fetches the shared process‑wide mutex on *every* acquire + • Keeps only a table of async keyed locks locally + • Fetches the multi-process keyed lockon every acquire • Builds a fresh `UnifiedLock` each time, so `enable_logging` (or future options) can vary per call. """ - # ---------------- construction ---------------- def __init__( self, factory_name: str, *, default_enable_logging: bool = True ) -> None: self._factory_name = factory_name self._default_enable_logging = default_enable_logging - self._async_lock: Dict[str, asyncio.Lock] = {} # key → asyncio.Lock - self._async_lock_count: Dict[str, int] = {} # key → asyncio.Lock count - self._async_lock_cleanup_data: Dict[str, time.time] = {} # key → time.time - self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # key → mp.synchronize.Lock + self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks + self._async_lock_count: Dict[str, int] = {} # local keyed locks referenced count + self._async_lock_cleanup_data: Dict[str, time.time] = {} # local keyed locks timeout + self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # multi-process lock proxies - # ---------------- public API ------------------ def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None): """ Ergonomic helper so you can write: @@ -391,12 +383,11 @@ class KeyedUnifiedLock: count = self._async_lock_count.get(key, 0) count -= 1 - current_time = time.time() # 优化:只调用一次 time.time() + current_time = time.time() if count == 0: self._async_lock_cleanup_data[key] = current_time self._async_lock_count[key] = count - # 使用缓存的时间戳进行清理,避免在循环中重复调用 time.time() for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()): if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS: self._async_lock.pop(cleanup_key) @@ -417,19 +408,19 @@ class KeyedUnifiedLock: # 3. build a *fresh* UnifiedLock with the chosen logging flag if is_multiprocess: return UnifiedLock( - lock=raw_lock, - is_async=False, # manager.Lock is synchronous - name=f"key:{self._factory_name}:{key}", - enable_logging=enable_logging, - async_lock=async_lock, # prevents event‑loop blocking + lock = raw_lock, + is_async = False, # manager.Lock is synchronous + name = _get_combined_key(self._factory_name, key), + enable_logging = enable_logging, + async_lock = async_lock, # prevents event‑loop blocking ) else: return UnifiedLock( - lock=raw_lock, - is_async=True, - name=f"key:{self._factory_name}:{key}", - enable_logging=enable_logging, - async_lock=None, # No need for async lock in single process mode + lock = raw_lock, + is_async = True, + name = _get_combined_key(self._factory_name, key), + enable_logging = enable_logging, + async_lock = None, # No need for async lock in single process mode ) def _release_lock_for_key(self, key: str):