Incremental indexing/update old outputs (#1155)

* Create entypoint for cli and api (#1067)

* Add cli and api entrypoints for update index

* Semver

* Update docs

* Run tests on feature branch main

* Better /main handling in tests

* Incremental indexing/file delta (#1123)

* Calculate new inputs and deleted inputs on update

* Semver

* Clear ruff checks

* Fix pyright

* Fix PyRight

* Ruff again

* Update Final Entities merging in new and existing entities from delta

* Update formatting

* Pyright

* Ruff

* Fix for pyright

* Yet Another Pyright test

* Pyright

* Format
This commit is contained in:
Alonso Guevara 2024-09-20 14:21:50 -06:00 committed by GitHub
parent 1dbcc42b81
commit fb65989c05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 246 additions and 18 deletions

View File

@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Calculate new inputs and deleted inputs on update"
}

View File

@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Merge existing and new entities, updating values accordingly"
}

View File

@ -46,6 +46,7 @@ from graphrag.index.storage import PipelineStorage
from graphrag.index.typing import PipelineRunResult
# Register all verbs
from graphrag.index.update.dataframes import get_delta_docs, update_dataframe_outputs
from graphrag.index.verbs import * # noqa
from graphrag.index.workflows import (
VerbDefinitions,
@ -111,9 +112,6 @@ async def run_pipeline_with_config(
else await _create_input(config.input, progress_reporter, root_dir)
)
if is_update_run:
# TODO: Filter dataset to only include new data (this should be done in the input module)
pass
post_process_steps = input_post_process_steps or _create_postprocess_steps(
config.input
)
@ -123,21 +121,47 @@ async def run_pipeline_with_config(
msg = "No dataset provided!"
raise ValueError(msg)
async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table
if is_update_run:
delta_dataset = await get_delta_docs(dataset, storage)
delta_storage = storage.child("delta")
# Run the pipeline on the new documents
tables_dict = {}
async for table in run_pipeline(
workflows=workflows,
dataset=delta_dataset.new_inputs,
storage=delta_storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=False,
):
tables_dict[table.workflow] = table.result
await update_dataframe_outputs(tables_dict, storage)
else:
async for table in run_pipeline(
workflows=workflows,
dataset=dataset,
storage=storage,
cache=cache,
callbacks=callbacks,
input_post_process_steps=post_process_steps,
memory_profile=memory_profile,
additional_verbs=additional_verbs,
additional_workflows=additional_workflows,
progress_reporter=progress_reporter,
emit=emit,
is_resume_run=is_resume_run,
):
yield table
async def run_pipeline(

View File

@ -0,0 +1,4 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""Incremental Indexing main module definition."""

View File

@ -0,0 +1,192 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
"""Dataframe operations and utils for Incremental Indexing."""
import os
from dataclasses import dataclass
import numpy as np
import pandas as pd
from graphrag.index.storage.typing import PipelineStorage
from graphrag.utils.storage import _load_table_from_storage
mergeable_outputs = [
"create_final_documents",
"create_final_entities",
"create_final_relationships",
]
@dataclass
class InputDelta:
"""Dataclass to hold the input delta.
Attributes
----------
new_inputs : pd.DataFrame
The new inputs.
deleted_inputs : pd.DataFrame
The deleted inputs.
"""
new_inputs: pd.DataFrame
deleted_inputs: pd.DataFrame
async def get_delta_docs(
input_dataset: pd.DataFrame, storage: PipelineStorage
) -> InputDelta:
"""Get the delta between the input dataset and the final documents.
Parameters
----------
input_dataset : pd.DataFrame
The input dataset.
storage : PipelineStorage
The Pipeline storage.
Returns
-------
InputDelta
The input delta. With new inputs and deleted inputs.
"""
final_docs = await _load_table_from_storage(
"create_final_documents.parquet", storage
)
# Select distinct title from final docs and from dataset
previous_docs: list[str] = final_docs["title"].unique().tolist()
dataset_docs: list[str] = input_dataset["title"].unique().tolist()
# Get the new documents (using loc to ensure DataFrame)
new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)]
# Get the deleted documents (again using loc to ensure DataFrame)
deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)]
return InputDelta(new_docs, deleted_docs)
async def update_dataframe_outputs(
dataframe_dict: dict[str, pd.DataFrame],
storage: PipelineStorage,
) -> None:
"""Update the mergeable outputs.
Parameters
----------
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes.
storage : PipelineStorage
The storage used to store the dataframes.
"""
await _concat_dataframes("create_base_text_units", dataframe_dict, storage)
await _concat_dataframes("create_final_documents", dataframe_dict, storage)
old_entities = await _load_table_from_storage(
"create_final_entities.parquet", storage
)
delta_entities = dataframe_dict["create_final_entities"]
merged_entities_df, _ = _group_and_resolve_entities(old_entities, delta_entities)
# Save the updated entities back to storage
# TODO: Using _new in the mean time, to compare outputs without overwriting the original
await storage.set(
"create_final_entities_new.parquet", merged_entities_df.to_parquet()
)
async def _concat_dataframes(name, dataframe_dict, storage):
"""Concatenate the dataframes.
Parameters
----------
name : str
The name of the dataframe to concatenate.
dataframe_dict : dict[str, pd.DataFrame]
The dictionary of dataframes from a pipeline run.
storage : PipelineStorage
The storage used to store the dataframes.
"""
old_df = await _load_table_from_storage(f"{name}.parquet", storage)
delta_df = dataframe_dict[name]
# Merge the final documents
final_df = pd.concat([old_df, delta_df], copy=False)
# TODO: Using _new in the mean time, to compare outputs without overwriting the original
await storage.set(f"{name}_new.parquet", final_df.to_parquet())
def _group_and_resolve_entities(
df_a: pd.DataFrame, df_b: pd.DataFrame
) -> tuple[pd.DataFrame, dict]:
"""Group and resolve entities.
Parameters
----------
df_a : pd.DataFrame
The first dataframe.
df_b : pd.DataFrame
The second dataframe.
Returns
-------
pd.DataFrame
The resolved dataframe.
dict
The id mapping for existing entities. In the form of {df_b.id: df_a.id}.
"""
# If a name exists in A and B, make a dictionary for {B.id : A.id}
merged = df_b[["id", "name"]].merge(
df_a[["id", "name"]],
on="name",
suffixes=("_B", "_A"),
copy=False,
)
id_mapping = dict(zip(merged["id_B"], merged["id_A"], strict=True))
# Concat A and B
combined = pd.concat([df_a, df_b], copy=False)
# Group by name and resolve conflicts
aggregated = (
combined.groupby("name")
.agg({
"id": "first",
"type": "first",
"human_readable_id": "first",
"graph_embedding": "first",
"description": lambda x: os.linesep.join(x.astype(str)), # Ensure str
# Concatenate nd.array into a single list
"text_unit_ids": lambda x: ",".join(str(i) for j in x.tolist() for i in j),
# Keep only descriptions where the original value wasn't modified
"description_embedding": lambda x: x.iloc[0] if len(x) == 1 else np.nan,
})
.reset_index()
)
# Force the result into a DataFrame
resolved: pd.DataFrame = pd.DataFrame(aggregated)
# Recreate humand readable id with an autonumeric
resolved["human_readable_id"] = range(len(resolved))
# Modify column order to keep consistency
resolved = resolved.loc[
:,
[
"id",
"name",
"description",
"type",
"human_readable_id",
"graph_embedding",
"text_unit_ids",
"description_embedding",
],
]
return resolved, id_mapping