# lightrag/kg/shared_storage.py
from collections import defaultdict
import os
import sys
import asyncio
import multiprocessing as mp
from multiprocessing.synchronize import Lock as ProcessLock
from multiprocessing import Manager
import time
from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Generic
# Define a direct print function for critical logs that must be visible in all processes
def direct_log(message, level="INFO", enable_output: bool = True):
"""
Log a message directly to stderr to ensure visibility in all processes,
including the Gunicorn master process.
Args:
message: The message to log
level: Log level (default: "INFO")
enable_output: Whether to actually output the log (default: True)
"""
if enable_output:
print(f"{level}: {message}", file=sys.stderr, flush=True)
T = TypeVar("T")
LockType = Union[ProcessLock, asyncio.Lock]
_is_multiprocess = None
_workers = None
_manager = None
_lock_registry: Optional[Dict[str, mp.synchronize.Lock]] = None
_lock_registry_count: Optional[Dict[str, int]] = None
_lock_cleanup_data: Optional[Dict[str, float]] = None  # combined key -> release timestamp
_registry_guard = None
_initialized = None
CLEANUP_KEYED_LOCKS_AFTER_SECONDS = 300
# shared data for storage across processes
_shared_dicts: Optional[Dict[str, Any]] = None
_init_flags: Optional[Dict[str, bool]] = None # namespace -> initialized
_update_flags: Optional[Dict[str, bool]] = None # namespace -> updated
# locks for mutex access
_storage_lock: Optional[LockType] = None
_internal_lock: Optional[LockType] = None
_pipeline_status_lock: Optional[LockType] = None
_graph_db_lock: Optional[LockType] = None
_data_init_lock: Optional[LockType] = None
_graph_db_lock_keyed: Optional["KeyedUnifiedLock"] = None
# async locks for coroutine synchronization in multiprocess mode
_async_locks: Optional[Dict[str, asyncio.Lock]] = None
DEBUG_LOCKS = False
_debug_n_locks_acquired: int = 0


def inc_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        _debug_n_locks_acquired += 1
        print(
            f"DEBUG: Keyed Lock acquired, total: {_debug_n_locks_acquired:>5}",
            end="\r",
            flush=True,
        )


def dec_debug_n_locks_acquired():
    global _debug_n_locks_acquired
    if DEBUG_LOCKS:
        if _debug_n_locks_acquired > 0:
            _debug_n_locks_acquired -= 1
            print(
                f"DEBUG: Keyed Lock released, total: {_debug_n_locks_acquired:>5}",
                end="\r",
                flush=True,
            )
        else:
            raise RuntimeError("Attempting to release lock when no locks are acquired")


def get_debug_n_locks_acquired() -> int:
    return _debug_n_locks_acquired
class UnifiedLock(Generic[T]):
"""Provide a unified lock interface type for asyncio.Lock and multiprocessing.Lock"""
def __init__(
self,
lock: Union[ProcessLock, asyncio.Lock],
is_async: bool,
name: str = "unnamed",
enable_logging: bool = True,
async_lock: Optional[asyncio.Lock] = None,
):
self._lock = lock
self._is_async = is_async
self._pid = os.getpid() # for debug only
self._name = name # for debug only
self._enable_logging = enable_logging # for debug only
self._async_lock = async_lock # auxiliary lock for coroutine synchronization
async def __aenter__(self) -> "UnifiedLock[T]":
try:
# direct_log(
# f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (async={self._is_async})",
# enable_output=self._enable_logging,
# )
# If in multiprocess mode and async lock exists, acquire it first
if not self._is_async and self._async_lock is not None:
# direct_log(
# f"== Lock == Process {self._pid}: Acquiring async lock for '{self._name}'",
# enable_output=self._enable_logging,
# )
await self._async_lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Async lock for '{self._name}' acquired",
enable_output=self._enable_logging,
)
# Then acquire the main lock
if self._is_async:
await self._lock.acquire()
else:
self._lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (async={self._is_async})",
enable_output=self._enable_logging,
)
return self
except Exception as e:
# If main lock acquisition fails, release the async lock if it was acquired
if (
not self._is_async
and self._async_lock is not None
and self._async_lock.locked()
):
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
)
raise
async def __aexit__(self, exc_type, exc_val, exc_tb):
main_lock_released = False
try:
            # Release main lock first (same call for either lock type)
            self._lock.release()
            main_lock_released = True
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' released (async={self._is_async})",
enable_output=self._enable_logging,
)
# Then release async lock if in multiprocess mode
if not self._is_async and self._async_lock is not None:
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Async lock '{self._name}' released",
enable_output=self._enable_logging,
)
except Exception as e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}': {e}",
level="ERROR",
enable_output=self._enable_logging,
)
# If main lock release failed but async lock hasn't been released, try to release it
if (
not main_lock_released
and not self._is_async
and self._async_lock is not None
):
try:
direct_log(
f"== Lock == Process {self._pid}: Attempting to release async lock after main lock failure",
level="WARNING",
enable_output=self._enable_logging,
)
self._async_lock.release()
direct_log(
f"== Lock == Process {self._pid}: Successfully released async lock after main lock failure",
enable_output=self._enable_logging,
)
except Exception as inner_e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release async lock after main lock failure: {inner_e}",
level="ERROR",
enable_output=self._enable_logging,
)
raise
def __enter__(self) -> "UnifiedLock[T]":
"""For backward compatibility"""
try:
if self._is_async:
raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(
f"== Lock == Process {self._pid}: Acquiring lock '{self._name}' (sync)",
enable_output=self._enable_logging,
)
self._lock.acquire()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' acquired (sync)",
enable_output=self._enable_logging,
)
return self
except Exception as e:
direct_log(
f"== Lock == Process {self._pid}: Failed to acquire lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
)
raise
def __exit__(self, exc_type, exc_val, exc_tb):
"""For backward compatibility"""
try:
if self._is_async:
raise RuntimeError("Use 'async with' for shared_storage lock")
direct_log(
f"== Lock == Process {self._pid}: Releasing lock '{self._name}' (sync)",
enable_output=self._enable_logging,
)
self._lock.release()
direct_log(
f"== Lock == Process {self._pid}: Lock '{self._name}' released (sync)",
enable_output=self._enable_logging,
)
except Exception as e:
direct_log(
f"== Lock == Process {self._pid}: Failed to release lock '{self._name}' (sync): {e}",
level="ERROR",
enable_output=self._enable_logging,
)
raise
    def locked(self) -> bool:
        """Return True if the underlying lock is currently held."""
        return self._lock.locked()
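# Usage sketch (illustrative only, not executed): UnifiedLock is an async
# context manager for both lock flavors, so call sites look identical in
# single-process and multiprocess mode. Assumes initialize_share_data() has
# already been called.
#
#     async def critical_section():
#         async with get_graph_db_lock(enable_logging=False):
#             ...  # in multiprocess mode the auxiliary asyncio lock is
#                  # acquired first, then the shared manager.Lock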
# ─────────────────────────────────────────────────────────────────────────────
# 2. CROSS-PROCESS FACTORY (one manager.Lock shared by *all* processes)
# ─────────────────────────────────────────────────────────────────────────────
def _get_combined_key(factory_name: str, key: str) -> str:
"""Return the combined key for the factory and key."""
return f"{factory_name}:{key}"
def _get_or_create_shared_raw_mp_lock(
    factory_name: str, key: str
) -> Optional[mp.synchronize.Lock]:
    """Return the *singleton* manager.Lock() proxy for *key*, creating it if needed."""
    if not _is_multiprocess:
        return None
    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None:
            raw = _manager.Lock()
            _lock_registry[combined_key] = raw
            count = 0
        elif count is None:
            raise RuntimeError(
                f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
            )
        count += 1
        _lock_registry_count[combined_key] = count
        if count == 1 and combined_key in _lock_cleanup_data:
            # Lock is in use again: cancel any pending cleanup
            _lock_cleanup_data.pop(combined_key)
        return raw
def _release_shared_raw_mp_lock(factory_name: str, key: str):
    """Release one reference to the *singleton* manager.Lock() proxy for *key*."""
    if not _is_multiprocess:
        return
    with _registry_guard:
        combined_key = _get_combined_key(factory_name, key)
        raw = _lock_registry.get(combined_key)
        count = _lock_registry_count.get(combined_key)
        if raw is None and count is None:
            return
        elif raw is None or count is None:
            raise RuntimeError(
                f"Shared-Data lock registry for {factory_name} is corrupted for key {key}"
            )
        count -= 1
        if count < 0:
            raise RuntimeError(
                f"Attempting to release lock for {key} more times than it was acquired"
            )
        _lock_registry_count[combined_key] = count
        if count == 0:
            # No holders left: mark for deferred cleanup
            _lock_cleanup_data[combined_key] = time.time()
        # Sweep entries whose grace period has expired
        for stale_key, released_at in list(_lock_cleanup_data.items()):
            if time.time() - released_at > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                _lock_registry.pop(stale_key)
                _lock_registry_count.pop(stale_key)
                _lock_cleanup_data.pop(stale_key)
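# Lifecycle sketch (illustrative only): every _get_or_create_shared_raw_mp_lock()
# call must be paired with one _release_shared_raw_mp_lock() for the same key.
# The reference count keeps the manager.Lock alive while any holder remains;
# once it drops to zero the entry is timestamped and swept after
# CLEANUP_KEYED_LOCKS_AFTER_SECONDS by a later release call.
#
#     raw = _get_or_create_shared_raw_mp_lock("graph_db_lock", "entity-42")
#     try:
#         ...  # wrap `raw` in a UnifiedLock and acquire it
#     finally:
#         _release_shared_raw_mp_lock("graph_db_lock", "entity-42")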
# ─────────────────────────────────────────────────────────────────────────────
# 3. PARAMETER-KEYED WRAPPER (unchanged except it *accepts a factory*)
# ─────────────────────────────────────────────────────────────────────────────
class KeyedUnifiedLock:
    """
    Parameter-keyed wrapper around `UnifiedLock`.

    - Keeps only a table of per-key *asyncio* gates locally
    - Fetches the shared process-wide mutex on *every* acquire
    - Builds a fresh `UnifiedLock` each time, so `enable_logging`
      (or future options) can vary per call.
    """

    # ---------------- construction ----------------
    def __init__(self, factory_name: str, *, default_enable_logging: bool = True) -> None:
        self._factory_name = factory_name
        self._default_enable_logging = default_enable_logging
        self._async_lock: Dict[str, asyncio.Lock] = {}  # key → asyncio.Lock
        self._async_lock_count: Dict[str, int] = {}  # key → reference count
        self._async_lock_cleanup_data: Dict[str, float] = {}  # key → release timestamp
        self._mp_locks: Dict[str, mp.synchronize.Lock] = {}  # key → mp.synchronize.Lock
    # ---------------- public API ------------------
    def __call__(self, keys: list[str], *, enable_logging: Optional[bool] = None):
        """
        Ergonomic helper so you can write:
            async with keyed_locks(["alpha"]):
                ...
        """
        if enable_logging is None:
            enable_logging = self._default_enable_logging
        return _KeyedLockContext(
            self, factory_name=self._factory_name, keys=keys, enable_logging=enable_logging
        )
def _get_or_create_async_lock(self, key: str) -> asyncio.Lock:
async_lock = self._async_lock.get(key)
count = self._async_lock_count.get(key, 0)
if async_lock is None:
async_lock = asyncio.Lock()
self._async_lock[key] = async_lock
elif count == 0 and key in self._async_lock_cleanup_data:
self._async_lock_cleanup_data.pop(key)
count += 1
self._async_lock_count[key] = count
return async_lock
    def _release_async_lock(self, key: str):
        count = self._async_lock_count.get(key, 0)
        count -= 1
        if count < 0:
            raise RuntimeError(f"Attempting to release async lock for {key} that is not held")
        if count == 0:
            self._async_lock_cleanup_data[key] = time.time()
        self._async_lock_count[key] = count
        # Sweep per-key gates whose grace period has expired
        for stale_key, released_at in list(self._async_lock_cleanup_data.items()):
            if time.time() - released_at > CLEANUP_KEYED_LOCKS_AFTER_SECONDS:
                self._async_lock.pop(stale_key)
                self._async_lock_count.pop(stale_key)
                self._async_lock_cleanup_data.pop(stale_key)
    def _get_lock_for_key(self, key: str, enable_logging: bool = False) -> UnifiedLock:
        # 1. get (or create) the per-process async gate for this key.
        #    This is synchronous, so no extra locking is needed.
        async_lock = self._get_or_create_async_lock(key)

        # 2. fetch the shared raw lock (None in single-process mode)
        raw_lock = _get_or_create_shared_raw_mp_lock(self._factory_name, key)
        is_multiprocess = raw_lock is not None
        if not is_multiprocess:
            raw_lock = async_lock
# 3. build a *fresh* UnifiedLock with the chosen logging flag
if is_multiprocess:
return UnifiedLock(
lock=raw_lock,
is_async=False, # manager.Lock is synchronous
name=f"key:{self._factory_name}:{key}",
enable_logging=enable_logging,
async_lock=async_lock, # prevents eventloop blocking
)
else:
return UnifiedLock(
lock=raw_lock,
is_async=True,
name=f"key:{self._factory_name}:{key}",
enable_logging=enable_logging,
async_lock=None, # No need for async lock in single process mode
)
def _release_lock_for_key(self, key: str):
self._release_async_lock(key)
_release_shared_raw_mp_lock(self._factory_name, key)
class _KeyedLockContext:
def __init__(
self,
parent: KeyedUnifiedLock,
factory_name: str,
keys: list[str],
enable_logging: bool,
) -> None:
self._parent = parent
self._factory_name = factory_name
# The sorting is critical to ensure proper lock and release order
# to avoid deadlocks
self._keys = sorted(keys)
self._enable_logging = (
enable_logging if enable_logging is not None
else parent._default_enable_logging
)
self._ul: Optional[List["UnifiedLock"]] = None # set in __aenter__
    # ----- enter -----
    async def __aenter__(self):
        if self._ul is not None:
            raise RuntimeError("KeyedUnifiedLock already acquired in current context")
        # Acquire each key's lock in sorted order (keys were sorted in __init__)
        self._ul = []
        try:
            for key in self._keys:
                lock = self._parent._get_lock_for_key(
                    key, enable_logging=self._enable_logging
                )
                await lock.__aenter__()
                inc_debug_n_locks_acquired()
                self._ul.append(lock)
        except BaseException:
            # Roll back locks already acquired so a partial failure does not
            # leave earlier keys locked forever
            for ul, key in zip(reversed(self._ul), reversed(self._keys[: len(self._ul)])):
                await ul.__aexit__(None, None, None)
                self._parent._release_lock_for_key(key)
                dec_debug_n_locks_acquired()
            self._ul = None
            raise
        return self
# ----- exit -----
async def __aexit__(self, exc_type, exc, tb):
# The UnifiedLock takes care of proper release order
for ul, key in zip(reversed(self._ul), reversed(self._keys)):
await ul.__aexit__(exc_type, exc, tb)
self._parent._release_lock_for_key(key)
dec_debug_n_locks_acquired()
self._ul = None
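# Usage sketch (illustrative only): KeyedUnifiedLock serializes work per key
# rather than behind one global mutex, so writers touching disjoint keys do
# not block each other. Keys are sorted before acquisition, so two callers
# locking overlapping key sets cannot deadlock.
#
#     locks = KeyedUnifiedLock(factory_name="graph_db_lock")
#
#     async def update_entities():
#         async with locks(["entity_a", "entity_b"], enable_logging=False):
#             ...  # both keys held here; released in reverse order on exit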
def get_internal_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency"""
async_lock = _async_locks.get("internal_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_internal_lock,
is_async=not _is_multiprocess,
name="internal_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_storage_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency"""
async_lock = _async_locks.get("storage_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_storage_lock,
is_async=not _is_multiprocess,
name="storage_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_pipeline_status_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified storage lock for data consistency"""
async_lock = _async_locks.get("pipeline_status_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_pipeline_status_lock,
is_async=not _is_multiprocess,
name="pipeline_status_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_graph_db_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified graph database lock for ensuring atomic operations"""
async_lock = _async_locks.get("graph_db_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_graph_db_lock,
is_async=not _is_multiprocess,
name="graph_db_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
def get_graph_db_lock_keyed(
    keys: str | list[str], enable_logging: bool = False
) -> "_KeyedLockContext":
    """return keyed graph database lock context for ensuring atomic operations on specific keys"""
    global _graph_db_lock_keyed
    if _graph_db_lock_keyed is None:
        raise RuntimeError("Shared-Data is not initialized")
    if isinstance(keys, str):
        keys = [keys]
    return _graph_db_lock_keyed(keys, enable_logging=enable_logging)
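# Usage sketch (illustrative only): lock both endpoints of an edge before a
# read-modify-write; a single string key is accepted as well.
#
#     async with get_graph_db_lock_keyed(["src_entity", "tgt_entity"]):
#         ...  # update both nodes and the connecting edge atomically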
def get_data_init_lock(enable_logging: bool = False) -> UnifiedLock:
"""return unified data initialization lock for ensuring atomic data initialization"""
async_lock = _async_locks.get("data_init_lock") if _is_multiprocess else None
return UnifiedLock(
lock=_data_init_lock,
is_async=not _is_multiprocess,
name="data_init_lock",
enable_logging=enable_logging,
async_lock=async_lock,
)
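# Usage sketch (illustrative only): shared namespace data is read and mutated
# under its dedicated lock; the field names below mirror the pipeline_status
# namespace set up in initialize_pipeline_status().
#
#     pipeline_status = await get_namespace_data("pipeline_status")
#     async with get_pipeline_status_lock():
#         if not pipeline_status["busy"]:
#             pipeline_status["busy"] = True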
def initialize_share_data(workers: int = 1):
"""
Initialize shared storage data for single or multi-process mode.
When used with Gunicorn's preload feature, this function is called once in the
master process before forking worker processes, allowing all workers to share
the same initialized data.
    In single-process mode, this function is called in the FastAPI lifespan function.
The function determines whether to use cross-process shared variables for data storage
based on the number of workers. If workers=1, it uses thread locks and local dictionaries.
If workers>1, it uses process locks and shared dictionaries managed by multiprocessing.Manager.
Args:
workers (int): Number of worker processes. If 1, single-process mode is used.
If > 1, multi-process mode with shared memory is used.
"""
global \
_manager, \
_workers, \
_is_multiprocess, \
_storage_lock, \
_lock_registry, \
_lock_registry_count, \
_lock_cleanup_data, \
_registry_guard, \
_internal_lock, \
_pipeline_status_lock, \
_graph_db_lock, \
_data_init_lock, \
_shared_dicts, \
_init_flags, \
_initialized, \
_update_flags, \
_async_locks, \
_graph_db_lock_keyed
# Check if already initialized
if _initialized:
direct_log(
f"Process {os.getpid()} Shared-Data already initialized (multiprocess={_is_multiprocess})"
)
return
_workers = workers
if workers > 1:
_is_multiprocess = True
_manager = Manager()
_lock_registry = _manager.dict()
_lock_registry_count = _manager.dict()
_lock_cleanup_data = _manager.dict()
_registry_guard = _manager.RLock()
_internal_lock = _manager.Lock()
_storage_lock = _manager.Lock()
_pipeline_status_lock = _manager.Lock()
_graph_db_lock = _manager.Lock()
_data_init_lock = _manager.Lock()
_shared_dicts = _manager.dict()
_init_flags = _manager.dict()
_update_flags = _manager.dict()
_graph_db_lock_keyed = KeyedUnifiedLock(
factory_name="graph_db_lock",
)
# Initialize async locks for multiprocess mode
_async_locks = {
"internal_lock": asyncio.Lock(),
"storage_lock": asyncio.Lock(),
"pipeline_status_lock": asyncio.Lock(),
"graph_db_lock": asyncio.Lock(),
"data_init_lock": asyncio.Lock(),
}
direct_log(
f"Process {os.getpid()} Shared-Data created for Multiple Process (workers={workers})"
)
else:
_is_multiprocess = False
_internal_lock = asyncio.Lock()
_storage_lock = asyncio.Lock()
_pipeline_status_lock = asyncio.Lock()
_graph_db_lock = asyncio.Lock()
_data_init_lock = asyncio.Lock()
_shared_dicts = {}
_init_flags = {}
_update_flags = {}
_async_locks = None # No need for async locks in single process mode
_graph_db_lock_keyed = KeyedUnifiedLock(
factory_name="graph_db_lock",
)
direct_log(f"Process {os.getpid()} Shared-Data created for Single Process")
# Mark as initialized
_initialized = True
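# Startup sketch (illustrative only): with Gunicorn's preload_app the call
# happens once in the master process before workers fork, so every worker
# inherits the same Manager-backed objects; in single-process FastAPI it runs
# in the lifespan hook with workers=1.
#
#     initialize_share_data(workers=1)
#     await initialize_pipeline_status()  # from an async lifespan context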
async def initialize_pipeline_status():
"""
Initialize pipeline namespace with default values.
This function is called during FASTAPI lifespan for each worker.
"""
pipeline_namespace = await get_namespace_data("pipeline_status")
async with get_internal_lock():
# Check if already initialized by checking for required fields
if "busy" in pipeline_namespace:
return
# Create a shared list object for history_messages
history_messages = _manager.list() if _is_multiprocess else []
pipeline_namespace.update(
{
"autoscanned": False, # Auto-scan started
"busy": False, # Control concurrent processes
"job_name": "-", # Current job name (indexing files/indexing texts)
"job_start": None, # Job start time
"docs": 0, # Total number of documents to be indexed
"batchs": 0, # Number of batches for processing documents
"cur_batch": 0, # Current processing batch
"request_pending": False, # Flag for pending request for processing
"latest_message": "", # Latest message from pipeline processing
"history_messages": history_messages, # 使用共享列表对象
}
)
direct_log(f"Process {os.getpid()} Pipeline namespace initialized")
async def get_update_flag(namespace: str):
"""
Create a namespace's update flag for a workers.
Returen the update flag to caller for referencing or reset.
"""
global _update_flags
if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized")
async with get_internal_lock():
if namespace not in _update_flags:
if _is_multiprocess and _manager is not None:
_update_flags[namespace] = _manager.list()
else:
_update_flags[namespace] = []
direct_log(
f"Process {os.getpid()} initialized updated flags for namespace: [{namespace}]"
)
if _is_multiprocess and _manager is not None:
new_update_flag = _manager.Value("b", False)
else:
            # Create a simple mutable object to store a boolean value for compatibility with multiprocess mode
class MutableBoolean:
def __init__(self, initial_value=False):
self.value = initial_value
new_update_flag = MutableBoolean(False)
_update_flags[namespace].append(new_update_flag)
return new_update_flag
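# Usage sketch (illustrative only): each storage instance obtains one flag per
# worker and checks it before reads to decide whether to reload from files.
#
#     storage_updated = await get_update_flag("full_docs")
#     ...
#     if storage_updated.value:  # another worker changed the underlying files
#         ...  # reload data from files, then reset this worker's flag
#         storage_updated.value = False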
async def set_all_update_flags(namespace: str):
"""Set all update flag of namespace indicating all workers need to reload data from files"""
global _update_flags
if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized")
async with get_internal_lock():
if namespace not in _update_flags:
raise ValueError(f"Namespace {namespace} not found in update flags")
# Update flags for both modes
for i in range(len(_update_flags[namespace])):
_update_flags[namespace][i].value = True
async def clear_all_update_flags(namespace: str):
"""Clear all update flag of namespace indicating all workers need to reload data from files"""
global _update_flags
if _update_flags is None:
raise ValueError("Try to create namespace before Shared-Data is initialized")
async with get_internal_lock():
if namespace not in _update_flags:
raise ValueError(f"Namespace {namespace} not found in update flags")
        # Clear flags for both modes
for i in range(len(_update_flags[namespace])):
_update_flags[namespace][i].value = False
async def get_all_update_flags_status() -> Dict[str, list]:
"""
Get update flags status for all namespaces.
Returns:
Dict[str, list]: A dictionary mapping namespace names to lists of update flag statuses
"""
if _update_flags is None:
return {}
result = {}
async with get_internal_lock():
for namespace, flags in _update_flags.items():
worker_statuses = []
for flag in flags:
if _is_multiprocess:
worker_statuses.append(flag.value)
else:
worker_statuses.append(flag)
result[namespace] = worker_statuses
return result
async def try_initialize_namespace(namespace: str) -> bool:
"""
Returns True if the current worker(process) gets initialization permission for loading data later.
The worker does not get the permission is prohibited to load data from files.
"""
global _init_flags, _manager
if _init_flags is None:
raise ValueError("Try to create nanmespace before Shared-Data is initialized")
async with get_internal_lock():
if namespace not in _init_flags:
_init_flags[namespace] = True
direct_log(
f"Process {os.getpid()} ready to initialize storage namespace: [{namespace}]"
)
return True
        direct_log(
            f"Process {os.getpid()} storage namespace already initialized: [{namespace}]"
        )
        return False
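# Usage sketch (illustrative only): exactly one worker wins the init flag and
# loads data from files; the rest only attach to the shared namespace dict.
#
#     need_init = await try_initialize_namespace("full_docs")
#     data = await get_namespace_data("full_docs")
#     if need_init:
#         data.update(load_from_file())  # load_from_file is hypothetical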
async def get_namespace_data(namespace: str) -> Dict[str, Any]:
"""get the shared data reference for specific namespace"""
if _shared_dicts is None:
        direct_log(
            f"Error: trying to get namespace data before Shared-Data is initialized, pid={os.getpid()}",
            level="ERROR",
        )
raise ValueError("Shared dictionaries not initialized")
async with get_internal_lock():
if namespace not in _shared_dicts:
if _is_multiprocess and _manager is not None:
_shared_dicts[namespace] = _manager.dict()
else:
_shared_dicts[namespace] = {}
return _shared_dicts[namespace]
def finalize_share_data():
"""
Release shared resources and clean up.
This function should be called when the application is shutting down
to properly release shared resources and avoid memory leaks.
In multi-process mode, it shuts down the Manager and releases all shared objects.
In single-process mode, it simply resets the global variables.
"""
global \
_manager, \
_is_multiprocess, \
_storage_lock, \
_internal_lock, \
_pipeline_status_lock, \
_graph_db_lock, \
_data_init_lock, \
_shared_dicts, \
_init_flags, \
_initialized, \
_update_flags, \
_async_locks
# Check if already initialized
if not _initialized:
direct_log(
f"Process {os.getpid()} storage data not initialized, nothing to finalize"
)
return
direct_log(
f"Process {os.getpid()} finalizing storage data (multiprocess={_is_multiprocess})"
)
# In multi-process mode, shut down the Manager
if _is_multiprocess and _manager is not None:
try:
# Clear shared resources before shutting down Manager
if _shared_dicts is not None:
# Clear pipeline status history messages first if exists
try:
pipeline_status = _shared_dicts.get("pipeline_status", {})
if "history_messages" in pipeline_status:
pipeline_status["history_messages"].clear()
except Exception:
pass # Ignore any errors during history messages cleanup
_shared_dicts.clear()
if _init_flags is not None:
_init_flags.clear()
if _update_flags is not None:
# Clear each namespace's update flags list and Value objects
try:
for namespace in _update_flags:
flags_list = _update_flags[namespace]
if isinstance(flags_list, list):
# Clear Value objects in the list
for flag in flags_list:
if hasattr(
flag, "value"
): # Check if it's a Value object
flag.value = False
flags_list.clear()
except Exception:
pass # Ignore any errors during update flags cleanup
_update_flags.clear()
# Shut down the Manager - this will automatically clean up all shared resources
_manager.shutdown()
direct_log(f"Process {os.getpid()} Manager shutdown complete")
except Exception as e:
direct_log(
f"Process {os.getpid()} Error shutting down Manager: {e}", level="ERROR"
)
# Reset global variables
_manager = None
_initialized = None
_is_multiprocess = None
_shared_dicts = None
_init_flags = None
_storage_lock = None
_internal_lock = None
_pipeline_status_lock = None
_graph_db_lock = None
_data_init_lock = None
_update_flags = None
_async_locks = None
direct_log(f"Process {os.getpid()} storage data finalization complete")