diff --git a/.semversioner/next-release/patch-20241002002557586548.json b/.semversioner/next-release/patch-20241002002557586548.json new file mode 100644 index 00000000..dc8f6304 --- /dev/null +++ b/.semversioner/next-release/patch-20241002002557586548.json @@ -0,0 +1,4 @@ +{ + "type": "patch", + "description": "Add text units update" +} diff --git a/graphrag/index/update/dataframes.py b/graphrag/index/update/dataframes.py index 1ae23d64..15833fd4 100644 --- a/graphrag/index/update/dataframes.py +++ b/graphrag/index/update/dataframes.py @@ -90,7 +90,9 @@ async def update_dataframe_outputs( ) delta_entities = dataframe_dict["create_final_entities"] - merged_entities_df, _ = _group_and_resolve_entities(old_entities, delta_entities) + merged_entities_df, entity_id_mapping = _group_and_resolve_entities( + old_entities, delta_entities + ) # Save the updated entities back to storage # TODO: Using _new in the meantime, to compare outputs without overwriting the original await storage.set( @@ -112,6 +114,21 @@ async def update_dataframe_outputs( "create_final_relationships_new.parquet", merged_relationships_df.to_parquet() ) + # Update and merge final text units + old_text_units = await _load_table_from_storage( + "create_final_text_units.parquet", storage + ) + delta_text_units = dataframe_dict["create_final_text_units"] + + merged_text_units = _update_and_merge_text_units( + old_text_units, delta_text_units, entity_id_mapping + ) + + # TODO: Using _new in the meantime, to compare outputs without overwriting the original + await storage.set( + "create_final_text_units_new.parquet", merged_text_units.to_parquet() + ) + async def _concat_dataframes(name, dataframe_dict, storage): """Concatenate the dataframes. @@ -136,16 +153,16 @@ async def _concat_dataframes(name, dataframe_dict, storage): def _group_and_resolve_entities( - df_a: pd.DataFrame, df_b: pd.DataFrame + old_entities_df: pd.DataFrame, delta_entities_df: pd.DataFrame ) -> tuple[pd.DataFrame, dict]: """Group and resolve entities. Parameters ---------- - df_a : pd.DataFrame + old_entities_df : pd.DataFrame The first dataframe. - df_b : pd.DataFrame - The second dataframe. + delta_entities_df : pd.DataFrame + The delta dataframe. Returns ------- @@ -155,8 +172,8 @@ def _group_and_resolve_entities( The id mapping for existing entities. In the form of {df_b.id: df_a.id}. """ # If a name exists in A and B, make a dictionary for {B.id : A.id} - merged = df_b[["id", "name"]].merge( - df_a[["id", "name"]], + merged = delta_entities_df[["id", "name"]].merge( + old_entities_df[["id", "name"]], on="name", suffixes=("_B", "_A"), copy=False, @@ -164,10 +181,14 @@ def _group_and_resolve_entities( id_mapping = dict(zip(merged["id_B"], merged["id_A"], strict=True)) # Increment human readable id in b by the max of a - df_b["human_readable_id"] += df_a["human_readable_id"].max() + 1 - + initial_id = old_entities_df["human_readable_id"].max() + 1 + delta_entities_df["human_readable_id"] = np.arange( + initial_id, initial_id + len(delta_entities_df) + ) # Concat A and B - combined = pd.concat([df_a, df_b], copy=False) + combined = pd.concat( + [old_entities_df, delta_entities_df], ignore_index=True, copy=False + ) # Group by name and resolve conflicts aggregated = ( @@ -225,13 +246,23 @@ def _update_and_merge_relationships( The updated relationships. """ # Increment the human readable id in b by the max of a - delta_relationships["human_readable_id"] += ( - old_relationships["human_readable_id"].max() + 1 + # Ensure both columns are integers + delta_relationships["human_readable_id"] = delta_relationships[ + "human_readable_id" + ].astype(int) + old_relationships["human_readable_id"] = old_relationships[ + "human_readable_id" + ].astype(int) + + # Adjust delta_relationships IDs to be greater than any in old_relationships + initial_id = old_relationships["human_readable_id"].max() + 1 + delta_relationships["human_readable_id"] = np.arange( + initial_id, initial_id + len(delta_relationships) ) - # Merge the final relationships + # Merge the DataFrames without copying if possible final_relationships = pd.concat( - [old_relationships, delta_relationships], copy=False + [old_relationships, delta_relationships], ignore_index=True, copy=False ) # Recalculate target and source degrees @@ -248,3 +279,33 @@ def _update_and_merge_relationships( ) return final_relationships + + +def _update_and_merge_text_units( + old_text_units: pd.DataFrame, + delta_text_units: pd.DataFrame, + entity_id_mapping: dict, +) -> pd.DataFrame: + """Update and merge text units. + + Parameters + ---------- + old_text_units : pd.DataFrame + The old text units. + delta_text_units : pd.DataFrame + The delta text units. + entity_id_mapping : dict + The entity id mapping. + + Returns + ------- + pd.DataFrame + The updated text units. + """ + # Look for entity ids in entity_ids and replace them with the corresponding id in the mapping + delta_text_units["entity_ids"] = delta_text_units["entity_ids"].apply( + lambda x: [entity_id_mapping.get(i, i) for i in x] + ) + + # Merge the final text units + return pd.concat([old_text_units, delta_text_units], ignore_index=True, copy=False)