graphrag/tests/verbs/test_create_base_extracted_entities.py
Nathan Evans 61b3d6d56a
Migrate helper verbs (#1248)
* Remove genid

* Move snapshot_rows

* Move snapshot

* Delete spread_json

* Delete unzip

* Delete zip

* Move unpack_graph

* Move compute_edge_combined_degree

* Delete create_graph

* Delete concat

* Delete text replace

* Delete text_translate

* Move text_split

* Inline aggregate override

* Move cluster_graph

* Move merge_graphs

* Semver

* Move text_chunk

* Move layout_graph and fix some __init__s

* Move extract_covariates

* Rename text_split -> split_text

* Move extract_entities

* Move summarize_descriptions

* Rename text_chunk -> chunk_text

* Move community report creation

* Remove verb-level packing operators

* Streamline some naming

* Streamline param name/order

* Move mock LLM data to tests

* Fixed missed rename

* Update some strategy refs

* Rename run_gi

* Inject mock responses into integ test config
2024-10-09 13:46:44 -07:00

119 lines
3.3 KiB
Python

# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import networkx as nx
import pytest
from datashaper.errors import VerbParallelizationError
from graphrag.config.enums import LLMType
from graphrag.index.storage.memory_pipeline_storage import MemoryPipelineStorage
from graphrag.index.workflows.v1.create_base_extracted_entities import (
build_steps,
workflow_name,
)
from .util import (
get_config_for_workflow,
get_workflow_output,
load_expected,
load_input_tables,
)
# Canned LLM completion fed to the workflow instead of real model output.
# The single response string holds five records separated by "##", with the
# fields of each record delimited by "<|>": three "entity" records and two
# "relationship" records (the graph assertions in this file expect exactly
# 3 nodes and 2 edges).
# NOTE(review): the two relationship records have unbalanced parentheses around
# their trailing numeric field ("...)<|>2)" and "...<|>1))"). The text is
# runtime data, so it is left untouched here -- confirm whether it is intended.
MOCK_LLM_RESPONSES = [
"""
("entity"<|>COMPANY_A<|>COMPANY<|>Company_A is a test company)
##
("entity"<|>COMPANY_B<|>COMPANY<|>Company_B owns Company_A and also shares an address with Company_A)
##
("entity"<|>PERSON_C<|>PERSON<|>Person_C is director of Company_A)
##
("relationship"<|>COMPANY_A<|>COMPANY_B<|>Company_A and Company_B are related because Company_A is 100% owned by Company_B and the two companies also share the same address)<|>2)
##
("relationship"<|>COMPANY_A<|>PERSON_C<|>Company_A and Person_C are related because Person_C is director of Company_A<|>1))
""".strip()
]
# LLM settings that select LLMType.StaticResponse and supply the canned
# responses above. The tests assign this dict to
# config["entity_extract"]["strategy"]["llm"].
MOCK_LLM_CONFIG = {
"type": LLMType.StaticResponse,
"responses": MOCK_LLM_RESPONSES,
}
async def test_create_base_extracted_entities():
    """Run the workflow with the mocked LLM and no snapshot options set.

    Verifies that the first serialized graph in the "entity_graph" column
    parses to 3 nodes and 2 edges, that the output column names match the
    expected fixture, and that nothing was written to the in-memory storage.
    """
    input_tables = load_input_tables(["workflow:create_base_text_units"])
    expected = load_expected(workflow_name)

    storage = MemoryPipelineStorage()

    config = get_config_for_workflow(workflow_name)
    # Point the extraction strategy at the static-response LLM config.
    config["entity_extract"]["strategy"]["llm"] = MOCK_LLM_CONFIG

    steps = build_steps(config)

    actual = await get_workflow_output(
        input_tables,
        {
            "steps": steps,
        },
        storage=storage,
    )

    # let's parse a sample of the raw graphml
    actual_graphml_0 = actual["entity_graph"][:1][0]
    actual_graph_0 = nx.parse_graphml(actual_graphml_0)

    assert actual_graph_0.number_of_nodes() == 3
    assert actual_graph_0.number_of_edges() == 2

    # Compare the column names as plain lists. If these are pandas indexes
    # (as the `.columns` access suggests -- TODO confirm), `==` is element-wise
    # and a bare assert on it is only well-defined for a single column.
    assert list(actual.columns) == list(expected.columns)

    assert len(storage.keys()) == 0, "Storage should be empty"
async def test_create_base_extracted_entities_with_snapshots():
    """Run the workflow with both snapshot options switched on.

    Verifies that the output column names match the expected fixture and that
    the storage ends up holding exactly the raw-entities JSON and the merged
    GraphML entries, in that order.
    """
    input_tables = load_input_tables(["workflow:create_base_text_units"])
    expected = load_expected(workflow_name)

    storage = MemoryPipelineStorage()

    config = get_config_for_workflow(workflow_name)
    # Point the extraction strategy at the static-response LLM config.
    config["entity_extract"]["strategy"]["llm"] = MOCK_LLM_CONFIG
    config["raw_entity_snapshot"] = True
    config["graphml_snapshot"] = True

    steps = build_steps(config)

    actual = await get_workflow_output(
        input_tables,
        {
            "steps": steps,
        },
        storage=storage,
    )

    # Compare the column names as plain lists. If these are pandas indexes
    # (as the `.columns` access suggests -- TODO confirm), `==` is element-wise
    # and a bare assert on it is only well-defined for a single column.
    assert list(actual.columns) == list(expected.columns)

    stored_keys = storage.keys()
    assert stored_keys == [
        "raw_extracted_entities.json",
        "merged_graph.graphml",
    ], f"Unexpected storage contents: {stored_keys}"
async def test_create_base_extracted_entities_missing_llm_throws():
    """Running the workflow with no "llm" entry in the strategy must raise.

    The expected error type is VerbParallelizationError.
    """
    tables = load_input_tables(["workflow:create_base_text_units"])

    workflow_config = get_config_for_workflow(workflow_name)
    # Remove the LLM settings from the extraction strategy altogether.
    strategy = workflow_config["entity_extract"]["strategy"]
    del strategy["llm"]

    workflow = {"steps": build_steps(workflow_config)}

    # Only the workflow run itself is expected to fail.
    with pytest.raises(VerbParallelizationError):
        await get_workflow_output(tables, workflow)