optimize: improve lock cleanup performance with threshold-based strategy

- Add CLEANUP_THRESHOLD constant (100) to control cleanup frequency
- Modify _release_shared_raw_mp_lock to only scan when cleanup list exceeds threshold
- Modify _release_async_lock to only scan when cleanup list exceeds threshold
This commit is contained in:
yangdx 2025-07-11 23:43:40 +08:00
parent ad99d9ba5a
commit a64c767298

View File

@ -37,6 +37,8 @@ _lock_cleanup_data: Optional[Dict[str, time.time]] = None
_registry_guard = None
# Timeout for keyed locks in seconds
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
# Threshold for triggering cleanup - only clean when pending list exceeds this size
CLEANUP_THRESHOLD = 100
_initialized = None
@ -60,6 +62,7 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
DEBUG_LOCKS = False
_debug_n_locks_acquired: int = 0
def inc_debug_n_locks_acquired():
global _debug_n_locks_acquired
if DEBUG_LOCKS:
@ -324,11 +327,13 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
if count == 0:
_lock_cleanup_data[combined_key] = current_time
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
_lock_registry.pop(cleanup_key, None)
_lock_registry_count.pop(cleanup_key, None)
_lock_cleanup_data.pop(cleanup_key, None)
# Only perform cleanup when the pending cleanup list exceeds threshold
if len(_lock_cleanup_data) > CLEANUP_THRESHOLD:
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
_lock_registry.pop(cleanup_key, None)
_lock_registry_count.pop(cleanup_key, None)
_lock_cleanup_data.pop(cleanup_key, None)
class KeyedUnifiedLock:
@ -347,9 +352,15 @@ class KeyedUnifiedLock:
self._factory_name = factory_name
self._default_enable_logging = default_enable_logging
self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks
self._async_lock_count: Dict[str, int] = {} # local keyed locks referenced count
self._async_lock_cleanup_data: Dict[str, time.time] = {} # local keyed locks timeout
self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # multi-process lock proxies
self._async_lock_count: Dict[
str, int
] = {} # local keyed locks referenced count
self._async_lock_cleanup_data: Dict[
str, time.time
] = {} # local keyed locks timeout
self._mp_locks: Dict[
str, mp.synchronize.Lock
] = {} # multi-process lock proxies
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
"""
@ -388,11 +399,15 @@ class KeyedUnifiedLock:
self._async_lock_cleanup_data[key] = current_time
self._async_lock_count[key] = count
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
self._async_lock.pop(cleanup_key)
self._async_lock_count.pop(cleanup_key)
self._async_lock_cleanup_data.pop(cleanup_key)
# Only perform cleanup when the pending cleanup list exceeds threshold
if len(self._async_lock_cleanup_data) > CLEANUP_THRESHOLD:
for cleanup_key, cleanup_time in list(
self._async_lock_cleanup_data.items()
):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
self._async_lock.pop(cleanup_key)
self._async_lock_count.pop(cleanup_key)
self._async_lock_cleanup_data.pop(cleanup_key)
def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
# 1. get (or create) the perprocess async gate for this key
@ -408,19 +423,19 @@ class KeyedUnifiedLock:
# 3. build a *fresh* UnifiedLock with the chosen logging flag
if is_multiprocess:
return UnifiedLock(
lock = raw_lock,
is_async = False, # manager.Lock is synchronous
name = _get_combined_key(self._factory_name, key),
enable_logging = enable_logging,
async_lock = async_lock, # prevents eventloop blocking
lock=raw_lock,
is_async=False, # manager.Lock is synchronous
name=_get_combined_key(self._factory_name, key),
enable_logging=enable_logging,
async_lock=async_lock, # prevents eventloop blocking
)
else:
return UnifiedLock(
lock = raw_lock,
is_async = True,
name = _get_combined_key(self._factory_name, key),
enable_logging = enable_logging,
async_lock = None, # No need for async lock in single process mode
lock=raw_lock,
is_async=True,
name=_get_combined_key(self._factory_name, key),
enable_logging=enable_logging,
async_lock=None, # No need for async lock in single process mode
)
def _release_lock_for_key(self, key: str):