From fb65989c05edf77174875463a07c26ca51d3db06 Mon Sep 17 00:00:00 2001
From: Alonso Guevara
Date: Fri, 20 Sep 2024 14:21:50 -0600
Subject: [PATCH] Incremental indexing/update old outputs (#1155)

* Create entrypoint for cli and api (#1067)

* Add cli and api entrypoints for update index

* Semver

* Update docs

* Run tests on feature branch main

* Better /main handling in tests

* Incremental indexing/file delta (#1123)

* Calculate new inputs and deleted inputs on update

* Semver

* Clear ruff checks

* Fix pyright

* Fix PyRight

* Ruff again

* Update Final Entities merging in new and existing entities from delta

* Update formatting

* Pyright

* Ruff

* Fix for pyright

* Yet Another Pyright test

* Pyright

* Format
---
 .../patch-20240911201935470388.json           |   4 +
 .../patch-20240918221118566693.json           |   4 +
 graphrag/index/run/run.py                     |  60 ++++--
 graphrag/index/update/__init__.py             |   4 +
 graphrag/index/update/dataframes.py           | 192 ++++++++++++++++++
 5 files changed, 246 insertions(+), 18 deletions(-)
 create mode 100644 .semversioner/next-release/patch-20240911201935470388.json
 create mode 100644 .semversioner/next-release/patch-20240918221118566693.json
 create mode 100644 graphrag/index/update/__init__.py
 create mode 100644 graphrag/index/update/dataframes.py

diff --git a/.semversioner/next-release/patch-20240911201935470388.json b/.semversioner/next-release/patch-20240911201935470388.json
new file mode 100644
index 00000000..36b24ff4
--- /dev/null
+++ b/.semversioner/next-release/patch-20240911201935470388.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Calculate new inputs and deleted inputs on update"
+}
diff --git a/.semversioner/next-release/patch-20240918221118566693.json b/.semversioner/next-release/patch-20240918221118566693.json
new file mode 100644
index 00000000..5e62e9bd
--- /dev/null
+++ b/.semversioner/next-release/patch-20240918221118566693.json
@@ -0,0 +1,4 @@
+{
+    "type": "patch",
+    "description": "Merge existing and new entities, updating values accordingly"
+}
diff --git a/graphrag/index/run/run.py b/graphrag/index/run/run.py
index cad63b20..0ef973ed 100644
--- a/graphrag/index/run/run.py
+++ b/graphrag/index/run/run.py
@@ -46,6 +46,7 @@ from graphrag.index.storage import PipelineStorage
 from graphrag.index.typing import PipelineRunResult
 
 # Register all verbs
+from graphrag.index.update.dataframes import get_delta_docs, update_dataframe_outputs
 from graphrag.index.verbs import *  # noqa
 from graphrag.index.workflows import (
     VerbDefinitions,
@@ -111,9 +112,6 @@ async def run_pipeline_with_config(
         else await _create_input(config.input, progress_reporter, root_dir)
     )
 
-    if is_update_run:
-        # TODO: Filter dataset to only include new data (this should be done in the input module)
-        pass
     post_process_steps = input_post_process_steps or _create_postprocess_steps(
         config.input
     )
@@ -123,21 +121,47 @@ async def run_pipeline_with_config(
         msg = "No dataset provided!"
         raise ValueError(msg)
 
-    async for table in run_pipeline(
-        workflows=workflows,
-        dataset=dataset,
-        storage=storage,
-        cache=cache,
-        callbacks=callbacks,
-        input_post_process_steps=post_process_steps,
-        memory_profile=memory_profile,
-        additional_verbs=additional_verbs,
-        additional_workflows=additional_workflows,
-        progress_reporter=progress_reporter,
-        emit=emit,
-        is_resume_run=is_resume_run,
-    ):
-        yield table
+    if is_update_run:
+        delta_dataset = await get_delta_docs(dataset, storage)
+
+        delta_storage = storage.child("delta")
+
+        # Run the pipeline on the new documents
+        tables_dict = {}
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=delta_dataset.new_inputs,
+            storage=delta_storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=False,
+        ):
+            tables_dict[table.workflow] = table.result
+
+        await update_dataframe_outputs(tables_dict, storage)
+
+    else:
+        async for table in run_pipeline(
+            workflows=workflows,
+            dataset=dataset,
+            storage=storage,
+            cache=cache,
+            callbacks=callbacks,
+            input_post_process_steps=post_process_steps,
+            memory_profile=memory_profile,
+            additional_verbs=additional_verbs,
+            additional_workflows=additional_workflows,
+            progress_reporter=progress_reporter,
+            emit=emit,
+            is_resume_run=is_resume_run,
+        ):
+            yield table
 
 
 async def run_pipeline(
diff --git a/graphrag/index/update/__init__.py b/graphrag/index/update/__init__.py
new file mode 100644
index 00000000..e6966408
--- /dev/null
+++ b/graphrag/index/update/__init__.py
@@ -0,0 +1,4 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""Incremental Indexing main module definition."""
diff --git a/graphrag/index/update/dataframes.py b/graphrag/index/update/dataframes.py
new file mode 100644
index 00000000..ee9ccf07
--- /dev/null
+++ b/graphrag/index/update/dataframes.py
@@ -0,0 +1,192 @@
+# Copyright (c) 2024 Microsoft Corporation.
+# Licensed under the MIT License
+
+"""Dataframe operations and utils for Incremental Indexing."""
+
+import os
+from dataclasses import dataclass
+
+import numpy as np
+import pandas as pd
+
+from graphrag.index.storage.typing import PipelineStorage
+from graphrag.utils.storage import _load_table_from_storage
+
+mergeable_outputs = [
+    "create_final_documents",
+    "create_final_entities",
+    "create_final_relationships",
+]
+
+
+@dataclass
+class InputDelta:
+    """Dataclass to hold the input delta.
+
+    Attributes
+    ----------
+    new_inputs : pd.DataFrame
+        The new inputs.
+    deleted_inputs : pd.DataFrame
+        The deleted inputs.
+    """
+
+    new_inputs: pd.DataFrame
+    deleted_inputs: pd.DataFrame
+
+
+async def get_delta_docs(
+    input_dataset: pd.DataFrame, storage: PipelineStorage
+) -> InputDelta:
+    """Get the delta between the input dataset and the final documents.
+
+    Parameters
+    ----------
+    input_dataset : pd.DataFrame
+        The input dataset.
+    storage : PipelineStorage
+        The Pipeline storage.
+
+    Returns
+    -------
+    InputDelta
+        The input delta. With new inputs and deleted inputs.
+ """ + final_docs = await _load_table_from_storage( + "create_final_documents.parquet", storage + ) + + # Select distinct title from final docs and from dataset + previous_docs: list[str] = final_docs["title"].unique().tolist() + dataset_docs: list[str] = input_dataset["title"].unique().tolist() + + # Get the new documents (using loc to ensure DataFrame) + new_docs = input_dataset.loc[~input_dataset["title"].isin(previous_docs)] + + # Get the deleted documents (again using loc to ensure DataFrame) + deleted_docs = final_docs.loc[~final_docs["title"].isin(dataset_docs)] + + return InputDelta(new_docs, deleted_docs) + + +async def update_dataframe_outputs( + dataframe_dict: dict[str, pd.DataFrame], + storage: PipelineStorage, +) -> None: + """Update the mergeable outputs. + + Parameters + ---------- + dataframe_dict : dict[str, pd.DataFrame] + The dictionary of dataframes. + storage : PipelineStorage + The storage used to store the dataframes. + """ + await _concat_dataframes("create_base_text_units", dataframe_dict, storage) + await _concat_dataframes("create_final_documents", dataframe_dict, storage) + + old_entities = await _load_table_from_storage( + "create_final_entities.parquet", storage + ) + delta_entities = dataframe_dict["create_final_entities"] + + merged_entities_df, _ = _group_and_resolve_entities(old_entities, delta_entities) + # Save the updated entities back to storage + # TODO: Using _new in the mean time, to compare outputs without overwriting the original + await storage.set( + "create_final_entities_new.parquet", merged_entities_df.to_parquet() + ) + + +async def _concat_dataframes(name, dataframe_dict, storage): + """Concatenate the dataframes. + + Parameters + ---------- + name : str + The name of the dataframe to concatenate. + dataframe_dict : dict[str, pd.DataFrame] + The dictionary of dataframes from a pipeline run. + storage : PipelineStorage + The storage used to store the dataframes. + """ + old_df = await _load_table_from_storage(f"{name}.parquet", storage) + delta_df = dataframe_dict[name] + + # Merge the final documents + final_df = pd.concat([old_df, delta_df], copy=False) + + # TODO: Using _new in the mean time, to compare outputs without overwriting the original + await storage.set(f"{name}_new.parquet", final_df.to_parquet()) + + +def _group_and_resolve_entities( + df_a: pd.DataFrame, df_b: pd.DataFrame +) -> tuple[pd.DataFrame, dict]: + """Group and resolve entities. + + Parameters + ---------- + df_a : pd.DataFrame + The first dataframe. + df_b : pd.DataFrame + The second dataframe. + + Returns + ------- + pd.DataFrame + The resolved dataframe. + dict + The id mapping for existing entities. In the form of {df_b.id: df_a.id}. 
+ """ + # If a name exists in A and B, make a dictionary for {B.id : A.id} + merged = df_b[["id", "name"]].merge( + df_a[["id", "name"]], + on="name", + suffixes=("_B", "_A"), + copy=False, + ) + id_mapping = dict(zip(merged["id_B"], merged["id_A"], strict=True)) + + # Concat A and B + combined = pd.concat([df_a, df_b], copy=False) + + # Group by name and resolve conflicts + aggregated = ( + combined.groupby("name") + .agg({ + "id": "first", + "type": "first", + "human_readable_id": "first", + "graph_embedding": "first", + "description": lambda x: os.linesep.join(x.astype(str)), # Ensure str + # Concatenate nd.array into a single list + "text_unit_ids": lambda x: ",".join(str(i) for j in x.tolist() for i in j), + # Keep only descriptions where the original value wasn't modified + "description_embedding": lambda x: x.iloc[0] if len(x) == 1 else np.nan, + }) + .reset_index() + ) + + # Force the result into a DataFrame + resolved: pd.DataFrame = pd.DataFrame(aggregated) + + # Recreate humand readable id with an autonumeric + resolved["human_readable_id"] = range(len(resolved)) + + # Modify column order to keep consistency + resolved = resolved.loc[ + :, + [ + "id", + "name", + "description", + "type", + "human_readable_id", + "graph_embedding", + "text_unit_ids", + "description_embedding", + ], + ] + + return resolved, id_mapping