Update operate.py

zrguo 2025-06-09 19:40:29 +08:00
parent ead82a8dbd
commit 9a71a10bc0

@@ -451,63 +451,19 @@ async def _rebuild_single_entity(
     if not current_entity:
         return
 
-    # Collect all entity data from relevant chunks
-    all_entity_data = []
-    for chunk_id in chunk_ids:
-        if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
-            all_entity_data.extend(chunk_entities[chunk_id][entity_name])
-
-    if not all_entity_data:
-        logger.warning(f"No cached entity data found for {entity_name}")
-        return
-
-    # Merge descriptions and get the most common entity type
-    descriptions = []
-    entity_types = []
-    file_paths = set()
-
-    for entity_data in all_entity_data:
-        if entity_data.get("description"):
-            descriptions.append(entity_data["description"])
-        if entity_data.get("entity_type"):
-            entity_types.append(entity_data["entity_type"])
-        if entity_data.get("file_path"):
-            file_paths.add(entity_data["file_path"])
-
-    # Combine all descriptions
-    combined_description = (
-        GRAPH_FIELD_SEP.join(descriptions)
-        if descriptions
-        else current_entity.get("description", "")
-    )
-
-    # Get most common entity type
-    entity_type = (
-        max(set(entity_types), key=entity_types.count)
-        if entity_types
-        else current_entity.get("entity_type", "UNKNOWN")
-    )
-
-    # Use summary if description is too long
-    if len(combined_description) > global_config["summary_to_max_tokens"]:
-        final_description = await _handle_entity_relation_summary(
-            entity_name,
-            combined_description,
-            global_config,
-            llm_response_cache=llm_response_cache,
-        )
-    else:
-        final_description = combined_description
-
-    # Update entity in graph storage
-    updated_entity_data = {
-        **current_entity,
-        "description": final_description,
-        "entity_type": entity_type,
-        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
-        "file_path": GRAPH_FIELD_SEP.join(file_paths)
-        if file_paths
-        else current_entity.get("file_path", "unknown_source"),
-    }
-    await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
+    # Helper function to update entity in both graph and vector storage
+    async def _update_entity_storage(
+        final_description: str,
+        entity_type: str,
+        file_paths: set[str]
+    ):
+        # Update entity in graph storage
+        updated_entity_data = {
+            **current_entity,
+            "description": final_description,
+            "entity_type": entity_type,
+            "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
+            "file_path": GRAPH_FIELD_SEP.join(file_paths) if file_paths else current_entity.get("file_path", "unknown_source"),
+        }
+        await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
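Note: the first hunk factors the storage update out of the linear rebuild flow into a nested _update_entity_storage helper, so the relationship-based fallback added below can reuse it. A minimal runnable sketch of that closure pattern, where FakeGraph, the sample data, and the "<SEP>" separator are stand-ins for the real LightRAG types and constants, not the project's actual API:

import asyncio

GRAPH_FIELD_SEP = "<SEP>"  # assumed stand-in; LightRAG defines its own separator constant

class FakeGraph:
    """Illustrative in-memory substitute for the graph storage backend."""
    def __init__(self):
        self.nodes = {}

    async def upsert_node(self, name, data):
        self.nodes[name] = data

async def rebuild_entity(graph, entity_name, current_entity, chunk_ids):
    # The nested helper closes over entity_name, current_entity and chunk_ids,
    # so each caller inside the rebuild only supplies the merged fields.
    async def _update_entity_storage(final_description, entity_type, file_paths):
        await graph.upsert_node(entity_name, {
            **current_entity,
            "description": final_description,
            "entity_type": entity_type,
            "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
            "file_path": GRAPH_FIELD_SEP.join(file_paths)
            if file_paths
            else current_entity.get("file_path", "unknown_source"),
        })

    await _update_entity_storage("A demo entity", "ORGANIZATION", {"a.txt"})
    return graph.nodes[entity_name]

print(asyncio.run(rebuild_entity(FakeGraph(), "ACME", {"entity_type": "UNKNOWN"}, ["chunk-1"])))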
@@ -535,6 +491,90 @@ async def _rebuild_single_entity(
             }
         )
 
+    # Helper function to generate final description with optional LLM summary
+    async def _generate_final_description(combined_description: str) -> str:
+        if len(combined_description) > global_config["summary_to_max_tokens"]:
+            return await _handle_entity_relation_summary(
+                entity_name,
+                combined_description,
+                global_config,
+                llm_response_cache=llm_response_cache,
+            )
+        else:
+            return combined_description
+
+    # Collect all entity data from relevant chunks
+    all_entity_data = []
+    for chunk_id in chunk_ids:
+        if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
+            all_entity_data.extend(chunk_entities[chunk_id][entity_name])
+
+    if not all_entity_data:
+        logger.warning(f"No cached entity data found for {entity_name}, trying to rebuild from relationships")
+
+        # Get all edges connected to this entity
+        edges = await knowledge_graph_inst.get_node_edges(entity_name)
+        if not edges:
+            logger.warning(f"No relationships found for entity {entity_name}")
+            return
+
+        # Collect relationship data to extract entity information
+        relationship_descriptions = []
+        file_paths = set()
+
+        # Get edge data for all connected relationships
+        for src_id, tgt_id in edges:
+            edge_data = await knowledge_graph_inst.get_edge(src_id, tgt_id)
+            if edge_data:
+                if edge_data.get("description"):
+                    relationship_descriptions.append(edge_data["description"])
+                if edge_data.get("file_path"):
+                    edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
+                    file_paths.update(edge_file_paths)
+
+        # Generate description from relationships or fallback to current
+        if relationship_descriptions:
+            combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
+            final_description = await _generate_final_description(combined_description)
+        else:
+            final_description = current_entity.get("description", "")
+
+        entity_type = current_entity.get("entity_type", "UNKNOWN")
+        await _update_entity_storage(final_description, entity_type, file_paths)
+        return
+
+    # Process cached entity data
+    descriptions = []
+    entity_types = []
+    file_paths = set()
+
+    for entity_data in all_entity_data:
+        if entity_data.get("description"):
+            descriptions.append(entity_data["description"])
+        if entity_data.get("entity_type"):
+            entity_types.append(entity_data["entity_type"])
+        if entity_data.get("file_path"):
+            file_paths.add(entity_data["file_path"])
+
+    # Combine all descriptions
+    combined_description = (
+        GRAPH_FIELD_SEP.join(descriptions)
+        if descriptions
+        else current_entity.get("description", "")
+    )
+
+    # Get most common entity type
+    entity_type = (
+        max(set(entity_types), key=entity_types.count)
+        if entity_types
+        else current_entity.get("entity_type", "UNKNOWN")
+    )
+
+    # Generate final description and update storage
+    final_description = await _generate_final_description(combined_description)
+    await _update_entity_storage(final_description, entity_type, file_paths)
+
 
 async def _rebuild_single_relationship(
     knowledge_graph_inst: BaseGraphStorage,
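The bulk of the second hunk is the new fallback: when no cached chunk data survives for an entity, the rebuild now derives a description from the entity's remaining edges instead of logging a warning and returning. A self-contained sketch of that control flow, using a hypothetical in-memory edge table in place of knowledge_graph_inst (the function names mirror the diff, but the storage is invented for illustration):

import asyncio

GRAPH_FIELD_SEP = "<SEP>"  # assumed stand-in for LightRAG's separator

# Hypothetical edge store standing in for the graph storage backend.
EDGES = {
    ("ACME", "Bob"): {"description": "ACME employs Bob", "file_path": "a.txt"},
    ("Carol", "ACME"): {"description": "Carol founded ACME", "file_path": "b.txt"},
}

async def get_node_edges(name):
    return [pair for pair in EDGES if name in pair]

async def get_edge(src, tgt):
    return EDGES.get((src, tgt))

async def rebuild_from_relationships(entity_name):
    # Mirrors the fallback branch: harvest descriptions and file paths
    # from every edge that touches the entity, then join them with the
    # field separator, as the diff does before the optional LLM summary.
    relationship_descriptions, file_paths = [], set()
    for src_id, tgt_id in await get_node_edges(entity_name):
        edge_data = await get_edge(src_id, tgt_id)
        if edge_data:
            if edge_data.get("description"):
                relationship_descriptions.append(edge_data["description"])
            if edge_data.get("file_path"):
                file_paths.update(edge_data["file_path"].split(GRAPH_FIELD_SEP))
    return GRAPH_FIELD_SEP.join(relationship_descriptions), file_paths

description, paths = asyncio.run(rebuild_from_relationships("ACME"))
print(description)     # ACME employs Bob<SEP>Carol founded ACME
print(sorted(paths))   # ['a.txt', 'b.txt']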
@@ -798,7 +838,8 @@ async def _merge_edges_then_upsert(
     )
 
     # Process edges_data with None checks
-    weight = sum([dp["weight"] for dp in edges_data] + already_weights)
+    all_weights = [dp["weight"] for dp in edges_data] + already_weights
+    weight = sum(all_weights) / len(all_weights)
     description = GRAPH_FIELD_SEP.join(
         sorted(
             set(
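The last hunk changes the merged edge weight from a running sum to the arithmetic mean, so repeatedly merging the same relationship no longer inflates its weight without bound. A quick before/after comparison with illustrative numbers:

# Illustrative values only: weights extracted for the edge in this batch,
# plus the weight already stored on the graph edge.
edge_weights = [1.0, 0.6, 0.8]   # dp["weight"] for dp in edges_data
already_weights = [0.9]          # weight previously stored on the edge

all_weights = edge_weights + already_weights
old_weight = sum(all_weights)                     # 3.3, grows on every rebuild
new_weight = sum(all_weights) / len(all_weights)  # 0.825, stays on the original scale
print(old_weight, new_weight)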