Incremental indexing/update final text units (#1241)

* Update final text units

* Format

* Address comments
This commit is contained in:
Alonso Guevara 2024-10-02 12:59:53 -06:00 committed by GitHub
parent d501813181
commit 6d23d6a03b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 79 additions and 14 deletions

View File

@ -0,0 +1,4 @@
{
"type": "patch",
"description": "Add text units update"
}

View File

@ -90,7 +90,9 @@ async def update_dataframe_outputs(
)
delta_entities = dataframe_dict["create_final_entities"]
merged_entities_df, _ = _group_and_resolve_entities(old_entities, delta_entities)
merged_entities_df, entity_id_mapping = _group_and_resolve_entities(
old_entities, delta_entities
)
# Save the updated entities back to storage
# TODO: Using _new in the meantime, to compare outputs without overwriting the original
await storage.set(
@ -112,6 +114,21 @@ async def update_dataframe_outputs(
"create_final_relationships_new.parquet", merged_relationships_df.to_parquet()
)
# Update and merge final text units
old_text_units = await _load_table_from_storage(
"create_final_text_units.parquet", storage
)
delta_text_units = dataframe_dict["create_final_text_units"]
merged_text_units = _update_and_merge_text_units(
old_text_units, delta_text_units, entity_id_mapping
)
# TODO: Using _new in the meantime, to compare outputs without overwriting the original
await storage.set(
"create_final_text_units_new.parquet", merged_text_units.to_parquet()
)
async def _concat_dataframes(name, dataframe_dict, storage):
"""Concatenate the dataframes.
@ -136,16 +153,16 @@ async def _concat_dataframes(name, dataframe_dict, storage):
def _group_and_resolve_entities(
df_a: pd.DataFrame, df_b: pd.DataFrame
old_entities_df: pd.DataFrame, delta_entities_df: pd.DataFrame
) -> tuple[pd.DataFrame, dict]:
"""Group and resolve entities.
Parameters
----------
df_a : pd.DataFrame
old_entities_df : pd.DataFrame
The first dataframe.
df_b : pd.DataFrame
The second dataframe.
delta_entities_df : pd.DataFrame
The delta dataframe.
Returns
-------
@ -155,8 +172,8 @@ def _group_and_resolve_entities(
The id mapping for existing entities. In the form of {df_b.id: df_a.id}.
"""
# If a name exists in A and B, make a dictionary for {B.id : A.id}
merged = df_b[["id", "name"]].merge(
df_a[["id", "name"]],
merged = delta_entities_df[["id", "name"]].merge(
old_entities_df[["id", "name"]],
on="name",
suffixes=("_B", "_A"),
copy=False,
@ -164,10 +181,14 @@ def _group_and_resolve_entities(
id_mapping = dict(zip(merged["id_B"], merged["id_A"], strict=True))
# Increment human readable id in b by the max of a
df_b["human_readable_id"] += df_a["human_readable_id"].max() + 1
initial_id = old_entities_df["human_readable_id"].max() + 1
delta_entities_df["human_readable_id"] = np.arange(
initial_id, initial_id + len(delta_entities_df)
)
# Concat A and B
combined = pd.concat([df_a, df_b], copy=False)
combined = pd.concat(
[old_entities_df, delta_entities_df], ignore_index=True, copy=False
)
# Group by name and resolve conflicts
aggregated = (
@ -225,13 +246,23 @@ def _update_and_merge_relationships(
The updated relationships.
"""
# Increment the human readable id in b by the max of a
delta_relationships["human_readable_id"] += (
old_relationships["human_readable_id"].max() + 1
# Ensure both columns are integers
delta_relationships["human_readable_id"] = delta_relationships[
"human_readable_id"
].astype(int)
old_relationships["human_readable_id"] = old_relationships[
"human_readable_id"
].astype(int)
# Adjust delta_relationships IDs to be greater than any in old_relationships
initial_id = old_relationships["human_readable_id"].max() + 1
delta_relationships["human_readable_id"] = np.arange(
initial_id, initial_id + len(delta_relationships)
)
# Merge the final relationships
# Merge the DataFrames without copying if possible
final_relationships = pd.concat(
[old_relationships, delta_relationships], copy=False
[old_relationships, delta_relationships], ignore_index=True, copy=False
)
# Recalculate target and source degrees
@ -248,3 +279,33 @@ def _update_and_merge_relationships(
)
return final_relationships
def _update_and_merge_text_units(
old_text_units: pd.DataFrame,
delta_text_units: pd.DataFrame,
entity_id_mapping: dict,
) -> pd.DataFrame:
"""Update and merge text units.
Parameters
----------
old_text_units : pd.DataFrame
The old text units.
delta_text_units : pd.DataFrame
The delta text units.
entity_id_mapping : dict
The entity id mapping.
Returns
-------
pd.DataFrame
The updated text units.
"""
# Look for entity ids in entity_ids and replace them with the corresponding id in the mapping
delta_text_units["entity_ids"] = delta_text_units["entity_ids"].apply(
lambda x: [entity_id_mapping.get(i, i) for i in x]
)
# Merge the final text units
return pd.concat([old_text_units, delta_text_units], ignore_index=True, copy=False)