diff --git a/README-zh.md b/README-zh.md index 8fcf5d01..1b5ce4dc 100644 --- a/README-zh.md +++ b/README-zh.md @@ -882,7 +882,7 @@ rag = LightRAG( * **对于Neo4j图数据库,通过label来实现数据的逻辑隔离**:Neo4JStorage
-为了保持对遗留数据的兼容,在未配置工作空间时PostgreSQL的默认工作空间为`default`,Neo4j的默认工作空间为`base`。对于所有的外部存储,系统都提供了专用的工作空间环境变量,用于覆盖公共的 `WORKSPACE`环境变量配置。这些适用于指定存储类型的工作空间环境变量为:`REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`。
+为了保持对遗留数据的兼容,在未配置工作空间时PostgreSQL非图存储的工作空间为`default`,PostgreSQL AGE图存储的工作空间为空,Neo4j图存储的默认工作空间为`base`。对于所有的外部存储,系统都提供了专用的工作空间环境变量,用于覆盖公共的 `WORKSPACE`环境变量配置。这些适用于指定存储类型的工作空间环境变量为:`REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`。
## 编辑实体和关系 diff --git a/README.md b/README.md index b594938b..49290bd7 100644 --- a/README.md +++ b/README.md @@ -928,7 +928,7 @@ The `workspace` parameter ensures data isolation between different LightRAG inst - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`. - **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`
-To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
+To maintain compatibility with legacy data, when no workspace is configured the default workspace for PostgreSQL non-graph storage is `default`, for PostgreSQL AGE graph storage it is empty, and for Neo4j graph storage it is `base`. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
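Both README changes describe the same behavior: a storage-specific workspace variable, when present, overrides the shared `WORKSPACE` value. A minimal sketch of such a configuration, assuming the variables are set before the LightRAG instance is created (the workspace names below are purely illustrative):

```python
import os

# Shared logical workspace for every storage backend (illustrative value).
os.environ["WORKSPACE"] = "team_a"

# Storage-specific overrides take precedence over WORKSPACE (illustrative values).
os.environ["POSTGRES_WORKSPACE"] = "team_a_pg"    # PostgreSQL KV / vector / doc-status tables
os.environ["NEO4J_WORKSPACE"] = "team_a_graph"    # Neo4j label-based isolation
```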
## Edit Entities and Relations diff --git a/lightrag/api/routers/document_routes.py b/lightrag/api/routers/document_routes.py index 288a66e1..68642909 100644 --- a/lightrag/api/routers/document_routes.py +++ b/lightrag/api/routers/document_routes.py @@ -1301,11 +1301,11 @@ def create_document_routes( "Starting to delete files in input directory" ) - # Delete all files in input_dir + # Delete only files in the current directory, preserve files in subdirectories deleted_files_count = 0 file_errors_count = 0 - for file_path in doc_manager.input_dir.glob("**/*"): + for file_path in doc_manager.input_dir.glob("*"): if file_path.is_file(): try: file_path.unlink() diff --git a/lightrag/kg/memgraph_impl.py b/lightrag/kg/memgraph_impl.py index 3d2c131e..16185775 100644 --- a/lightrag/kg/memgraph_impl.py +++ b/lightrag/kg/memgraph_impl.py @@ -414,7 +414,7 @@ class MemgraphStorage(BaseGraphStorage): if records: edge_result = dict(records[0]["edge_properties"]) for key, default_value in { - "weight": 0.0, + "weight": 1.0, "source_id": None, "description": None, "keywords": None, diff --git a/lightrag/kg/neo4j_impl.py b/lightrag/kg/neo4j_impl.py index b0efc4b5..847b73b0 100644 --- a/lightrag/kg/neo4j_impl.py +++ b/lightrag/kg/neo4j_impl.py @@ -535,7 +535,7 @@ class Neo4JStorage(BaseGraphStorage): # logger.debug(f"Result: {edge_result}") # Ensure required keys exist with defaults required_keys = { - "weight": 0.0, + "weight": 1.0, "source_id": None, "description": None, "keywords": None, @@ -559,7 +559,7 @@ class Neo4JStorage(BaseGraphStorage): ) # Return default edge properties on error return { - "weight": 0.0, + "weight": 1.0, "source_id": None, "description": None, "keywords": None, @@ -610,7 +610,7 @@ class Neo4JStorage(BaseGraphStorage): edge_props = edges[0] # choose the first if multiple exist # Ensure required keys exist with defaults for key, default in { - "weight": 0.0, + "weight": 1.0, "source_id": None, "description": None, "keywords": None, @@ -621,7 +621,7 @@ class Neo4JStorage(BaseGraphStorage): else: # No edge found – set default edge properties edges_dict[(src, tgt)] = { - "weight": 0.0, + "weight": 1.0, "source_id": None, "description": None, "keywords": None, diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 98a09f9d..f77d6bd0 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -105,25 +105,32 @@ class PostgreSQLDB: ): pass - async def _migrate_llm_cache_add_chunk_id(self): - """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist""" + async def _migrate_llm_cache_add_columns(self): + """Add chunk_id and cache_type columns to LIGHTRAG_LLM_CACHE table if they don't exist""" try: - # Check if chunk_id column exists - check_column_sql = """ + # Check if both columns exist + check_columns_sql = """ SELECT column_name FROM information_schema.columns WHERE table_name = 'lightrag_llm_cache' - AND column_name = 'chunk_id' + AND column_name IN ('chunk_id', 'cache_type') """ - column_info = await self.query(check_column_sql) - if not column_info: + existing_columns = await self.query(check_columns_sql, multirows=True) + existing_column_names = ( + {col["column_name"] for col in existing_columns} + if existing_columns + else set() + ) + + # Add missing chunk_id column + if "chunk_id" not in existing_column_names: logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table") - add_column_sql = """ + add_chunk_id_sql = """ ALTER TABLE LIGHTRAG_LLM_CACHE ADD COLUMN chunk_id VARCHAR(255) NULL """ - await 
self.execute(add_column_sql) + await self.execute(add_chunk_id_sql) logger.info( "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table" ) @@ -131,54 +138,40 @@ class PostgreSQLDB: logger.info( "chunk_id column already exists in LIGHTRAG_LLM_CACHE table" ) - except Exception as e: - logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}") - async def _migrate_llm_cache_add_cache_type(self): - """Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist""" - try: - # Check if cache_type column exists - check_column_sql = """ - SELECT column_name - FROM information_schema.columns - WHERE table_name = 'lightrag_llm_cache' - AND column_name = 'cache_type' - """ - - column_info = await self.query(check_column_sql) - if not column_info: + # Add missing cache_type column + if "cache_type" not in existing_column_names: logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table") - add_column_sql = """ + add_cache_type_sql = """ ALTER TABLE LIGHTRAG_LLM_CACHE ADD COLUMN cache_type VARCHAR(32) NULL """ - await self.execute(add_column_sql) + await self.execute(add_cache_type_sql) logger.info( "Successfully added cache_type column to LIGHTRAG_LLM_CACHE table" ) - # Migrate existing data: extract cache_type from flattened keys + # Migrate existing data using optimized regex pattern logger.info( - "Migrating existing LLM cache data to populate cache_type field" + "Migrating existing LLM cache data to populate cache_type field (optimized)" ) - update_sql = """ + optimized_update_sql = """ UPDATE LIGHTRAG_LLM_CACHE SET cache_type = CASE - WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2) + WHEN id ~ '^[^:]+:[^:]+:' THEN split_part(id, ':', 2) ELSE 'extract' END WHERE cache_type IS NULL """ - await self.execute(update_sql) + await self.execute(optimized_update_sql) logger.info("Successfully migrated existing LLM cache data") else: logger.info( "cache_type column already exists in LIGHTRAG_LLM_CACHE table" ) + except Exception as e: - logger.warning( - f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}" - ) + logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}") async def _migrate_timestamp_columns(self): """Migrate timestamp columns in tables to witimezone-free types, assuming original data is in UTC time""" @@ -187,6 +180,7 @@ class PostgreSQLDB: "LIGHTRAG_VDB_ENTITY": ["create_time", "update_time"], "LIGHTRAG_VDB_RELATION": ["create_time", "update_time"], "LIGHTRAG_DOC_CHUNKS": ["create_time", "update_time"], + "LIGHTRAG_DOC_STATUS": ["created_at", "updated_at"], } for table_name, columns in tables_to_migrate.items(): @@ -292,121 +286,105 @@ class PostgreSQLDB: # Do not re-raise, to allow the application to start async def _check_llm_cache_needs_migration(self): - """Check if LLM cache data needs migration by examining the first record""" + """Check if LLM cache data needs migration by examining any record with old format""" try: - # Only query the first record to determine format + # Optimized query: directly check for old format records without sorting check_sql = """ - SELECT id FROM LIGHTRAG_LLM_CACHE - ORDER BY create_time ASC + SELECT 1 FROM LIGHTRAG_LLM_CACHE + WHERE id NOT LIKE '%:%' LIMIT 1 """ result = await self.query(check_sql) - if result and result.get("id"): - # If id doesn't contain colon, it's old format - return ":" not in result["id"] + # If any old format record exists, migration is needed + return result is not None - return False # No data or already new format except Exception as e: logger.warning(f"Failed to 
check LLM cache migration status: {e}") return False async def _migrate_llm_cache_to_flattened_keys(self): - """Migrate LLM cache to flattened key format, recalculating hash values""" + """Optimized version: directly execute single UPDATE migration to migrate old format cache keys to flattened format""" try: - # Get all old format data - old_data_sql = """ - SELECT id, mode, original_prompt, return_value, chunk_id, - workspace, create_time, update_time - FROM LIGHTRAG_LLM_CACHE + # Check if migration is needed + check_sql = """ + SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE WHERE id NOT LIKE '%:%' """ + result = await self.query(check_sql) - old_records = await self.query(old_data_sql, multirows=True) - - if not old_records: + if not result or result["count"] == 0: logger.info("No old format LLM cache data found, skipping migration") return - logger.info( - f"Found {len(old_records)} old format cache records, starting migration..." + old_count = result["count"] + logger.info(f"Found {old_count} old format cache records") + + # Check potential primary key conflicts (optional but recommended) + conflict_check_sql = """ + WITH new_ids AS ( + SELECT + workspace, + mode, + id as old_id, + mode || ':' || + CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' || + md5(original_prompt) as new_id + FROM LIGHTRAG_LLM_CACHE + WHERE id NOT LIKE '%:%' ) + SELECT COUNT(*) as conflicts + FROM new_ids n1 + JOIN LIGHTRAG_LLM_CACHE existing + ON existing.workspace = n1.workspace + AND existing.mode = n1.mode + AND existing.id = n1.new_id + WHERE existing.id LIKE '%:%' -- Only check conflicts with existing new format records + """ - # Import hash calculation function - from ..utils import compute_args_hash + conflict_result = await self.query(conflict_check_sql) + if conflict_result and conflict_result["conflicts"] > 0: + logger.warning( + f"Found {conflict_result['conflicts']} potential ID conflicts with existing records" + ) + # Can choose to continue or abort, here we choose to continue and log warning - migrated_count = 0 + # Execute single UPDATE migration + logger.info("Starting optimized LLM cache migration...") + migration_sql = """ + UPDATE LIGHTRAG_LLM_CACHE + SET + id = mode || ':' || + CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' || + md5(original_prompt), + cache_type = CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END, + update_time = CURRENT_TIMESTAMP + WHERE id NOT LIKE '%:%' + """ - # Migrate data in batches - for record in old_records: - try: - # Recalculate hash using correct method - new_hash = compute_args_hash( - record["mode"], record["original_prompt"] - ) + # Execute migration + await self.execute(migration_sql) - # Determine cache_type based on mode - cache_type = "extract" if record["mode"] == "default" else "unknown" + # Verify migration results + verify_sql = """ + SELECT COUNT(*) as remaining_old FROM LIGHTRAG_LLM_CACHE + WHERE id NOT LIKE '%:%' + """ + verify_result = await self.query(verify_sql) + remaining = verify_result["remaining_old"] if verify_result else -1 - # Generate new flattened key - new_key = f"{record['mode']}:{cache_type}:{new_hash}" - - # Insert new format data with cache_type field - insert_sql = """ - INSERT INTO LIGHTRAG_LLM_CACHE - (workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) - ON CONFLICT (workspace, mode, id) DO NOTHING - """ - - await self.execute( - insert_sql, - { - "workspace": record[ - 
"workspace" - ], # Use original record's workspace - "id": new_key, - "mode": record["mode"], - "original_prompt": record["original_prompt"], - "return_value": record["return_value"], - "chunk_id": record["chunk_id"], - "cache_type": cache_type, # Add cache_type field - "create_time": record["create_time"], - "update_time": record["update_time"], - }, - ) - - # Delete old data - delete_sql = """ - DELETE FROM LIGHTRAG_LLM_CACHE - WHERE workspace=$1 AND mode=$2 AND id=$3 - """ - await self.execute( - delete_sql, - { - "workspace": record[ - "workspace" - ], # Use original record's workspace - "mode": record["mode"], - "id": record["id"], # Old id - }, - ) - - migrated_count += 1 - - except Exception as e: - logger.warning( - f"Failed to migrate cache record {record['id']}: {e}" - ) - continue - - logger.info( - f"Successfully migrated {migrated_count} cache records to flattened format" - ) + if remaining == 0: + logger.info( + f"✅ Successfully migrated {old_count} LLM cache records to flattened format" + ) + else: + logger.warning( + f"⚠️ Migration completed but {remaining} old format records remain" + ) except Exception as e: - logger.error(f"LLM cache migration failed: {e}") - # Don't raise exception, allow system to continue startup + logger.error(f"Optimized LLM cache migration failed: {e}") + raise async def _migrate_doc_status_add_chunks_list(self): """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist""" @@ -646,20 +624,11 @@ class PostgreSQLDB: logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}") # Don't throw an exception, allow the initialization process to continue - # Migrate LLM cache table to add chunk_id field if needed + # Migrate LLM cache table to add chunk_id and cache_type columns if needed try: - await self._migrate_llm_cache_add_chunk_id() + await self._migrate_llm_cache_add_columns() except Exception as e: - logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}") - # Don't throw an exception, allow the initialization process to continue - - # Migrate LLM cache table to add cache_type field if needed - try: - await self._migrate_llm_cache_add_cache_type() - except Exception as e: - logger.error( - f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}" - ) + logger.error(f"PostgreSQL, Failed to migrate LLM cache columns: {e}") # Don't throw an exception, allow the initialization process to continue # Finally, attempt to migrate old doc chunks data if needed @@ -1494,9 +1463,10 @@ class PGDocStatusStorage(DocStatusStorage): """Convert datetime to ISO format string with timezone info""" if dt is None: return None - # If no timezone info, assume it's UTC time + # If no timezone info, assume it's UTC time (as stored in database) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) + # If datetime already has timezone info, keep it as is return dt.isoformat() async def initialize(self): @@ -1818,8 +1788,8 @@ class PGGraphStorage(BaseGraphStorage): """ Generate graph name based on workspace and namespace for data isolation. 
Rules: - - If workspace is empty: graph_name = namespace - - If workspace has value: graph_name = workspace_namespace + - If workspace is empty or "default": graph_name = namespace + - If workspace has other value: graph_name = workspace_namespace Args: None @@ -1828,15 +1798,15 @@ class PGGraphStorage(BaseGraphStorage): str: The graph name for the current workspace """ workspace = getattr(self, "workspace", None) - namespace = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag") + namespace = self.namespace - if workspace and workspace.strip(): + if workspace and workspace.strip() and workspace.strip().lower() != "default": # Ensure names comply with PostgreSQL identifier specifications safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip()) safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace) return f"{safe_workspace}_{safe_namespace}" else: - # When workspace is empty, use namespace directly + # When workspace is empty or "default", use namespace directly return re.sub(r"[^a-zA-Z0-9_]", "_", namespace) @staticmethod diff --git a/lightrag/operate.py b/lightrag/operate.py index de29cf11..77eaa70c 100644 --- a/lightrag/operate.py +++ b/lightrag/operate.py @@ -1016,8 +1016,8 @@ async def _merge_edges_then_upsert( already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id) # Handle the case where get_edge returns None or missing fields if already_edge: - # Get weight with default 0.0 if missing - already_weights.append(already_edge.get("weight", 0.0)) + # Get weight with default 1.0 if missing + already_weights.append(already_edge.get("weight", 1.0)) # Get source_id with empty string default if missing or None if already_edge.get("source_id") is not None: @@ -1284,7 +1284,7 @@ async def merge_nodes_and_edges( "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", "source_id": edge_data["source_id"], "file_path": edge_data.get("file_path", "unknown_source"), - "weight": 0, + "weight": edge_data.get("weight", 1.0), } } await relationships_vdb.upsert(data_for_vdb) @@ -2494,9 +2494,9 @@ async def _find_most_related_edges_from_entities( if edge_props is not None: if "weight" not in edge_props: logger.warning( - f"Edge {pair} missing 'weight' attribute, using default value 0.0" + f"Edge {pair} missing 'weight' attribute, using default value 1.0" ) - edge_props["weight"] = 0.0 + edge_props["weight"] = 1.0 combined = { "src_tgt": pair, @@ -2549,9 +2549,9 @@ async def _get_edge_data( if edge_props is not None: if "weight" not in edge_props: logger.warning( - f"Edge {pair} missing 'weight' attribute, using default value 0.0" + f"Edge {pair} missing 'weight' attribute, using default value 1.0" ) - edge_props["weight"] = 0.0 + edge_props["weight"] = 1.0 # Use edge degree from the batch as rank. combined = {
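The patched `_get_graph_name` rule treats an unset workspace and the literal `default` workspace identically, mapping both to the bare namespace; any other workspace is prefixed onto the namespace. A standalone sketch of that mapping, assuming the same sanitization as the patched method (the namespace values below are illustrative):

```python
import re

def graph_name_for(workspace: str | None, namespace: str) -> str:
    """Mirror the patched rule: empty or "default" workspace -> namespace,
    anything else -> "<workspace>_<namespace>", sanitized for PostgreSQL identifiers."""
    if workspace and workspace.strip() and workspace.strip().lower() != "default":
        safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
        safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
        return f"{safe_workspace}_{safe_namespace}"
    return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)

# Illustrative values only:
assert graph_name_for(None, "chunk_entity_relation") == "chunk_entity_relation"
assert graph_name_for("default", "chunk_entity_relation") == "chunk_entity_relation"
assert graph_name_for("team-a", "chunk_entity_relation") == "team_a_chunk_entity_relation"
```

The sanitization replaces any character outside `[A-Za-z0-9_]` with an underscore, so the resulting graph name is always a valid PostgreSQL identifier.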