Mirror of https://github.com/HKUDS/LightRAG.git (synced 2025-12-18 02:12:28 +00:00)

Commit fc60b902e5: Merge branch 'main' into milvus-for-nullable
@@ -882,7 +882,7 @@ rag = LightRAG(

 * **For the Neo4j graph database, logical data isolation is achieved through labels:** Neo4JStorage

-To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables that override the common `WORKSPACE` environment variable. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
+To maintain compatibility with legacy data, when no workspace is configured the default workspace for PostgreSQL non-graph storage is `default`, the workspace for PostgreSQL AGE graph storage is empty, and the default workspace for Neo4j graph storage is `base`. For all external storages, the system provides dedicated workspace environment variables that override the common `WORKSPACE` environment variable. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.

 ## Edit Entities and Relations

@@ -928,7 +928,7 @@ The `workspace` parameter ensures data isolation between different LightRAG inst
 - **For relational databases, data isolation is achieved by adding a `workspace` field to the tables for logical data separation:** `PGKVStorage`, `PGVectorStorage`, `PGDocStatusStorage`.
 - **For the Neo4j graph database, logical data isolation is achieved through labels:** `Neo4JStorage`

-To maintain compatibility with legacy data, the default workspace for PostgreSQL is `default` and for Neo4j is `base` when no workspace is configured. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.
+To maintain compatibility with legacy data, when no workspace is configured the default workspace for PostgreSQL non-graph storage is `default`, the workspace for PostgreSQL AGE graph storage is empty, and the default workspace for Neo4j graph storage is `base`. For all external storages, the system provides dedicated workspace environment variables to override the common `WORKSPACE` environment variable configuration. These storage-specific workspace environment variables are: `REDIS_WORKSPACE`, `MILVUS_WORKSPACE`, `QDRANT_WORKSPACE`, `MONGODB_WORKSPACE`, `POSTGRES_WORKSPACE`, `NEO4J_WORKSPACE`.

 ## Edit Entities and Relations

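The override order described in this hunk can be sketched roughly as follows; `effective_workspace` is a hypothetical helper written for illustration only, not a LightRAG API:

```python
import os

# Assumed resolution order, mirroring the README: a storage-specific variable
# overrides the shared WORKSPACE; if neither is set, the backend default applies.
os.environ["WORKSPACE"] = "tenant_a"           # shared default for all backends
os.environ["NEO4J_WORKSPACE"] = "tenant_a_kg"  # overrides WORKSPACE for Neo4j only

def effective_workspace(storage_env_var: str) -> str:
    """Hypothetical helper: resolve the workspace one backend will use."""
    return os.environ.get(storage_env_var) or os.environ.get("WORKSPACE", "")

print(effective_workspace("NEO4J_WORKSPACE"))     # -> 'tenant_a_kg'
print(effective_workspace("POSTGRES_WORKSPACE"))  # -> 'tenant_a'
```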
@@ -1301,11 +1301,11 @@ def create_document_routes(
            "Starting to delete files in input directory"
        )

-       # Delete all files in input_dir
+       # Delete only files in the current directory, preserve files in subdirectories
        deleted_files_count = 0
        file_errors_count = 0

-       for file_path in doc_manager.input_dir.glob("**/*"):
+       for file_path in doc_manager.input_dir.glob("*"):
            if file_path.is_file():
                try:
                    file_path.unlink()
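The switch from `glob("**/*")` to `glob("*")` is what preserves files in subdirectories. A small sketch of the difference, using a made-up directory layout:

```python
from pathlib import Path

# Hypothetical layout: inputs/a.txt and inputs/archive/b.txt
input_dir = Path("./inputs")

# Recursive pattern: also matches archive/b.txt, so subdirectory files would be deleted
recursive = [p for p in input_dir.glob("**/*") if p.is_file()]

# Non-recursive pattern: matches only top-level files such as a.txt
top_level_only = [p for p in input_dir.glob("*") if p.is_file()]
```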
@@ -414,7 +414,7 @@ class MemgraphStorage(BaseGraphStorage):
            if records:
                edge_result = dict(records[0]["edge_properties"])
                for key, default_value in {
-                   "weight": 0.0,
+                   "weight": 1.0,
                    "source_id": None,
                    "description": None,
                    "keywords": None,
@@ -535,7 +535,7 @@ class Neo4JStorage(BaseGraphStorage):
                # logger.debug(f"Result: {edge_result}")
                # Ensure required keys exist with defaults
                required_keys = {
-                   "weight": 0.0,
+                   "weight": 1.0,
                    "source_id": None,
                    "description": None,
                    "keywords": None,
@@ -559,7 +559,7 @@ class Neo4JStorage(BaseGraphStorage):
            )
            # Return default edge properties on error
            return {
-               "weight": 0.0,
+               "weight": 1.0,
                "source_id": None,
                "description": None,
                "keywords": None,
@@ -610,7 +610,7 @@ class Neo4JStorage(BaseGraphStorage):
                edge_props = edges[0]  # choose the first if multiple exist
                # Ensure required keys exist with defaults
                for key, default in {
-                   "weight": 0.0,
+                   "weight": 1.0,
                    "source_id": None,
                    "description": None,
                    "keywords": None,
@@ -621,7 +621,7 @@ class Neo4JStorage(BaseGraphStorage):
            else:
                # No edge found – set default edge properties
                edges_dict[(src, tgt)] = {
-                   "weight": 0.0,
+                   "weight": 1.0,
                    "source_id": None,
                    "description": None,
                    "keywords": None,
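The five hunks above all change the fallback edge weight from 0.0 to 1.0, so an edge whose weight was never stored is scored as a normal-strength relation instead of a zero-strength one. A minimal sketch of the defaulting pattern being patched (the surrounding storage classes are not reproduced here):

```python
def apply_edge_defaults(edge_properties: dict) -> dict:
    """Fill in missing edge fields, using 1.0 as the neutral default weight."""
    defaults = {
        "weight": 1.0,  # was 0.0, which silently down-ranked legacy edges
        "source_id": None,
        "description": None,
        "keywords": None,
    }
    for key, default_value in defaults.items():
        edge_properties.setdefault(key, default_value)
    return edge_properties

print(apply_edge_defaults({"description": "founded"}))  # weight is filled in as 1.0
```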
@@ -105,25 +105,32 @@ class PostgreSQLDB:
    ):
        pass

-   async def _migrate_llm_cache_add_chunk_id(self):
-       """Add chunk_id column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
+   async def _migrate_llm_cache_add_columns(self):
+       """Add chunk_id and cache_type columns to LIGHTRAG_LLM_CACHE table if they don't exist"""
        try:
-           # Check if chunk_id column exists
-           check_column_sql = """
+           # Check if both columns exist
+           check_columns_sql = """
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = 'lightrag_llm_cache'
-           AND column_name = 'chunk_id'
+           AND column_name IN ('chunk_id', 'cache_type')
            """

-           column_info = await self.query(check_column_sql)
-           if not column_info:
+           existing_columns = await self.query(check_columns_sql, multirows=True)
+           existing_column_names = (
+               {col["column_name"] for col in existing_columns}
+               if existing_columns
+               else set()
+           )
+
+           # Add missing chunk_id column
+           if "chunk_id" not in existing_column_names:
                logger.info("Adding chunk_id column to LIGHTRAG_LLM_CACHE table")
-               add_column_sql = """
+               add_chunk_id_sql = """
                ALTER TABLE LIGHTRAG_LLM_CACHE
                ADD COLUMN chunk_id VARCHAR(255) NULL
                """
-               await self.execute(add_column_sql)
+               await self.execute(add_chunk_id_sql)
                logger.info(
                    "Successfully added chunk_id column to LIGHTRAG_LLM_CACHE table"
                )
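A sketch of how the single `information_schema` check reduces to a set of existing column names; the row shape assumes the `query(..., multirows=True)` helper returns a list of dicts, as the diff suggests:

```python
# Example rows the combined check query might return when only chunk_id exists
existing_columns = [{"column_name": "chunk_id"}]

existing_column_names = (
    {col["column_name"] for col in existing_columns} if existing_columns else set()
)

print("chunk_id" not in existing_column_names)    # False -> no ALTER TABLE needed
print("cache_type" not in existing_column_names)  # True  -> cache_type gets added
```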
@@ -131,54 +138,40 @@ class PostgreSQLDB:
                logger.info(
                    "chunk_id column already exists in LIGHTRAG_LLM_CACHE table"
                )
-       except Exception as e:
-           logger.warning(f"Failed to add chunk_id column to LIGHTRAG_LLM_CACHE: {e}")
-
-   async def _migrate_llm_cache_add_cache_type(self):
-       """Add cache_type column to LIGHTRAG_LLM_CACHE table if it doesn't exist"""
-       try:
-           # Check if cache_type column exists
-           check_column_sql = """
-           SELECT column_name
-           FROM information_schema.columns
-           WHERE table_name = 'lightrag_llm_cache'
-           AND column_name = 'cache_type'
-           """
-
-           column_info = await self.query(check_column_sql)
-           if not column_info:
+
+           # Add missing cache_type column
+           if "cache_type" not in existing_column_names:
                logger.info("Adding cache_type column to LIGHTRAG_LLM_CACHE table")
-               add_column_sql = """
+               add_cache_type_sql = """
                ALTER TABLE LIGHTRAG_LLM_CACHE
                ADD COLUMN cache_type VARCHAR(32) NULL
                """
-               await self.execute(add_column_sql)
+               await self.execute(add_cache_type_sql)
                logger.info(
                    "Successfully added cache_type column to LIGHTRAG_LLM_CACHE table"
                )

-               # Migrate existing data: extract cache_type from flattened keys
+               # Migrate existing data using optimized regex pattern
                logger.info(
-                   "Migrating existing LLM cache data to populate cache_type field"
+                   "Migrating existing LLM cache data to populate cache_type field (optimized)"
                )
-               update_sql = """
+               optimized_update_sql = """
                UPDATE LIGHTRAG_LLM_CACHE
                SET cache_type = CASE
-                   WHEN id LIKE '%:%:%' THEN split_part(id, ':', 2)
+                   WHEN id ~ '^[^:]+:[^:]+:' THEN split_part(id, ':', 2)
                    ELSE 'extract'
                END
                WHERE cache_type IS NULL
                """
-               await self.execute(update_sql)
+               await self.execute(optimized_update_sql)
                logger.info("Successfully migrated existing LLM cache data")
            else:
                logger.info(
                    "cache_type column already exists in LIGHTRAG_LLM_CACHE table"
                )

        except Exception as e:
-           logger.warning(
-               f"Failed to add cache_type column to LIGHTRAG_LLM_CACHE: {e}"
-           )
+           logger.warning(f"Failed to add columns to LIGHTRAG_LLM_CACHE: {e}")

    async def _migrate_timestamp_columns(self):
        """Migrate timestamp columns in tables to timezone-free types, assuming original data is in UTC time"""
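The anchored regex `'^[^:]+:[^:]+:'` requires two non-empty, colon-delimited segments at the start of the id, which is slightly stricter than `LIKE '%:%:%'`. A Python stand-in for the SQL `CASE` above, for illustration only (the real work happens in the `UPDATE`):

```python
import re

def cache_type_for(cache_id: str) -> str:
    """Mirror of the SQL CASE: take the second segment of 'mode:cache_type:hash' ids."""
    if re.match(r"^[^:]+:[^:]+:", cache_id):
        return cache_id.split(":")[1]
    return "extract"  # fallback for ids that are not in the flattened format

print(cache_type_for("default:extract:9f2b40"))  # -> 'extract'
print(cache_type_for("9f2b40"))                  # -> 'extract' (old-format id)
```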
@@ -187,6 +180,7 @@ class PostgreSQLDB:
            "LIGHTRAG_VDB_ENTITY": ["create_time", "update_time"],
            "LIGHTRAG_VDB_RELATION": ["create_time", "update_time"],
            "LIGHTRAG_DOC_CHUNKS": ["create_time", "update_time"],
+           "LIGHTRAG_DOC_STATUS": ["created_at", "updated_at"],
        }

        for table_name, columns in tables_to_migrate.items():
@@ -292,121 +286,105 @@ class PostgreSQLDB:
            # Do not re-raise, to allow the application to start

    async def _check_llm_cache_needs_migration(self):
-       """Check if LLM cache data needs migration by examining the first record"""
+       """Check if LLM cache data needs migration by examining any record with old format"""
        try:
-           # Only query the first record to determine format
+           # Optimized query: directly check for old format records without sorting
            check_sql = """
-           SELECT id FROM LIGHTRAG_LLM_CACHE
-           ORDER BY create_time ASC
+           SELECT 1 FROM LIGHTRAG_LLM_CACHE
+           WHERE id NOT LIKE '%:%'
            LIMIT 1
            """
            result = await self.query(check_sql)

-           if result and result.get("id"):
-               # If id doesn't contain colon, it's old format
-               return ":" not in result["id"]
-
-           return False  # No data or already new format
+           # If any old format record exists, migration is needed
+           return result is not None
        except Exception as e:
            logger.warning(f"Failed to check LLM cache migration status: {e}")
            return False

    async def _migrate_llm_cache_to_flattened_keys(self):
-       """Migrate LLM cache to flattened key format, recalculating hash values"""
+       """Optimized version: directly execute single UPDATE migration to migrate old format cache keys to flattened format"""
        try:
-           # Get all old format data
-           old_data_sql = """
-           SELECT id, mode, original_prompt, return_value, chunk_id,
-                  workspace, create_time, update_time
-           FROM LIGHTRAG_LLM_CACHE
-           WHERE id NOT LIKE '%:%'
-           """
-
-           old_records = await self.query(old_data_sql, multirows=True)
-
-           if not old_records:
+           # Check if migration is needed
+           check_sql = """
+           SELECT COUNT(*) as count FROM LIGHTRAG_LLM_CACHE
+           WHERE id NOT LIKE '%:%'
+           """
+           result = await self.query(check_sql)
+
+           if not result or result["count"] == 0:
                logger.info("No old format LLM cache data found, skipping migration")
                return

-           logger.info(
-               f"Found {len(old_records)} old format cache records, starting migration..."
-           )
+           old_count = result["count"]
+           logger.info(f"Found {old_count} old format cache records")
+
+           # Check potential primary key conflicts (optional but recommended)
+           conflict_check_sql = """
+           WITH new_ids AS (
+               SELECT
+                   workspace,
+                   mode,
+                   id as old_id,
+                   mode || ':' ||
+                   CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
+                   md5(original_prompt) as new_id
+               FROM LIGHTRAG_LLM_CACHE
+               WHERE id NOT LIKE '%:%'
+           )
+           SELECT COUNT(*) as conflicts
+           FROM new_ids n1
+           JOIN LIGHTRAG_LLM_CACHE existing
+               ON existing.workspace = n1.workspace
+               AND existing.mode = n1.mode
+               AND existing.id = n1.new_id
+           WHERE existing.id LIKE '%:%'  -- Only check conflicts with existing new format records
+           """

-           # Import hash calculation function
-           from ..utils import compute_args_hash
+           conflict_result = await self.query(conflict_check_sql)
+           if conflict_result and conflict_result["conflicts"] > 0:
+               logger.warning(
+                   f"Found {conflict_result['conflicts']} potential ID conflicts with existing records"
+               )
+               # Can choose to continue or abort, here we choose to continue and log warning

-           migrated_count = 0
+           # Execute single UPDATE migration
+           logger.info("Starting optimized LLM cache migration...")
+           migration_sql = """
+           UPDATE LIGHTRAG_LLM_CACHE
+           SET
+               id = mode || ':' ||
+                   CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END || ':' ||
+                   md5(original_prompt),
+               cache_type = CASE WHEN mode = 'default' THEN 'extract' ELSE 'unknown' END,
+               update_time = CURRENT_TIMESTAMP
+           WHERE id NOT LIKE '%:%'
+           """

-           # Migrate data in batches
-           for record in old_records:
-               try:
-                   # Recalculate hash using correct method
-                   new_hash = compute_args_hash(
-                       record["mode"], record["original_prompt"]
-                   )
-
-                   # Determine cache_type based on mode
-                   cache_type = "extract" if record["mode"] == "default" else "unknown"
-
-                   # Generate new flattened key
-                   new_key = f"{record['mode']}:{cache_type}:{new_hash}"
-
-                   # Insert new format data with cache_type field
-                   insert_sql = """
-                   INSERT INTO LIGHTRAG_LLM_CACHE
-                   (workspace, id, mode, original_prompt, return_value, chunk_id, cache_type, create_time, update_time)
-                   VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
-                   ON CONFLICT (workspace, mode, id) DO NOTHING
-                   """
-
-                   await self.execute(
-                       insert_sql,
-                       {
-                           "workspace": record[
-                               "workspace"
-                           ],  # Use original record's workspace
-                           "id": new_key,
-                           "mode": record["mode"],
-                           "original_prompt": record["original_prompt"],
-                           "return_value": record["return_value"],
-                           "chunk_id": record["chunk_id"],
-                           "cache_type": cache_type,  # Add cache_type field
-                           "create_time": record["create_time"],
-                           "update_time": record["update_time"],
-                       },
-                   )
-
-                   # Delete old data
-                   delete_sql = """
-                   DELETE FROM LIGHTRAG_LLM_CACHE
-                   WHERE workspace=$1 AND mode=$2 AND id=$3
-                   """
-                   await self.execute(
-                       delete_sql,
-                       {
-                           "workspace": record[
-                               "workspace"
-                           ],  # Use original record's workspace
-                           "mode": record["mode"],
-                           "id": record["id"],  # Old id
-                       },
-                   )
-
-                   migrated_count += 1
-
-               except Exception as e:
-                   logger.warning(
-                       f"Failed to migrate cache record {record['id']}: {e}"
-                   )
-                   continue
-
-           logger.info(
-               f"Successfully migrated {migrated_count} cache records to flattened format"
-           )
+           # Execute migration
+           await self.execute(migration_sql)
+
+           # Verify migration results
+           verify_sql = """
+           SELECT COUNT(*) as remaining_old FROM LIGHTRAG_LLM_CACHE
+           WHERE id NOT LIKE '%:%'
+           """
+           verify_result = await self.query(verify_sql)
+           remaining = verify_result["remaining_old"] if verify_result else -1
+
+           if remaining == 0:
+               logger.info(
+                   f"✅ Successfully migrated {old_count} LLM cache records to flattened format"
+               )
+           else:
+               logger.warning(
+                   f"⚠️ Migration completed but {remaining} old format records remain"
+               )

        except Exception as e:
-           logger.error(f"LLM cache migration failed: {e}")
-           # Don't raise exception, allow system to continue startup
+           logger.error(f"Optimized LLM cache migration failed: {e}")
+           raise

    async def _migrate_doc_status_add_chunks_list(self):
        """Add chunks_list column to LIGHTRAG_DOC_STATUS table if it doesn't exist"""
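The per-record Python loop is replaced by one `UPDATE` that rewrites ids in place to `mode:cache_type:md5(original_prompt)`. A rough Python equivalent of that expression (the real computation happens inside PostgreSQL):

```python
import hashlib

def migrate_id(mode: str, original_prompt: str) -> tuple:
    """Rough stand-in for the single UPDATE: new id plus the cache_type it sets."""
    cache_type = "extract" if mode == "default" else "unknown"
    new_id = f"{mode}:{cache_type}:{hashlib.md5(original_prompt.encode()).hexdigest()}"
    return new_id, cache_type

print(migrate_id("default", "Extract entities from the following text ..."))
# -> ('default:extract:<md5 hex digest>', 'extract')
```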
@@ -646,20 +624,11 @@ class PostgreSQLDB:
            logger.error(f"PostgreSQL, Failed to migrate timestamp columns: {e}")
            # Don't throw an exception, allow the initialization process to continue

-       # Migrate LLM cache table to add chunk_id field if needed
+       # Migrate LLM cache table to add chunk_id and cache_type columns if needed
        try:
-           await self._migrate_llm_cache_add_chunk_id()
+           await self._migrate_llm_cache_add_columns()
        except Exception as e:
-           logger.error(f"PostgreSQL, Failed to migrate LLM cache chunk_id field: {e}")
-           # Don't throw an exception, allow the initialization process to continue
-
-       # Migrate LLM cache table to add cache_type field if needed
-       try:
-           await self._migrate_llm_cache_add_cache_type()
-       except Exception as e:
-           logger.error(
-               f"PostgreSQL, Failed to migrate LLM cache cache_type field: {e}"
-           )
+           logger.error(f"PostgreSQL, Failed to migrate LLM cache columns: {e}")
            # Don't throw an exception, allow the initialization process to continue

        # Finally, attempt to migrate old doc chunks data if needed
@@ -1494,9 +1463,10 @@ class PGDocStatusStorage(DocStatusStorage):
        """Convert datetime to ISO format string with timezone info"""
        if dt is None:
            return None
-       # If no timezone info, assume it's UTC time
+       # If no timezone info, assume it's UTC time (as stored in database)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
+       # If datetime already has timezone info, keep it as is
        return dt.isoformat()

    async def initialize(self):
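The added comment only documents existing behaviour: naive timestamps read back from PostgreSQL are interpreted as UTC, while timezone-aware ones pass through unchanged. A quick sketch of both branches:

```python
from datetime import datetime, timezone

def to_iso_utc(dt):
    """Naive datetimes are assumed to be UTC; aware ones are kept as-is."""
    if dt is None:
        return None
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt.isoformat()

print(to_iso_utc(datetime(2025, 1, 1, 12, 0)))  # -> '2025-01-01T12:00:00+00:00'
```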
@@ -1818,8 +1788,8 @@ class PGGraphStorage(BaseGraphStorage):
        """
        Generate graph name based on workspace and namespace for data isolation.
        Rules:
-       - If workspace is empty: graph_name = namespace
-       - If workspace has value: graph_name = workspace_namespace
+       - If workspace is empty or "default": graph_name = namespace
+       - If workspace has other value: graph_name = workspace_namespace

        Args:
            None
@@ -1828,15 +1798,15 @@ class PGGraphStorage(BaseGraphStorage):
            str: The graph name for the current workspace
        """
        workspace = getattr(self, "workspace", None)
-       namespace = self.namespace or os.environ.get("AGE_GRAPH_NAME", "lightrag")
+       namespace = self.namespace

-       if workspace and workspace.strip():
+       if workspace and workspace.strip() and workspace.strip().lower() != "default":
            # Ensure names comply with PostgreSQL identifier specifications
            safe_workspace = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
            safe_namespace = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
            return f"{safe_workspace}_{safe_namespace}"
        else:
-           # When workspace is empty, use namespace directly
+           # When workspace is empty or "default", use namespace directly
            return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)

    @staticmethod
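With this change a workspace named `default` behaves like an unset workspace, so legacy AGE graphs keep their namespace-only name. A simplified restatement of the rule (the namespace value below is illustrative):

```python
import re

def graph_name(workspace, namespace):
    """Empty or 'default' workspace keeps the bare namespace; anything else is prefixed."""
    if workspace and workspace.strip() and workspace.strip().lower() != "default":
        safe_ws = re.sub(r"[^a-zA-Z0-9_]", "_", workspace.strip())
        safe_ns = re.sub(r"[^a-zA-Z0-9_]", "_", namespace)
        return f"{safe_ws}_{safe_ns}"
    return re.sub(r"[^a-zA-Z0-9_]", "_", namespace)

print(graph_name("default", "chunk_entity_relation"))   # -> 'chunk_entity_relation'
print(graph_name("tenant_a", "chunk_entity_relation"))  # -> 'tenant_a_chunk_entity_relation'
```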
@@ -1016,8 +1016,8 @@ async def _merge_edges_then_upsert(
    already_edge = await knowledge_graph_inst.get_edge(src_id, tgt_id)
    # Handle the case where get_edge returns None or missing fields
    if already_edge:
-       # Get weight with default 0.0 if missing
-       already_weights.append(already_edge.get("weight", 0.0))
+       # Get weight with default 1.0 if missing
+       already_weights.append(already_edge.get("weight", 1.0))

        # Get source_id with empty string default if missing or None
        if already_edge.get("source_id") is not None:
@@ -1284,7 +1284,7 @@ async def merge_nodes_and_edges(
                "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
                "source_id": edge_data["source_id"],
                "file_path": edge_data.get("file_path", "unknown_source"),
-               "weight": 0,
+               "weight": edge_data.get("weight", 1.0),
            }
        }
        await relationships_vdb.upsert(data_for_vdb)
@@ -2494,9 +2494,9 @@ async def _find_most_related_edges_from_entities(
        if edge_props is not None:
            if "weight" not in edge_props:
                logger.warning(
-                   f"Edge {pair} missing 'weight' attribute, using default value 0.0"
+                   f"Edge {pair} missing 'weight' attribute, using default value 1.0"
                )
-               edge_props["weight"] = 0.0
+               edge_props["weight"] = 1.0

            combined = {
                "src_tgt": pair,
@@ -2549,9 +2549,9 @@ async def _get_edge_data(
        if edge_props is not None:
            if "weight" not in edge_props:
                logger.warning(
-                   f"Edge {pair} missing 'weight' attribute, using default value 0.0"
+                   f"Edge {pair} missing 'weight' attribute, using default value 1.0"
                )
-               edge_props["weight"] = 0.0
+               edge_props["weight"] = 1.0

            # Use edge degree from the batch as rank.
            combined = {