346 lines
12 KiB
Python
Raw Permalink Normal View History

import json
from typing import Dict, Any, Optional, Tuple
import click
import json
from typing import Dict, Any
def diff_lists(list1, list2):
"""
Compare two lists element by element and return their differences.
Args:
list1 (list): First list to compare
list2 (list): Second list to compare
Returns:
dict: A dictionary containing the differences
"""
result = {"added": {}, "removed": {}, "modified": set(), "modified_details": {}}
if len(list1) != len(list2):
# Let's first line up the elements that are common to both lists using
# the fieldPath as the key if it exists
if "fieldPath" in list1[0]:
list1_dict = {field["fieldPath"]: field for field in list1}
list2_dict = {field["fieldPath"]: field for field in list2}
common_keys = set(list1_dict.keys()) & set(list2_dict.keys())
list1 = [list1_dict[key] for key in common_keys]
list2 = [list2_dict[key] for key in common_keys]
list1.extend(
[list1_dict[key] for key in set(list1_dict.keys()) - common_keys]
)
list2.extend(
[list2_dict[key] for key in set(list2_dict.keys()) - common_keys]
)
# Handle added elements (if list2 is longer)
if len(list2) > len(list1):
for i in range(len(list1), len(list2)):
if "fieldPath" in list2[i]:
result["added"][list2[i]["fieldPath"]] = list2[i]
else:
result["added"][str(i)] = list2[i]
# Handle removed elements (if list1 is longer)
if len(list1) > len(list2):
for i in range(len(list2), len(list1)):
if "fieldPath" in list1[i]:
result["removed"][list1[i]["fieldPath"]] = list1[i]
else:
result["removed"][str(i)] = list1[i]
# Compare common indices
for i in range(min(len(list1), len(list2))):
value1 = list1[i]
value2 = list2[i]
if type(value1) != type(value2):
result["modified"].add(str(i))
result["modified_details"][str(i)] = {"before": value1, "after": value2}
elif isinstance(value1, dict) and isinstance(value2, dict):
nested_diff = diff_dicts(
value1, value2, identifier=value1.get("fieldPath", i)
)
if any(nested_diff.values()):
result["modified"].add(value1.get("fieldPath", i))
result["modified_details"][value1.get("fieldPath", i)] = nested_diff
elif isinstance(value1, list) and isinstance(value2, list):
nested_diff = diff_lists(value1, value2)
if any(nested_diff.values()):
result["modified"].add(str(i))
result["modified_details"][str(i)] = nested_diff
elif value1 != value2:
result["modified"].add(str(i))
result["modified_details"][str(i)] = {
"before": value1,
"after": value2,
"identifier": i,
}
return result
def diff_schema_field(field1_dict, field2_dict):
from datahub.metadata.schema_classes import SchemaFieldClass
field1 = SchemaFieldClass.from_obj(field1_dict)
field2 = SchemaFieldClass.from_obj(field2_dict)
# Initialize result structure
result = {"added": {}, "removed": {}, "modified": set(), "modified_details": {}}
result = {}
if field1.fieldPath != field2.fieldPath:
result["fieldPath"] = {"before": field1.fieldPath, "after": field2.fieldPath}
if field1.type != field2.type:
result["type"] = {
"before": field1.type,
"after": field2.type,
"identifier": field1.fieldPath,
}
if field1.description != field2.description:
result["description"] = {
"before": field1.description,
"after": field2.description,
"identifier": field1.fieldPath,
}
if field1.nullable != field2.nullable:
result["nullable"] = {
"before": field1.nullable,
"after": field2.nullable,
"identifier": field1.fieldPath,
}
return result
def diff_schema_metadata(schema1_dict, schema2_dict):
ignored_for_diff = [
"created",
"modified",
"hash",
"platformSchema",
"lastModified",
] # TODO: Reduce this list
for key in ignored_for_diff:
schema1_dict.pop(key, None)
schema2_dict.pop(key, None)
return diff_dicts(schema1_dict, schema2_dict)
def is_empty_diff(diff_dict) -> bool:
if diff_dict.keys() == EMPTY_DIFF().keys():
for key in diff_dict:
if diff_dict[key]:
return False
return True
return False
def format_diff(diff_dict) -> Any:
if isinstance(diff_dict, set):
diff_dict = sorted(list([x for x in diff_dict]))
elif isinstance(diff_dict, dict):
for key in diff_dict:
diff_dict[key] = format_diff(diff_dict[key])
return diff_dict
def EMPTY_DIFF():
return {
"added": {},
"removed": {},
"modified": set(),
"modified_details": {},
}
def diff_dicts(dict1, dict2, identifier=None):
"""
Compare two dictionaries recursively and return their differences.
Args:
dict1 (dict): First dictionary to compare
dict2 (dict): Second dictionary to compare
Returns:
dict: A dictionary containing the differences with the following structure:
{
'added': Keys present in dict2 but not in dict1,
'removed': Keys present in dict1 but not in dict2,
'modified': Keys present in both but with different values,
'modified_details': Detailed before/after values for modified keys
}
"""
if "nullable" in dict1:
# Assume this is a SchemaFieldClass
return diff_schema_field(dict1, dict2)
if "hash" in dict1:
# Assume this is a schema metadata class
return diff_schema_metadata(dict1, dict2)
dict1_keys = set(dict1.keys())
dict2_keys = set(dict2.keys())
# Find keys that were added, removed, or modified
added_keys = dict2_keys - dict1_keys
removed_keys = dict1_keys - dict2_keys
common_keys = dict1_keys & dict2_keys
# Initialize result structure
result = EMPTY_DIFF()
# Handle added keys
for key in added_keys:
result["added"][key] = dict2[key]
# Handle removed keys
for key in removed_keys:
result["removed"][key] = dict1[key]
# Check common keys for modifications
for key in common_keys:
value1 = dict1[key]
value2 = dict2[key]
# If both values are dictionaries, recurse
if isinstance(value1, dict) and isinstance(value2, dict):
nested_diff = diff_dicts(
value1, value2, identifier=value1.get("fieldPath", key)
)
if any(nested_diff.values()): # If there are any differences
result["modified"].add(key)
result["modified_details"][key] = nested_diff
# If both values are lists, compare them element by element
elif isinstance(value1, list) and isinstance(value2, list):
nested_diff = diff_lists(value1, value2)
if any(nested_diff.values()):
result["modified"].add(key)
result["modified_details"][key] = nested_diff
# Otherwise compare directly
elif value1 != value2:
result["modified"].add(key)
result["modified_details"][key] = {
"before": value1,
"after": value2,
"identifier": identifier,
}
return result
def process_single_element(element) -> Tuple[str, str, Dict[str, Any]]:
if "entityUrn" in element:
entity = element["entityUrn"]
else:
raise Exception("Element does not have an entityUrn key")
if "aspectName" in element:
aspect = element["aspectName"]
else:
raise Exception("Element does not have an aspectName key")
if "aspect" in element:
if "json" in element["aspect"]:
return entity, aspect, element["aspect"]["json"]
elif "value" in element["aspect"]:
json_value = json.loads(element["aspect"]["value"])
return entity, aspect, json_value
else:
raise Exception("Element does not have a json or value key")
else:
raise Exception("Element does not have an aspect key")
def process_element_with_dict(element, global_dict):
entity, aspect, data = process_single_element(element)
if entity not in global_dict:
global_dict[entity] = {}
if aspect not in global_dict[entity]:
global_dict[entity][aspect] = data
else:
# breakpoint()
raise Exception("Duplicate aspect found")
@click.command("compute_diff")
@click.argument("input_file_1", type=click.Path(exists=True))
@click.argument("input_file_2", type=click.Path(exists=True))
@click.option("--golden-diff-file", type=click.Path(), default=None)
@click.option("--update-golden-diff", is_flag=True)
def compute_diff(
input_file_1: str,
input_file_2: str,
golden_diff_file: Optional[str] = None,
update_golden_diff: bool = False,
):
# Read the files into json objects and compare them
# If they are the same, exit 0
# If they are different, exit 1
file_1_mcps = {}
with open(input_file_1) as file1:
data1 = json.load(file1)
assert isinstance(data1, list)
for element in data1:
process_element_with_dict(element, file_1_mcps)
print(f"Processed {len(file_1_mcps)} elements from file {input_file_1}")
file_2_mcps = {}
with open(input_file_2) as file2:
data2 = json.load(file2)
assert isinstance(data2, list)
for element in data2:
process_element_with_dict(element, file_2_mcps)
print(f"Processed {len(file_2_mcps)} elements from file {input_file_2}")
if golden_diff_file and not update_golden_diff:
with open(golden_diff_file) as golden_diff:
golden_diff_data = json.load(golden_diff)
else:
golden_diff_data = None
computed_diff_data = {}
assert len(file_1_mcps) == len(file_2_mcps)
for entity in file_1_mcps:
assert entity in file_2_mcps
assert len(file_1_mcps[entity]) == len(file_2_mcps[entity])
for aspect in file_1_mcps[entity]:
assert aspect in file_2_mcps[entity]
aspect_diff = diff_dicts(
file_1_mcps[entity][aspect], file_2_mcps[entity][aspect]
)
if golden_diff_data:
assert aspect in golden_diff_data[entity]
assert format_diff(aspect_diff) == golden_diff_data[entity][aspect], (
f"Computed difference is {json.dumps(format_diff(aspect_diff), indent=2)}\n"
f"Expected difference is {json.dumps(golden_diff_data[entity][aspect], indent=2)}"
)
else:
if update_golden_diff:
if entity not in computed_diff_data:
computed_diff_data[entity] = {}
computed_diff_data[entity][aspect] = format_diff(aspect_diff)
else:
assert is_empty_diff(
aspect_diff
), f"Difference is {json.dumps(format_diff(aspect_diff), indent=2)}"
if update_golden_diff:
with open(golden_diff_file, "w") as golden_diff:
json.dump(computed_diff_data, golden_diff, indent=2, sort_keys=True)
if __name__ == "__main__":
compute_diff()