Feat: Add keyed lock cleanup and status monitoring

This commit is contained in:
yangdx 2025-07-13 00:09:00 +08:00
parent e4bf4d19a0
commit 0e3aaa318f
2 changed files with 374 additions and 161 deletions

View File

@ -52,6 +52,7 @@ from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
initialize_pipeline_status,
cleanup_keyed_lock,
)
from fastapi.security import OAuth2PasswordRequestForm
from lightrag.api.auth import auth_handler
@ -486,6 +487,9 @@ def create_app(args):
else:
auth_mode = "enabled"
# Cleanup expired keyed locks and get status
keyed_lock_info = cleanup_keyed_lock()
return {
"status": "healthy",
"working_directory": str(args.working_dir),
@ -517,6 +521,7 @@ def create_app(args):
},
"auth_mode": auth_mode,
"pipeline_busy": pipeline_status.get("busy", False),
"keyed_locks": keyed_lock_info,
"core_version": core_version,
"api_version": __api_version__,
"webui_title": webui_title,

View File

@ -280,6 +280,125 @@ def _get_combined_key(factory_name: str, key: str) -> str:
return f"{factory_name}:{key}"
def _perform_lock_cleanup(
lock_type: str,
cleanup_data: Dict[str, float],
lock_registry: Optional[Dict[str, Any]],
lock_count: Optional[Dict[str, int]],
earliest_cleanup_time: Optional[float],
last_cleanup_time: Optional[float],
current_time: float,
threshold_check: bool = True,
) -> tuple[int, Optional[float], Optional[float]]:
"""
Generic lock cleanup function to unify cleanup logic for both multiprocess and async locks.
Args:
lock_type: Lock type identifier ("mp" or "async")
cleanup_data: Cleanup data dictionary
lock_registry: Lock registry dictionary (can be None for async locks)
lock_count: Lock count dictionary (can be None for async locks)
earliest_cleanup_time: Earliest cleanup time
last_cleanup_time: Last cleanup time
current_time: Current time
threshold_check: Whether to check threshold condition (default True, set to False in cleanup_expired_locks)
Returns:
tuple: (cleaned_count, new_earliest_time, new_last_cleanup_time)
"""
if len(cleanup_data) == 0:
return 0, earliest_cleanup_time, last_cleanup_time
# If threshold check is needed and threshold not reached, return directly
if threshold_check and len(cleanup_data) < CLEANUP_THRESHOLD:
return 0, earliest_cleanup_time, last_cleanup_time
# Time rollback detection
if last_cleanup_time is not None and current_time < last_cleanup_time:
direct_log(
f"== {lock_type} Lock == Time rollback detected, resetting cleanup time",
level="WARNING",
enable_output=False,
)
last_cleanup_time = None
# Check cleanup conditions
has_expired_locks = (
earliest_cleanup_time is not None
and current_time - earliest_cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS
)
interval_satisfied = (
last_cleanup_time is None
or current_time - last_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
)
if not (has_expired_locks and interval_satisfied):
return 0, earliest_cleanup_time, last_cleanup_time
try:
cleaned_count = 0
new_earliest_time = None
# Perform cleanup operation
for cleanup_key, cleanup_time in list(cleanup_data.items()):
if current_time - cleanup_time > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
# Remove from cleanup data
cleanup_data.pop(cleanup_key, None)
# Remove from lock registry if exists
if lock_registry is not None:
lock_registry.pop(cleanup_key, None)
if lock_count is not None:
lock_count.pop(cleanup_key, None)
cleaned_count += 1
else:
# Track the earliest time among remaining locks
if new_earliest_time is None or cleanup_time < new_earliest_time:
new_earliest_time = cleanup_time
# Update state only after successful cleanup
if cleaned_count > 0:
new_last_cleanup_time = current_time
# Log cleanup results
next_cleanup_in = max(
(new_earliest_time + CLEANUP_KEYED_LOCKS_AFTER_SECONDS - current_time)
if new_earliest_time
else float("inf"),
MIN_CLEANUP_INTERVAL_SECONDS,
)
total_cleanup_len = len(cleanup_data)
if lock_type == "async":
direct_log(
f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired {lock_type} locks, "
f"next cleanup in {next_cleanup_in:.1f}s",
enable_output=False,
level="INFO",
)
else:
direct_log(
f"== {lock_type} Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
f"next cleanup in {next_cleanup_in:.1f}s",
enable_output=False,
level="INFO",
)
return cleaned_count, new_earliest_time, new_last_cleanup_time
else:
return 0, earliest_cleanup_time, last_cleanup_time
except Exception as e:
direct_log(
f"== {lock_type} Lock == Cleanup failed: {e}",
level="ERROR",
enable_output=False,
)
return 0, earliest_cleanup_time, last_cleanup_time
def _get_or_create_shared_raw_mp_lock(
factory_name: str, key: str
) -> Optional[mp.synchronize.Lock]:
@ -346,86 +465,22 @@ def _release_shared_raw_mp_lock(factory_name: str, key: str):
):
_earliest_mp_cleanup_time = current_time
# Efficient cleanup triggering with minimum interval control
total_cleanup_len = len(_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD:
# Time rollback detection
if (
_last_mp_cleanup_time is not None
and current_time < _last_mp_cleanup_time
):
direct_log(
"== mp Lock == Time rollback detected, resetting cleanup time",
level="WARNING",
enable_output=False,
)
_last_mp_cleanup_time = None
# Use generic cleanup function
cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
lock_type="mp",
cleanup_data=_lock_cleanup_data,
lock_registry=_lock_registry,
lock_count=_lock_registry_count,
earliest_cleanup_time=_earliest_mp_cleanup_time,
last_cleanup_time=_last_mp_cleanup_time,
current_time=current_time,
threshold_check=True,
)
# Check cleanup conditions
has_expired_locks = (
_earliest_mp_cleanup_time is not None
and current_time - _earliest_mp_cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
)
interval_satisfied = (
_last_mp_cleanup_time is None
or current_time - _last_mp_cleanup_time > MIN_CLEANUP_INTERVAL_SECONDS
)
if has_expired_locks and interval_satisfied:
try:
cleaned_count = 0
new_earliest_time = None
# Perform cleanup while maintaining the new earliest time
# Clean expired locks from all namespaces
for cleanup_key, cleanup_time in list(_lock_cleanup_data.items()):
if (
current_time - cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
):
_lock_registry.pop(cleanup_key, None)
_lock_registry_count.pop(cleanup_key, None)
_lock_cleanup_data.pop(cleanup_key, None)
cleaned_count += 1
else:
# Track the earliest time among remaining locks
if (
new_earliest_time is None
or cleanup_time < new_earliest_time
):
new_earliest_time = cleanup_time
# Update state only after successful cleanup
_earliest_mp_cleanup_time = new_earliest_time
_last_mp_cleanup_time = current_time
if cleaned_count > 0:
next_cleanup_in = max(
(
new_earliest_time
+ CLEANUP_KEYED_LOCKS_AFTER_SECONDS
- current_time
)
if new_earliest_time
else float("inf"),
MIN_CLEANUP_INTERVAL_SECONDS,
)
direct_log(
f"== mp Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired locks, "
f"next cleanup in {next_cleanup_in:.1f}s",
enable_output=False,
level="INFO",
)
except Exception as e:
direct_log(
f"== mp Lock == Cleanup failed: {e}",
level="ERROR",
enable_output=False,
)
# Don't update _last_mp_cleanup_time to allow retry
# Update global state if cleanup was performed
if cleaned_count > 0:
_earliest_mp_cleanup_time = new_earliest_time
_last_mp_cleanup_time = new_last_cleanup_time
class KeyedUnifiedLock:
@ -504,89 +559,22 @@ class KeyedUnifiedLock:
self._earliest_async_cleanup_time = current_time
self._async_lock_count[combined_key] = count
# Efficient cleanup triggering with minimum interval control
total_cleanup_len = len(self._async_lock_cleanup_data)
if total_cleanup_len >= CLEANUP_THRESHOLD:
# Time rollback detection
if (
self._last_async_cleanup_time is not None
and current_time < self._last_async_cleanup_time
):
direct_log(
"== async Lock == Time rollback detected, resetting cleanup time",
level="WARNING",
enable_output=False,
)
self._last_async_cleanup_time = None
# Use generic cleanup function
cleaned_count, new_earliest_time, new_last_cleanup_time = _perform_lock_cleanup(
lock_type="async",
cleanup_data=self._async_lock_cleanup_data,
lock_registry=self._async_lock,
lock_count=self._async_lock_count,
earliest_cleanup_time=self._earliest_async_cleanup_time,
last_cleanup_time=self._last_async_cleanup_time,
current_time=current_time,
threshold_check=True,
)
# Check cleanup conditions
has_expired_locks = (
self._earliest_async_cleanup_time is not None
and current_time - self._earliest_async_cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
)
interval_satisfied = (
self._last_async_cleanup_time is None
or current_time - self._last_async_cleanup_time
> MIN_CLEANUP_INTERVAL_SECONDS
)
if has_expired_locks and interval_satisfied:
try:
cleaned_count = 0
new_earliest_time = None
# Perform cleanup while maintaining the new earliest time
# Clean expired async locks from all namespaces
for cleanup_key, cleanup_time in list(
self._async_lock_cleanup_data.items()
):
if (
current_time - cleanup_time
> CLEANUP_KEYED_LOCKS_AFTER_SECONDS
):
self._async_lock.pop(cleanup_key)
self._async_lock_count.pop(cleanup_key)
self._async_lock_cleanup_data.pop(cleanup_key)
cleaned_count += 1
else:
# Track the earliest time among remaining locks
if (
new_earliest_time is None
or cleanup_time < new_earliest_time
):
new_earliest_time = cleanup_time
# Update state only after successful cleanup
self._earliest_async_cleanup_time = new_earliest_time
self._last_async_cleanup_time = current_time
if cleaned_count > 0:
next_cleanup_in = max(
(
new_earliest_time
+ CLEANUP_KEYED_LOCKS_AFTER_SECONDS
- current_time
)
if new_earliest_time
else float("inf"),
MIN_CLEANUP_INTERVAL_SECONDS,
)
direct_log(
f"== async Lock == Cleaned up {cleaned_count}/{total_cleanup_len} expired async locks, "
f"next cleanup in {next_cleanup_in:.1f}s",
enable_output=False,
level="INFO",
)
except Exception as e:
direct_log(
f"== async Lock == Cleanup failed: {e}",
level="ERROR",
enable_output=False,
)
# Don't update _last_async_cleanup_time to allow retry
# Update instance state if cleanup was performed
if cleaned_count > 0:
self._earliest_async_cleanup_time = new_earliest_time
self._last_async_cleanup_time = new_last_cleanup_time
def _get_lock_for_key(
self, namespace: str, key: str, enable_logging: bool = False
@ -627,6 +615,171 @@ class KeyedUnifiedLock:
self._release_async_lock(combined_key)
_release_shared_raw_mp_lock(namespace, key)
def cleanup_expired_locks(self) -> Dict[str, Any]:
"""
Cleanup expired locks for both async and multiprocess locks following the same
conditions as _release_shared_raw_mp_lock and _release_async_lock functions.
Only performs cleanup when both has_expired_locks and interval_satisfied conditions are met
to avoid too frequent cleanup operations.
Since async and multiprocess locks work together, this method cleans up
both types of expired locks and returns comprehensive statistics.
Returns:
Dict containing cleanup statistics and current status:
{
"process_id": 12345,
"cleanup_performed": {
"mp_cleaned": 5,
"async_cleaned": 3
},
"current_status": {
"total_mp_locks": 10,
"pending_mp_cleanup": 2,
"total_async_locks": 8,
"pending_async_cleanup": 1
}
}
"""
global _lock_registry, _lock_registry_count, _lock_cleanup_data
global _registry_guard, _earliest_mp_cleanup_time, _last_mp_cleanup_time
cleanup_stats = {"mp_cleaned": 0, "async_cleaned": 0}
current_time = time.time()
# 1. Cleanup multiprocess locks using generic function
if (
_is_multiprocess
and _lock_registry is not None
and _registry_guard is not None
):
try:
with _registry_guard:
if _lock_cleanup_data is not None:
# Use generic cleanup function without threshold check
cleaned_count, new_earliest_time, new_last_cleanup_time = (
_perform_lock_cleanup(
lock_type="mp",
cleanup_data=_lock_cleanup_data,
lock_registry=_lock_registry,
lock_count=_lock_registry_count,
earliest_cleanup_time=_earliest_mp_cleanup_time,
last_cleanup_time=_last_mp_cleanup_time,
current_time=current_time,
threshold_check=False, # Force cleanup in cleanup_expired_locks
)
)
# Update global state if cleanup was performed
if cleaned_count > 0:
_earliest_mp_cleanup_time = new_earliest_time
_last_mp_cleanup_time = new_last_cleanup_time
cleanup_stats["mp_cleaned"] = cleaned_count
except Exception as e:
direct_log(
f"Error during multiprocess lock cleanup: {e}",
level="ERROR",
enable_output=False,
)
# 2. Cleanup async locks using generic function
try:
# Use generic cleanup function without threshold check
cleaned_count, new_earliest_time, new_last_cleanup_time = (
_perform_lock_cleanup(
lock_type="async",
cleanup_data=self._async_lock_cleanup_data,
lock_registry=self._async_lock,
lock_count=self._async_lock_count,
earliest_cleanup_time=self._earliest_async_cleanup_time,
last_cleanup_time=self._last_async_cleanup_time,
current_time=current_time,
threshold_check=False, # Force cleanup in cleanup_expired_locks
)
)
# Update instance state if cleanup was performed
if cleaned_count > 0:
self._earliest_async_cleanup_time = new_earliest_time
self._last_async_cleanup_time = new_last_cleanup_time
cleanup_stats["async_cleaned"] = cleaned_count
except Exception as e:
direct_log(
f"Error during async lock cleanup: {e}",
level="ERROR",
enable_output=False,
)
# Log cleanup results if any locks were cleaned
total_cleaned = cleanup_stats["mp_cleaned"] + cleanup_stats["async_cleaned"]
if total_cleaned > 0:
direct_log(
f"Keyed lock cleanup completed: {total_cleaned} locks cleaned "
f"(MP: {cleanup_stats['mp_cleaned']}, Async: {cleanup_stats['async_cleaned']})",
level="INFO",
enable_output=False,
)
# 3. Get current status after cleanup
current_status = self.get_lock_status()
return {
"process_id": os.getpid(),
"cleanup_performed": cleanup_stats,
"current_status": current_status,
}
def get_lock_status(self) -> Dict[str, int]:
"""
Get current status of both async and multiprocess locks.
Returns comprehensive lock counts for both types of locks since
they work together in the keyed lock system.
Returns:
Dict containing lock counts:
{
"total_mp_locks": 10,
"pending_mp_cleanup": 2,
"total_async_locks": 8,
"pending_async_cleanup": 1
}
"""
global _lock_registry_count, _lock_cleanup_data, _registry_guard
status = {
"total_mp_locks": 0,
"pending_mp_cleanup": 0,
"total_async_locks": 0,
"pending_async_cleanup": 0,
}
try:
# Count multiprocess locks
if _is_multiprocess and _lock_registry_count is not None:
if _registry_guard is not None:
with _registry_guard:
status["total_mp_locks"] = len(_lock_registry_count)
if _lock_cleanup_data is not None:
status["pending_mp_cleanup"] = len(_lock_cleanup_data)
# Count async locks
status["total_async_locks"] = len(self._async_lock_count)
status["pending_async_cleanup"] = len(self._async_lock_cleanup_data)
except Exception as e:
direct_log(
f"Error getting keyed lock status: {e}",
level="ERROR",
enable_output=False,
)
return status
class _KeyedLockContext:
def __init__(
@ -747,6 +900,61 @@ def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
)
def cleanup_keyed_lock() -> Dict[str, Any]:
"""
Force cleanup of expired keyed locks and return comprehensive status information.
This function actively cleans up expired locks for both async and multiprocess locks,
then returns detailed statistics about the cleanup operation and current lock status.
Returns:
Same as cleanup_expired_locks in KeyedUnifiedLock
"""
global _storage_keyed_lock
# Check if shared storage is initialized
if not _initialized or _storage_keyed_lock is None:
return {
"process_id": os.getpid(),
"cleanup_performed": {"mp_cleaned": 0, "async_cleaned": 0},
"current_status": {
"total_mp_locks": 0,
"pending_mp_cleanup": 0,
"total_async_locks": 0,
"pending_async_cleanup": 0,
},
}
return _storage_keyed_lock.cleanup_expired_locks()
def get_keyed_lock_status() -> Dict[str, Any]:
"""
Get current status of keyed locks without performing cleanup.
This function provides a read-only view of the current lock counts
for both multiprocess and async locks, including pending cleanup counts.
Returns:
Same as get_lock_status in KeyedUnifiedLock
"""
global _storage_keyed_lock
# Check if shared storage is initialized
if not _initialized or _storage_keyed_lock is None:
return {
"process_id": os.getpid(),
"total_mp_locks": 0,
"pending_mp_cleanup": 0,
"total_async_locks": 0,
"pending_async_cleanup": 0,
}
status = _storage_keyed_lock.get_lock_status()
status["process_id"] = os.getpid()
return status
def initialize_share_data(workers: int = 1):
"""
Initialize shared storage data for single or multi-process mode.