#!/usr/bin/env python
"""
通用图存储测试程序
该程序根据.env中的LIGHTRAG_GRAPH_STORAGE配置选择使用的图存储类型
并对其进行基本操作和高级操作的测试
支持的图存储类型包括
- NetworkXStorage
- Neo4JStorage
- PGGraphStorage
"""
import asyncio
import os
import sys
import importlib
import numpy as np
from dotenv import load_dotenv
from ascii_colors import ASCIIColors
# 添加项目根目录到Python路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from lightrag.types import KnowledgeGraph
from lightrag.kg import (
STORAGE_IMPLEMENTATIONS,
STORAGE_ENV_REQUIREMENTS,
STORAGES,
2025-04-04 03:41:05 +08:00
verify_storage_implementation,
2025-04-04 03:40:46 +08:00
)
from lightrag.kg.shared_storage import initialize_share_data
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
# 模拟的嵌入函数,返回随机向量
async def mock_embedding_func(texts):
    """Return one random 10-dimensional vector per input text.

    Stand-in embedding function handed to the storage under test; the values
    are drawn with ``np.random.rand`` and carry no meaning.

    Args:
        texts: Sized collection of inputs; only its length is used.

    Returns:
        A NumPy array of shape ``(len(texts), 10)``.
    """
    embedding_dim = 10
    row_count = len(texts)
    return np.random.rand(row_count, embedding_dim)
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
def check_env_file():
    """Warn when no ``.env`` file exists in the current directory.

    If the file is missing and stdin is an interactive terminal, the user is
    asked whether to continue; any answer other than "yes" (compared
    case-insensitively) cancels the run.

    Returns:
        bool: True if the caller should continue, False if it should exit.
    """
    if os.path.exists(".env"):
        return True

    ASCIIColors.yellow("警告: 当前目录中没有找到.env文件这可能会影响存储配置的加载。")

    # Only prompt when someone can answer; non-interactive runs just continue.
    if not sys.stdin.isatty():
        return True

    answer = input("是否继续执行? (yes/no): ")
    if answer.lower() == "yes":
        return True

    ASCIIColors.red("测试程序已取消")
    return False
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
async def initialize_graph_storage():
    """Create and initialize the graph storage selected by the environment.

    The storage type is read from ``LIGHTRAG_GRAPH_STORAGE`` (default
    ``NetworkXStorage``). It is validated, its required environment variables
    are checked, its class is imported dynamically, and an instance is
    constructed and initialized.

    Returns:
        The initialized storage instance, or None if any step failed (an
        error message is printed in that case).
    """
    storage_name = os.getenv("LIGHTRAG_GRAPH_STORAGE", "NetworkXStorage")

    # Reject storage types that are not registered as graph storages.
    try:
        verify_storage_implementation("GRAPH_STORAGE", storage_name)
    except ValueError as err:
        ASCIIColors.red(f"错误: {str(err)}")
        supported = ", ".join(
            STORAGE_IMPLEMENTATIONS["GRAPH_STORAGE"]["implementations"]
        )
        ASCIIColors.yellow(f"支持的图存储类型: {supported}")
        return None

    # Every environment variable the storage declares as required must be set.
    unset_vars = [
        name
        for name in STORAGE_ENV_REQUIREMENTS.get(storage_name, [])
        if not os.getenv(name)
    ]
    if unset_vars:
        ASCIIColors.red(
            f"错误: {storage_name} 需要以下环境变量,但未设置: {', '.join(unset_vars)}"
        )
        return None

    # Resolve the module that provides the storage class and import it.
    module_path = STORAGES.get(storage_name)
    if not module_path:
        ASCIIColors.red(f"错误: 未找到 {storage_name} 的模块路径")
        return None

    try:
        storage_module = importlib.import_module(module_path, package="lightrag")
        storage_class = getattr(storage_module, storage_name)
    except (ImportError, AttributeError) as err:
        ASCIIColors.red(f"错误: 导入 {storage_name} 失败: {str(err)}")
        return None

    # Configuration handed to the storage constructor.
    global_config = {
        "embedding_batch_num": 10,  # batch size
        "vector_db_storage_cls_kwargs": {
            "cosine_better_than_threshold": 0.5  # cosine similarity threshold
        },
        "working_dir": os.environ.get("WORKING_DIR", "./rag_storage"),
    }

    # NetworkXStorage needs shared_storage initialized first
    # (single-process mode, per the original author's note).
    if storage_name == "NetworkXStorage":
        initialize_share_data()

    try:
        storage = storage_class(
            namespace="test_graph",
            global_config=global_config,
            embedding_func=mock_embedding_func,
        )
        # Set up the storage before handing it to the tests.
        await storage.initialize()
    except Exception as err:
        ASCIIColors.red(f"错误: 初始化 {storage_name} 失败: {str(err)}")
        return None
    return storage
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
async def test_graph_basic(storage):
    """Exercise the basic graph-storage operations.

    Steps:
    1. Insert two nodes with ``upsert_node``.
    2. Insert one edge connecting them with ``upsert_edge``.
    3. Read a node back with ``get_node`` and check its properties.
    4. Read the edge back with ``get_edge`` and check its properties.

    The storage is dropped before the test starts; the inserted data is left
    in place afterwards.

    Args:
        storage: Initialized graph storage instance under test.

    Returns:
        bool: True when every check passes, False if any exception
        (including a failed assertion) was raised.
    """
    try:
        # Start from an empty graph.
        print("清理之前的测试数据...")
        await storage.drop()

        # First node.
        ai_id = "人工智能"
        ai_node = {
            "entity_id": ai_id,
            "description": "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。",
            "keywords": "AI,机器学习,深度学习",
            "entity_type": "技术领域",
        }
        print(f"插入节点1: {ai_id}")
        await storage.upsert_node(ai_id, ai_node)

        # Second node.
        ml_id = "机器学习"
        ml_node = {
            "entity_id": ml_id,
            "description": "机器学习是人工智能的一个分支,它使用统计学方法让计算机系统在不被明确编程的情况下也能够学习。",
            "keywords": "监督学习,无监督学习,强化学习",
            "entity_type": "技术领域",
        }
        print(f"插入节点2: {ml_id}")
        await storage.upsert_node(ml_id, ml_node)

        # Edge connecting the two nodes.
        link = {
            "relationship": "包含",
            "weight": 1.0,
            "description": "人工智能领域包含机器学习这个子领域",
        }
        print(f"插入边: {ai_id} -> {ml_id}")
        await storage.upsert_edge(ai_id, ml_id, link)

        # Read the first node back and compare it with what was written.
        print(f"读取节点属性: {ai_id}")
        fetched_node = await storage.get_node(ai_id)
        if not fetched_node:
            print(f"读取节点属性失败: {ai_id}")
            assert False, f"未能读取节点属性: {ai_id}"
        else:
            print(f"成功读取节点属性: {ai_id}")
            node_report = (
                ("节点描述", "description", "无描述"),
                ("节点类型", "entity_type", "无类型"),
                ("节点关键词", "keywords", "无关键词"),
            )
            for label, key, fallback in node_report:
                print(f"{label}: {fetched_node.get(key, fallback)}")
            assert (
                fetched_node.get("entity_id") == ai_id
            ), f"节点ID不匹配: 期望 {ai_id}, 实际 {fetched_node.get('entity_id')}"
            assert (
                fetched_node.get("description") == ai_node["description"]
            ), "节点描述不匹配"
            assert (
                fetched_node.get("entity_type") == ai_node["entity_type"]
            ), "节点类型不匹配"

        # Read the edge back and compare it with what was written.
        print(f"读取边属性: {ai_id} -> {ml_id}")
        fetched_edge = await storage.get_edge(ai_id, ml_id)
        if not fetched_edge:
            print(f"读取边属性失败: {ai_id} -> {ml_id}")
            assert False, f"未能读取边属性: {ai_id} -> {ml_id}"
        else:
            print(f"成功读取边属性: {ai_id} -> {ml_id}")
            edge_report = (
                ("边关系", "relationship", "无关系"),
                ("边描述", "description", "无描述"),
                ("边权重", "weight", "无权重"),
            )
            for label, key, fallback in edge_report:
                print(f"{label}: {fetched_edge.get(key, fallback)}")
            assert (
                fetched_edge.get("relationship") == link["relationship"]
            ), "边关系不匹配"
            assert (
                fetched_edge.get("description") == link["description"]
            ), "边描述不匹配"
            assert fetched_edge.get("weight") == link["weight"], "边权重不匹配"

        print("基本测试完成,数据已保留在数据库中")
        return True

    except Exception as err:
        # Any failure, including a failed assertion, is reported and turned
        # into a False result instead of propagating.
        ASCIIColors.red(f"测试过程中发生错误: {str(err)}")
        return False
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
async def test_graph_advanced(storage):
    """Exercise the advanced graph-storage operations.

    Steps, in execution order:
    1. Insert three nodes and two edges as test data.
    2. ``node_degree``: degree of a node.
    3. ``edge_degree``: degree of an edge.
    4. ``get_node_edges``: all edges of a node.
    5. ``get_all_labels``: all labels.
    6. ``get_knowledge_graph``: knowledge graph retrieval.
    7. ``delete_node``: delete a single node.
    8. ``remove_edges``: delete edges.
    9. ``remove_nodes``: delete several nodes at once.
    10. ``drop``: clear all data.

    The storage is dropped before the test starts and again as the last step.

    Args:
        storage: Initialized graph storage instance under test.

    Returns:
        bool: True when every check passes, False if any exception
        (including a failed assertion) was raised.
    """
    try:
        # Start from an empty graph.
        print("清理之前的测试数据...\n")
        await storage.drop()

        # 1. Insert test data.
        # Node 1
        node1_id = "人工智能"
        node1_data = {
            "entity_id": node1_id,
            "description": "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。",
            "keywords": "AI,机器学习,深度学习",
            "entity_type": "技术领域",
        }
        print(f"插入节点1: {node1_id}")
        await storage.upsert_node(node1_id, node1_data)

        # Node 2
        node2_id = "机器学习"
        node2_data = {
            "entity_id": node2_id,
            "description": "机器学习是人工智能的一个分支,它使用统计学方法让计算机系统在不被明确编程的情况下也能够学习。",
            "keywords": "监督学习,无监督学习,强化学习",
            "entity_type": "技术领域",
        }
        print(f"插入节点2: {node2_id}")
        await storage.upsert_node(node2_id, node2_data)

        # Node 3
        node3_id = "深度学习"
        node3_data = {
            "entity_id": node3_id,
            "description": "深度学习是机器学习的一个分支,它使用多层神经网络来模拟人脑的学习过程。",
            "keywords": "神经网络,CNN,RNN",
            "entity_type": "技术领域",
        }
        print(f"插入节点3: {node3_id}")
        await storage.upsert_node(node3_id, node3_data)

        # Edge 1: node 1 -> node 2
        edge1_data = {
            "relationship": "包含",
            "weight": 1.0,
            "description": "人工智能领域包含机器学习这个子领域",
        }
        print(f"插入边1: {node1_id} -> {node2_id}")
        await storage.upsert_edge(node1_id, node2_id, edge1_data)

        # Edge 2: node 2 -> node 3
        edge2_data = {
            "relationship": "包含",
            "weight": 1.0,
            "description": "机器学习领域包含深度学习这个子领域",
        }
        print(f"插入边2: {node2_id} -> {node3_id}")
        await storage.upsert_edge(node2_id, node3_id, edge2_data)

        # 2. node_degree: node 1 touches only edge 1.
        print(f"== 测试 node_degree: {node1_id}")
        node1_degree = await storage.node_degree(node1_id)
        print(f"节点 {node1_id} 的度数: {node1_degree}")
        assert node1_degree == 1, f"节点 {node1_id} 的度数应为1实际为 {node1_degree}"

        # 3. edge_degree: expected 3, i.e. the degree of node 1 (1) plus the
        # degree of node 2 (2). The failure message now states the same value
        # the assertion checks (it previously said 2).
        print(f"== 测试 edge_degree: {node1_id} -> {node2_id}")
        edge_degree = await storage.edge_degree(node1_id, node2_id)
        print(f"边 {node1_id} -> {node2_id} 的度数: {edge_degree}")
        assert (
            edge_degree == 3
        ), f"边 {node1_id} -> {node2_id} 的度数应为3实际为 {edge_degree}"

        # 4. get_node_edges: node 2 touches both edges.
        print(f"== 测试 get_node_edges: {node2_id}")
        node2_edges = await storage.get_node_edges(node2_id)
        print(f"节点 {node2_id} 的所有边: {node2_edges}")
        assert (
            len(node2_edges) == 2
        ), f"节点 {node2_id} 应有2条边实际有 {len(node2_edges)}"

        # 5. get_all_labels: one label per inserted node.
        print("== 测试 get_all_labels")
        all_labels = await storage.get_all_labels()
        print(f"所有标签: {all_labels}")
        assert len(all_labels) == 3, f"应有3个标签实际有 {len(all_labels)}"
        assert node1_id in all_labels, f"{node1_id} 应在标签列表中"
        assert node2_id in all_labels, f"{node2_id} 应在标签列表中"
        assert node3_id in all_labels, f"{node3_id} 应在标签列表中"

        # 6. get_knowledge_graph: check the type before touching .nodes/.edges
        # so a wrong return type fails with the intended message.
        print("== 测试 get_knowledge_graph")
        kg = await storage.get_knowledge_graph("*", max_depth=2, max_nodes=10)
        assert isinstance(kg, KnowledgeGraph), "返回结果应为 KnowledgeGraph 类型"
        print(f"知识图谱节点数: {len(kg.nodes)}")
        print(f"知识图谱边数: {len(kg.edges)}")
        assert len(kg.nodes) == 3, f"知识图谱应有3个节点实际有 {len(kg.nodes)}"
        assert len(kg.edges) == 2, f"知识图谱应有2条边实际有 {len(kg.edges)}"

        # 7. delete_node: the node must no longer be readable afterwards.
        print(f"== 测试 delete_node: {node3_id}")
        await storage.delete_node(node3_id)
        node3_props = await storage.get_node(node3_id)
        print(f"删除后查询节点属性 {node3_id}: {node3_props}")
        assert node3_props is None, f"节点 {node3_id} 应已被删除"

        # Re-insert node 3 and its edge for the following steps.
        await storage.upsert_node(node3_id, node3_data)
        await storage.upsert_edge(node2_id, node3_id, edge2_data)

        # 8. remove_edges: the edge must no longer be readable afterwards.
        print(f"== 测试 remove_edges: {node2_id} -> {node3_id}")
        await storage.remove_edges([(node2_id, node3_id)])
        edge_props = await storage.get_edge(node2_id, node3_id)
        print(f"删除后查询边属性 {node2_id} -> {node3_id}: {edge_props}")
        assert edge_props is None, f"边 {node2_id} -> {node3_id} 应已被删除"

        # 9. remove_nodes: batch deletion of nodes 2 and 3.
        print(f"== 测试 remove_nodes: [{node2_id}, {node3_id}]")
        await storage.remove_nodes([node2_id, node3_id])
        node2_props = await storage.get_node(node2_id)
        node3_props = await storage.get_node(node3_id)
        print(f"删除后查询节点属性 {node2_id}: {node2_props}")
        print(f"删除后查询节点属性 {node3_id}: {node3_props}")
        assert node2_props is None, f"节点 {node2_id} 应已被删除"
        assert node3_props is None, f"节点 {node3_id} 应已被删除"

        # 10. drop: must report success and leave no labels behind.
        print("== 测试 drop")
        result = await storage.drop()
        print(f"清理结果: {result}")
        assert (
            result["status"] == "success"
        ), f"清理应成功,实际状态为 {result['status']}"
        all_labels = await storage.get_all_labels()
        print(f"清理后的所有标签: {all_labels}")
        assert len(all_labels) == 0, f"清理后应没有标签,实际有 {len(all_labels)}"

        print("\n高级测试完成")
        return True

    except Exception as e:
        # Any failure, including a failed assertion, is reported and turned
        # into a False result instead of propagating.
        ASCIIColors.red(f"测试过程中发生错误: {str(e)}")
        return False
2025-04-04 03:41:05 +08:00
async def test_graph_batch_operations(storage):
    """Exercise the batch graph-storage operations.

    Steps:
    1. ``get_nodes_batch``: properties of several nodes at once.
    2. ``node_degrees_batch``: degrees of several nodes at once.
    3. ``edge_degrees_batch``: degrees of several edges at once.
    4. ``get_edges_batch``: properties of several edges at once.
    5. ``get_nodes_edges_batch``: all edges of several nodes at once.

    The storage is dropped before the test data (five nodes, six edges) is
    inserted and again at the end.

    Args:
        storage: Initialized graph storage instance under test.

    Returns:
        bool: True when every check passes, False if any exception
        (including a failed assertion) was raised.
    """
    try:
        # Start from an empty graph.
        print("清理之前的测试数据...\n")
        await storage.drop()

        # Fixture nodes in insertion order: (entity id, description, keywords).
        node_specs = [
            (
                "人工智能",
                "人工智能是计算机科学的一个分支,它企图了解智能的实质,并生产出一种新的能以人类智能相似的方式做出反应的智能机器。",
                "AI,机器学习,深度学习",
            ),
            (
                "机器学习",
                "机器学习是人工智能的一个分支,它使用统计学方法让计算机系统在不被明确编程的情况下也能够学习。",
                "监督学习,无监督学习,强化学习",
            ),
            (
                "深度学习",
                "深度学习是机器学习的一个分支,它使用多层神经网络来模拟人脑的学习过程。",
                "神经网络,CNN,RNN",
            ),
            (
                "自然语言处理",
                "自然语言处理是人工智能的一个分支,专注于使计算机理解和处理人类语言。",
                "NLP,文本分析,语言模型",
            ),
            (
                "计算机视觉",
                "计算机视觉是人工智能的一个分支,专注于使计算机能够从图像或视频中获取信息。",
                "CV,图像识别,目标检测",
            ),
        ]
        node_payloads = {}
        for position, (entity_id, description, keywords) in enumerate(
            node_specs, start=1
        ):
            payload = {
                "entity_id": entity_id,
                "description": description,
                "keywords": keywords,
                "entity_type": "技术领域",
            }
            node_payloads[entity_id] = payload
            print(f"插入节点{position}: {entity_id}")
            await storage.upsert_node(entity_id, payload)

        ai, ml, dl, nlp, cv = (spec[0] for spec in node_specs)

        # Fixture edges in insertion order:
        # (source, target, relationship, weight, description).
        edge_specs = [
            (ai, ml, "包含", 1.0, "人工智能领域包含机器学习这个子领域"),
            (ml, dl, "包含", 1.0, "机器学习领域包含深度学习这个子领域"),
            (ai, nlp, "包含", 1.0, "人工智能领域包含自然语言处理这个子领域"),
            (ai, cv, "包含", 1.0, "人工智能领域包含计算机视觉这个子领域"),
            (dl, nlp, "应用于", 0.8, "深度学习技术应用于自然语言处理领域"),
            (dl, cv, "应用于", 0.8, "深度学习技术应用于计算机视觉领域"),
        ]
        edge_payloads = {}
        for position, (src, tgt, relationship, weight, description) in enumerate(
            edge_specs, start=1
        ):
            payload = {
                "relationship": relationship,
                "weight": weight,
                "description": description,
            }
            edge_payloads[(src, tgt)] = payload
            print(f"插入边{position}: {src} -> {tgt}")
            await storage.upsert_edge(src, tgt, payload)

        # get_nodes_batch: every requested node comes back with its description.
        print("== 测试 get_nodes_batch")
        node_ids = [ai, ml, dl]
        nodes_dict = await storage.get_nodes_batch(node_ids)
        print(f"批量获取节点属性结果: {nodes_dict.keys()}")
        assert len(nodes_dict) == 3, f"应返回3个节点实际返回 {len(nodes_dict)}"
        for node_id in node_ids:
            assert node_id in nodes_dict, f"{node_id} 应在返回结果中"
        for node_id in node_ids:
            assert (
                nodes_dict[node_id]["description"]
                == node_payloads[node_id]["description"]
            ), f"{node_id} 描述不匹配"

        # node_degrees_batch: expected degrees follow from the six edges above.
        print("== 测试 node_degrees_batch")
        node_degrees = await storage.node_degrees_batch(node_ids)
        print(f"批量获取节点度数结果: {node_degrees}")
        assert len(node_degrees) == 3, f"应返回3个节点的度数实际返回 {len(node_degrees)}"
        for node_id in node_ids:
            assert node_id in node_degrees, f"{node_id} 应在返回结果中"
        expected_degrees = {ai: 3, ml: 2, dl: 3}
        for node_id, expected in expected_degrees.items():
            assert (
                node_degrees[node_id] == expected
            ), f"{node_id} 度数应为{expected}实际为 {node_degrees[node_id]}"

        # edge_degrees_batch: an edge's degree is source degree + target degree,
        # which is 5 for each of the three edges checked here.
        print("== 测试 edge_degrees_batch")
        edges = [(ai, ml), (ml, dl), (dl, nlp)]
        edge_degrees = await storage.edge_degrees_batch(edges)
        print(f"批量获取边度数结果: {edge_degrees}")
        assert len(edge_degrees) == 3, f"应返回3条边的度数实际返回 {len(edge_degrees)}"
        for src, tgt in edges:
            assert (src, tgt) in edge_degrees, f"边 {src} -> {tgt} 应在返回结果中"
        for src, tgt in edges:
            assert (
                edge_degrees[(src, tgt)] == 5
            ), f"边 {src} -> {tgt} 度数应为5实际为 {edge_degrees[(src, tgt)]}"

        # get_edges_batch: the tuple list is converted to a list of
        # {"src", "tgt"} dicts (described by the original author as Neo4j-style).
        print("== 测试 get_edges_batch")
        edge_dicts = [{"src": src, "tgt": tgt} for src, tgt in edges]
        edges_dict = await storage.get_edges_batch(edge_dicts)
        print(f"批量获取边属性结果: {edges_dict.keys()}")
        assert len(edges_dict) == 3, f"应返回3条边的属性实际返回 {len(edges_dict)}"
        for src, tgt in edges:
            assert (src, tgt) in edges_dict, f"边 {src} -> {tgt} 应在返回结果中"
        for src, tgt in edges:
            assert (
                edges_dict[(src, tgt)]["relationship"]
                == edge_payloads[(src, tgt)]["relationship"]
            ), f"边 {src} -> {tgt} 关系不匹配"

        # get_nodes_edges_batch: both queried nodes touch three edges each.
        print("== 测试 get_nodes_edges_batch")
        queried_nodes = [ai, dl]
        nodes_edges = await storage.get_nodes_edges_batch(queried_nodes)
        print(f"批量获取节点边结果: {nodes_edges.keys()}")
        assert len(nodes_edges) == 2, f"应返回2个节点的边实际返回 {len(nodes_edges)}"
        for node_id in queried_nodes:
            assert node_id in nodes_edges, f"{node_id} 应在返回结果中"
        for node_id in queried_nodes:
            assert (
                len(nodes_edges[node_id]) == 3
            ), f"{node_id} 应有3条边实际有 {len(nodes_edges[node_id])}"

        # Clean up the test data.
        print("== 测试 drop")
        result = await storage.drop()
        print(f"清理结果: {result}")
        assert result["status"] == "success", f"清理应成功,实际状态为 {result['status']}"

        print("\n批量操作测试完成")
        return True

    except Exception as err:
        # Any failure, including a failed assertion, is reported and turned
        # into a False result instead of propagating.
        ASCIIColors.red(f"测试过程中发生错误: {str(err)}")
        return False
2025-04-04 03:40:46 +08:00
async def main():
    """Run the interactive graph-storage test program.

    Checks for a ``.env`` file, loads it, initializes the configured graph
    storage, asks which test(s) to run, runs them, and finalizes the storage.
    """
    # Program title.
    ASCIIColors.cyan("\n通用图存储测试程序\n")

    if not check_env_file():
        return

    # Variables already present in the environment win over the .env file.
    load_dotenv(dotenv_path=".env", override=False)

    storage_name = os.getenv("LIGHTRAG_GRAPH_STORAGE", "NetworkXStorage")
    ASCIIColors.magenta(f"\n当前配置的图存储类型: {storage_name}")
    supported = ", ".join(STORAGE_IMPLEMENTATIONS["GRAPH_STORAGE"]["implementations"])
    ASCIIColors.white(f"支持的图存储类型: {supported}")

    storage = await initialize_graph_storage()
    if not storage:
        ASCIIColors.red("初始化存储实例失败,测试程序退出")
        return

    try:
        # Show the menu and read the selection.
        ASCIIColors.yellow("\n请选择测试类型:")
        menu_lines = (
            "1. 基本测试 (节点和边的插入、读取)",
            "2. 高级测试 (度数、标签、知识图谱、删除操作等)",
            "3. 批量操作测试 (批量获取节点、边属性和度数等)",
            "4. 全部测试",
        )
        for menu_line in menu_lines:
            ASCIIColors.white(menu_line)

        choice = input("\n请输入选项 (1/2/3/4): ")

        single_tests = {
            "1": test_graph_basic,
            "2": test_graph_advanced,
            "3": test_graph_batch_operations,
        }
        if choice in single_tests:
            await single_tests[choice](storage)
        elif choice == "4":
            # Run all suites in order; each one only starts if the previous
            # one returned True.
            ASCIIColors.cyan("\n=== 开始基本测试 ===")
            if await test_graph_basic(storage):
                ASCIIColors.cyan("\n=== 开始高级测试 ===")
                if await test_graph_advanced(storage):
                    ASCIIColors.cyan("\n=== 开始批量操作测试 ===")
                    await test_graph_batch_operations(storage)
        else:
            ASCIIColors.red("无效的选项")

    finally:
        # Finalize the storage whatever happened above.
        if storage:
            await storage.finalize()
            ASCIIColors.green("\n存储连接已关闭")
2025-04-04 03:41:05 +08:00
2025-04-04 03:40:46 +08:00
# Script entry point: run the interactive test program when executed directly.
if __name__ == "__main__":
    asyncio.run(main())