mirror of
https://github.com/HKUDS/LightRAG.git
synced 2025-08-05 23:31:54 +00:00
Improve code organization and comments
This commit is contained in:
parent
c52c451cf7
commit
ad99d9ba5a
@ -29,14 +29,17 @@ LockType = Union[ProcessLock, asyncio.Lock]
|
|||||||
_is_multiprocess = None
|
_is_multiprocess = None
|
||||||
_workers = None
|
_workers = None
|
||||||
_manager = None
|
_manager = None
|
||||||
|
|
||||||
|
# Global singleton data for multi-process keyed locks
|
||||||
_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
|
_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
|
||||||
_lock_registry_count: Optional[Dict[str, int]] = None
|
_lock_registry_count: Optional[Dict[str, int]] = None
|
||||||
_lock_cleanup_data: Optional[Dict[str, time.time]] = None
|
_lock_cleanup_data: Optional[Dict[str, time.time]] = None
|
||||||
_registry_guard = None
|
_registry_guard = None
|
||||||
_initialized = None
|
# Timeout for keyed locks in seconds
|
||||||
|
|
||||||
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
|
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
|
||||||
|
|
||||||
|
_initialized = None
|
||||||
|
|
||||||
# shared data for storage across processes
|
# shared data for storage across processes
|
||||||
_shared_dicts: Optional[Dict[str, Any]] = None
|
_shared_dicts: Optional[Dict[str, Any]] = None
|
||||||
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
|
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
|
||||||
@ -48,6 +51,7 @@ _internal_lock: Optional[LockType] = None
|
|||||||
_pipeline_status_lock: Optional[LockType] = None
|
_pipeline_status_lock: Optional[LockType] = None
|
||||||
_graph_db_lock: Optional[LockType] = None
|
_graph_db_lock: Optional[LockType] = None
|
||||||
_data_init_lock: Optional[LockType] = None
|
_data_init_lock: Optional[LockType] = None
|
||||||
|
# Manager for all keyed locks
|
||||||
_graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None
|
_graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None
|
||||||
|
|
||||||
# async locks for coroutine synchronization in multiprocess mode
|
# async locks for coroutine synchronization in multiprocess mode
|
||||||
@ -56,7 +60,6 @@ _async_locks: Optional[Dict[str, asyncio.Lock]] = None
|
|||||||
DEBUG_LOCKS = False
|
DEBUG_LOCKS = False
|
||||||
_debug_n_locks_acquired: int = 0
|
_debug_n_locks_acquired: int = 0
|
||||||
|
|
||||||
|
|
||||||
def inc_debug_n_locks_acquired():
|
def inc_debug_n_locks_acquired():
|
||||||
global _debug_n_locks_acquired
|
global _debug_n_locks_acquired
|
||||||
if DEBUG_LOCKS:
|
if DEBUG_LOCKS:
|
||||||
@ -259,9 +262,6 @@ class UnifiedLock(Generic[T]):
|
|||||||
return self._lock.locked()
|
return self._lock.locked()
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# 2. CROSS‑PROCESS FACTORY (one manager.Lock shared by *all* processes)
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
def _get_combined_key(factory_name: str, key: str) -> str:
|
def _get_combined_key(factory_name: str, key: str) -> str:
|
||||||
"""Return the combined key for the factory and key."""
|
"""Return the combined key for the factory and key."""
|
||||||
return f"{factory_name}:{key}"
|
return f"{factory_name}:{key}"
|
||||||
@ -270,7 +270,7 @@ def _get_combined_key(factory_name: str, key: str) -> str:
|
|||||||
def _get_or_create_shared_raw_mp_lock(
|
def _get_or_create_shared_raw_mp_lock(
|
||||||
factory_name: str, key: str
|
factory_name: str, key: str
|
||||||
) -> Optional[mp.synchronize.Lock]:
|
) -> Optional[mp.synchronize.Lock]:
|
||||||
"""Return the *singleton* manager.Lock() proxy for *key*, creating if needed."""
|
"""Return the *singleton* manager.Lock() proxy for keyed lock, creating if needed."""
|
||||||
if not _is_multiprocess:
|
if not _is_multiprocess:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
@ -281,20 +281,18 @@ def _get_or_create_shared_raw_mp_lock(
|
|||||||
if raw is None:
|
if raw is None:
|
||||||
raw = _manager.Lock()
|
raw = _manager.Lock()
|
||||||
_lock_registry[combined_key] = raw
|
_lock_registry[combined_key] = raw
|
||||||
_lock_registry_count[combined_key] = (
|
count = 0
|
||||||
1 # 修复:新锁初始化为1,与释放逻辑保持一致
|
|
||||||
)
|
|
||||||
else:
|
else:
|
||||||
if count is None:
|
if count is None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
|
f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
|
||||||
)
|
)
|
||||||
count += 1
|
|
||||||
_lock_registry_count[combined_key] = count
|
|
||||||
if (
|
if (
|
||||||
count == 1 and combined_key in _lock_cleanup_data
|
count == 1 and combined_key in _lock_cleanup_data
|
||||||
): # 把再次使用的锁添剔除出待清理字典
|
): # Reusing an key waiting for cleanup, remove it from cleanup list
|
||||||
_lock_cleanup_data.pop(combined_key)
|
_lock_cleanup_data.pop(combined_key)
|
||||||
|
count += 1
|
||||||
|
_lock_registry_count[combined_key] = count
|
||||||
return raw
|
return raw
|
||||||
|
|
||||||
|
|
||||||
@ -326,7 +324,6 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
|||||||
if count == 0:
|
if count == 0:
|
||||||
_lock_cleanup_data[combined_key] = current_time
|
_lock_cleanup_data[combined_key] = current_time
|
||||||
|
|
||||||
# 清理过期的锁
|
|
||||||
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
|
||||||
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
_lock_registry.pop(cleanup_key, None)
|
_lock_registry.pop(cleanup_key, None)
|
||||||
@ -334,31 +331,26 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
|
|||||||
_lock_cleanup_data.pop(cleanup_key, None)
|
_lock_cleanup_data.pop(cleanup_key, None)
|
||||||
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
# 3. PARAMETER‑KEYED WRAPPER (unchanged except it *accepts a factory*)
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────────
|
|
||||||
class KeyedUnifiedLock:
|
class KeyedUnifiedLock:
|
||||||
"""
|
"""
|
||||||
Parameter‑keyed wrapper around `UnifiedLock`.
|
Manager for unified keyed locks, supporting both single and multi-process
|
||||||
|
|
||||||
• Keeps only a table of per‑key *asyncio* gates locally
|
• Keeps only a table of async keyed locks locally
|
||||||
• Fetches the shared process‑wide mutex on *every* acquire
|
• Fetches the multi-process keyed lockon every acquire
|
||||||
• Builds a fresh `UnifiedLock` each time, so `enable_logging`
|
• Builds a fresh `UnifiedLock` each time, so `enable_logging`
|
||||||
(or future options) can vary per call.
|
(or future options) can vary per call.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# ---------------- construction ----------------
|
|
||||||
def __init__(
|
def __init__(
|
||||||
self, factory_name: str, *, default_enable_logging: bool = True
|
self, factory_name: str, *, default_enable_logging: bool = True
|
||||||
) -> None:
|
) -> None:
|
||||||
self._factory_name = factory_name
|
self._factory_name = factory_name
|
||||||
self._default_enable_logging = default_enable_logging
|
self._default_enable_logging = default_enable_logging
|
||||||
self._async_lock: Dict[str, asyncio.Lock] = {} # key → asyncio.Lock
|
self._async_lock: Dict[str, asyncio.Lock] = {} # local keyed locks
|
||||||
self._async_lock_count: Dict[str, int] = {} # key → asyncio.Lock count
|
self._async_lock_count: Dict[str, int] = {} # local keyed locks referenced count
|
||||||
self._async_lock_cleanup_data: Dict[str, time.time] = {} # key → time.time
|
self._async_lock_cleanup_data: Dict[str, time.time] = {} # local keyed locks timeout
|
||||||
self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # key → mp.synchronize.Lock
|
self._mp_locks: Dict[str, mp.synchronize.Lock] = {} # multi-process lock proxies
|
||||||
|
|
||||||
# ---------------- public API ------------------
|
|
||||||
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
|
def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
|
||||||
"""
|
"""
|
||||||
Ergonomic helper so you can write:
|
Ergonomic helper so you can write:
|
||||||
@ -391,12 +383,11 @@ class KeyedUnifiedLock:
|
|||||||
count = self._async_lock_count.get(key, 0)
|
count = self._async_lock_count.get(key, 0)
|
||||||
count -= 1
|
count -= 1
|
||||||
|
|
||||||
current_time = time.time() # 优化:只调用一次 time.time()
|
current_time = time.time()
|
||||||
if count == 0:
|
if count == 0:
|
||||||
self._async_lock_cleanup_data[key] = current_time
|
self._async_lock_cleanup_data[key] = current_time
|
||||||
self._async_lock_count[key] = count
|
self._async_lock_count[key] = count
|
||||||
|
|
||||||
# 使用缓存的时间戳进行清理,避免在循环中重复调用 time.time()
|
|
||||||
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
|
for cleanup_key, cleanup_time in list(self._async_lock_cleanup_data.items()):
|
||||||
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
|
||||||
self._async_lock.pop(cleanup_key)
|
self._async_lock.pop(cleanup_key)
|
||||||
@ -417,19 +408,19 @@ class KeyedUnifiedLock:
|
|||||||
# 3. build a *fresh* UnifiedLock with the chosen logging flag
|
# 3. build a *fresh* UnifiedLock with the chosen logging flag
|
||||||
if is_multiprocess:
|
if is_multiprocess:
|
||||||
return UnifiedLock(
|
return UnifiedLock(
|
||||||
lock=raw_lock,
|
lock = raw_lock,
|
||||||
is_async=False, # manager.Lock is synchronous
|
is_async = False, # manager.Lock is synchronous
|
||||||
name=f"key:{self._factory_name}:{key}",
|
name = _get_combined_key(self._factory_name, key),
|
||||||
enable_logging=enable_logging,
|
enable_logging = enable_logging,
|
||||||
async_lock=async_lock, # prevents event‑loop blocking
|
async_lock = async_lock, # prevents event‑loop blocking
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return UnifiedLock(
|
return UnifiedLock(
|
||||||
lock=raw_lock,
|
lock = raw_lock,
|
||||||
is_async=True,
|
is_async = True,
|
||||||
name=f"key:{self._factory_name}:{key}",
|
name = _get_combined_key(self._factory_name, key),
|
||||||
enable_logging=enable_logging,
|
enable_logging = enable_logging,
|
||||||
async_lock=None, # No need for async lock in single process mode
|
async_lock = None, # No need for async lock in single process mode
|
||||||
)
|
)
|
||||||
|
|
||||||
def _release_lock_for_key(self, key: str):
|
def _release_lock_for_key(self, key: str):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user