From a64c767298d15e9361d63e90b98ec240e1db7105 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Fri, 11 Jul 2025 23:43:40 +0800
Subject: [PATCH] optimize: improve lock cleanup performance with
 threshold-based strategy

- Add CLEANUP_THRESHOLD constant (100) to control cleanup frequency
- Modify _release_shared_raw_mp_lock to only scan when cleanup list exceeds threshold
- Modify _release_async_lock to only scan when cleanup list exceeds threshold
---
 lightrag/kg/shared_storage.py | 61 ++++++++++++++++++++++-------------
 1 file changed, 38 insertions(+), 23 deletions(-)

diff --git a/lightrag/kg/shared_storage.py b/lightrag/kg/shared_storage.py
index 9330ac6a..0f961687 100644
--- a/lightrag/kg/shared_storage.py
+++ b/lightrag/kg/shared_storage.py
@@ -37,6 +37,8 @@ _lock_cleanup_data: Optional[Dict[str, time.time]] = None
 _registry_guard = None
 # Timeout for keyed locks in seconds
 CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
+# Threshold for triggering cleanup - only clean when pending list exceeds this size
+CLEANUP_THRESHOLD = 100
 
 _initialized = None
 
@@ -60,6 +62,7 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
 DEBUG_LOCKS = False
 _debug_n_locks_acquired: int = 0
 
+
 def inc_debug_n_locks_acquired():
     global _debug_n_locks_acquired
     if DEBUG_LOCKS:
@@ -324,11 +327,13 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
         if count == 0:
             _lock_cleanup_data[combined_key] = current_time
 
-        for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
-            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                _lock_registry.pop(cleanup_key, None)
-                _lock_registry_count.pop(cleanup_key, None)
-                _lock_cleanup_data.pop(cleanup_key, None)
+        # Only perform cleanup when the pending cleanup list exceeds threshold
+        if len(_lock_cleanup_data) > CLEANUP_THRESHOLD:
+            for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
+                if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                    _lock_registry.pop(cleanup_key, None)
+                    _lock_registry_count.pop(cleanup_key, None)
+                    _lock_cleanup_data.pop(cleanup_key, None)
 
 
 class KeyedUnifiedLock:
@@ -347,9 +352,15 @@ class KeyedUnifiedLock:
         self._factory_name = factory_name
         self._default_enable_logging = default_enable_logging
         self._async_lock: Dict[str, asyncio.Lock] = {}  # local keyed locks
-        self._async_lock_count: Dict[str, int] = {}  # local keyed locks referenced count
-        self._async_lock_cleanup_data: Dict[str, time.time] = {}  # local keyed locks timeout
-        self._mp_locks: Dict[str, mp.synchronize.Lock] = {}  # multi-process lock proxies
+        self._async_lock_count: Dict[
+            str, int
+        ] = {}  # local keyed locks referenced count
+        self._async_lock_cleanup_data: Dict[
+            str, time.time
+        ] = {}  # local keyed locks timeout
+        self._mp_locks: Dict[
+            str, mp.synchronize.Lock
+        ] = {}  # multi-process lock proxies
 
     def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
         """
@@ -388,11 +399,15 @@ class KeyedUnifiedLock:
             self._async_lock_cleanup_data[key] = current_time
             self._async_lock_count[key] = count
 
-        for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
-            if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
-                self._async_lock.pop(cleanup_key)
-                self._async_lock_count.pop(cleanup_key)
-                self._async_lock_cleanup_data.pop(cleanup_key)
+        # Only perform cleanup when the pending cleanup list exceeds threshold
+        if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD:
+            for cleanup_key, cleanup_time in list(
+                self._async_lock_cleanup_data.items()
+            ):
+                if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
+                    self._async_lock.pop(cleanup_key)
+                    self._async_lock_count.pop(cleanup_key)
+                    self._async_lock_cleanup_data.pop(cleanup_key)
 
     def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
         # 1. get (or create) the per‑process async gate for this key
@@ -408,19 +423,19 @@
         # 3. build a *fresh* UnifiedLock with the chosen logging flag
         if is_multiprocess:
             return UnifiedLock(
-                lock = raw_lock,
-                is_async = False,  # manager.Lock is synchronous
-                name = _get_combined_key(self._factory_name, key),
-                enable_logging = enable_logging,
-                async_lock = async_lock,  # prevents event‑loop blocking
+                lock=raw_lock,
+                is_async=False,  # manager.Lock is synchronous
+                name=_get_combined_key(self._factory_name, key),
+                enable_logging=enable_logging,
+                async_lock=async_lock,  # prevents event‑loop blocking
             )
         else:
             return UnifiedLock(
-                lock = raw_lock,
-                is_async = True,
-                name = _get_combined_key(self._factory_name, key),
-                enable_logging = enable_logging,
-                async_lock = None,  # No need for async lock in single process mode
+                lock=raw_lock,
+                is_async=True,
+                name=_get_combined_key(self._factory_name, key),
+                enable_logging=enable_logging,
+                async_lock=None,  # No need for async lock in single process mode
             )
 
     def _release_lock_for_key(self, key: str):
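
Note (not part of the patch): the following is a minimal, standalone Python sketch of the threshold-gated cleanup pattern this commit introduces. Only CLEANUP_THRESHOLD and CLEANUP_KEYED_LOCKS_AFTER_SECONDS mirror constants from the diff; release_key and the two dictionaries below are hypothetical stand-ins for the module's real registry structures.

    import time
    from typing import Dict

    CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300  # idle timeout, as in the patch
    CLEANUP_THRESHOLD = 100                  # scan only once this many keys are pending

    _lock_count: Dict[str, int] = {}         # hypothetical per-key reference counts
    _cleanup_pending: Dict[str, float] = {}  # key -> time it became unreferenced


    def release_key(key: str) -> None:
        """Drop one reference; sweep expired keys only when the pending list is large."""
        count = max(_lock_count.get(key, 1) - 1, 0)
        _lock_count[key] = count
        now = time.time()
        if count == 0:
            _cleanup_pending[key] = now  # mark as a cleanup candidate

        # The optimization: instead of scanning every candidate on each release,
        # run the O(n) sweep only after the pending list grows past the threshold.
        if len(_cleanup_pending) > CLEANUP_THRESHOLD:
            for k, idle_since in list(_cleanup_pending.items()):
                if now - idle_since > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                    _lock_count.pop(k, None)
                    _cleanup_pending.pop(k, None)

The patch applies this same gate in both _release_shared_raw_mp_lock and KeyedUnifiedLock._release_async_lock, so each release stays O(1) until enough keys are pending to justify a sweep.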