From 2261531e3103953189d19d5a446d3d77df85dcdf Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 11 Jul 2023 10:39:47 -0400 Subject: [PATCH] test(ingest): Aspect level golden file comparison (#8310) --- metadata-ingestion/setup.py | 11 +- .../src/datahub/cli/check_cli.py | 47 +++- .../src/datahub/emitter/aspect.py | 3 + metadata-ingestion/src/datahub/emitter/mcp.py | 12 +- .../src/datahub/emitter/mcp_patch_builder.py | 3 +- .../datahub_ingestion_run_summary_provider.py | 3 +- .../src/datahub/ingestion/sink/file.py | 15 +- .../datahub/testing/compare_metadata_json.py | 109 +++++++ .../src/datahub/testing/mcp_diff.py | 266 ++++++++++++++++++ .../tests/test_helpers/mce_helpers.py | 121 ++------ .../unit/patch/complex_dataset_patch.json | 121 +++++--- .../tests/unit/test_bigquery_usage.py | 4 +- ...ce_helpers.py => test_compare_metadata.py} | 7 +- .../unit/test_generic_aspect_transformer.py | 7 +- .../tests/unit/test_workunit.py | 11 +- 15 files changed, 576 insertions(+), 164 deletions(-) create mode 100644 metadata-ingestion/src/datahub/testing/compare_metadata_json.py create mode 100644 metadata-ingestion/src/datahub/testing/mcp_diff.py rename metadata-ingestion/tests/unit/{test_mce_helpers.py => test_compare_metadata.py} (96%) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 87cd2bbbd1..47a9d7f88e 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -424,6 +424,11 @@ mypy_stubs = { "types-protobuf>=4.21.0.1", } + +pytest_dep = "pytest>=6.2.2" +deepdiff_dep = "deepdiff" +test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"} + base_dev_requirements = { *base_requirements, *framework_common, @@ -442,11 +447,12 @@ base_dev_requirements = { # pydantic 1.8.2 is incompatible with mypy 0.910. # See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910. "pydantic>=1.9.0", - "pytest>=6.2.2", + *test_api_requirements, + pytest_dep, "pytest-asyncio>=0.16.0", "pytest-cov>=2.8.1", "pytest-docker>=1.0.1", - "deepdiff", + deepdiff_dep, "requests-mock", "freezegun", "jsonpickle", @@ -697,6 +703,7 @@ setuptools.setup( ) ), "dev": list(dev_requirements), + "testing-utils": list(test_api_requirements), # To import `datahub.testing` "integration-tests": list(full_test_dev_requirements), }, ) diff --git a/metadata-ingestion/src/datahub/cli/check_cli.py b/metadata-ingestion/src/datahub/cli/check_cli.py index 5d34967d49..5cee21e45b 100644 --- a/metadata-ingestion/src/datahub/cli/check_cli.py +++ b/metadata-ingestion/src/datahub/cli/check_cli.py @@ -1,7 +1,8 @@ import logging +import pprint import shutil import tempfile -from typing import Optional +from typing import List, Optional import click @@ -14,6 +15,8 @@ from datahub.ingestion.sink.sink_registry import sink_registry from datahub.ingestion.source.source_registry import source_registry from datahub.ingestion.transformer.transform_registry import transform_registry from datahub.telemetry import telemetry +from datahub.testing.compare_metadata_json import diff_metadata_json, load_json_file +from datahub.testing.mcp_diff import MCPDiff logger = logging.getLogger(__name__) @@ -69,6 +72,48 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None: shutil.copy(out_file.name, json_file) +@check.command(no_args_is_help=True) +@click.argument( + "actual-file", + type=click.Path(exists=True, dir_okay=False, readable=True), +) +@click.argument( + "expected-file", + type=click.Path(exists=True, dir_okay=False, readable=True), +) +@click.option( + "--verbose", + "-v", + type=bool, + default=False, + help="Print full aspects that were changed, when comparing MCPs", +) +@click.option( + "--ignore-path", + multiple=True, + type=str, + default=(), + help="[Advanced] Paths in the deepdiff object to ignore", +) +@telemetry.with_telemetry() +def metadata_diff( + actual_file: str, expected_file: str, verbose: bool, ignore_path: List[str] +) -> None: + """Compare two metadata (MCE or MCP) JSON files. + + Comparison is more sophisticated for files composed solely of MCPs. + """ + + actual = load_json_file(actual_file) + expected = load_json_file(expected_file) + + diff = diff_metadata_json(output=actual, golden=expected, ignore_paths=ignore_path) + if isinstance(diff, MCPDiff): + click.echo(diff.pretty(verbose=verbose)) + else: + click.echo(pprint.pformat(diff)) + + @check.command() @click.option( "--verbose", diff --git a/metadata-ingestion/src/datahub/emitter/aspect.py b/metadata-ingestion/src/datahub/emitter/aspect.py index d3dff1f900..9118967a07 100644 --- a/metadata-ingestion/src/datahub/emitter/aspect.py +++ b/metadata-ingestion/src/datahub/emitter/aspect.py @@ -9,3 +9,6 @@ TIMESERIES_ASPECT_MAP = { for name, klass in ASPECT_MAP.items() if klass.get_aspect_type() == "timeseries" } + +JSON_CONTENT_TYPE = "application/json" +JSON_PATCH_CONTENT_TYPE = "application/json-patch+json" diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 20e0c659ae..6f9a22bffd 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -2,7 +2,7 @@ import dataclasses import json from typing import TYPE_CHECKING, List, Optional, Tuple, Union -from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP +from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.metadata.schema_classes import ( ChangeTypeClass, @@ -20,14 +20,12 @@ if TYPE_CHECKING: _ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET" -_ASPECT_CONTENT_TYPE = "application/json" - def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: serialized = json.dumps(pre_json_transform(codegen_obj.to_obj())) return GenericAspectClass( value=serialized.encode(), - contentType=_ASPECT_CONTENT_TYPE, + contentType=JSON_CONTENT_TYPE, ) @@ -42,7 +40,7 @@ def _try_from_generic_aspect( return True, None assert aspectName is not None, "aspectName must be set if aspect is set" - if aspect.contentType != _ASPECT_CONTENT_TYPE: + if aspect.contentType != JSON_CONTENT_TYPE: return False, None if aspectName not in ASPECT_MAP: @@ -155,7 +153,7 @@ class MetadataChangeProposalWrapper: # Undo the double JSON serialization that happens in the MCP aspect. if ( obj.get("aspect") - and obj["aspect"].get("contentType") == _ASPECT_CONTENT_TYPE + and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE ): obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} return obj @@ -174,7 +172,7 @@ class MetadataChangeProposalWrapper: # routine works. if obj.get("aspect") and obj["aspect"].get("json"): obj["aspect"] = { - "contentType": _ASPECT_CONTENT_TYPE, + "contentType": JSON_CONTENT_TYPE, "value": json.dumps(obj["aspect"]["json"]), } diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index a7c0e1c9c9..be68d46472 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -3,6 +3,7 @@ from collections import defaultdict from dataclasses import dataclass from typing import Any, Dict, Iterable, List, Optional +from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE from datahub.emitter.serialization_helper import pre_json_transform from datahub.metadata.schema_classes import ( ChangeTypeClass, @@ -72,7 +73,7 @@ class MetadataPatchProposal: value=json.dumps( pre_json_transform(_recursive_to_obj(patches)) ).encode(), - contentType="application/json-patch+json", + contentType=JSON_PATCH_CONTENT_TYPE, ), auditHeader=self.audit_header, systemMetadata=self.system_metadata, diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py index c78e72ac52..da70364d9a 100644 --- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py +++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_run_summary_provider.py @@ -10,6 +10,7 @@ from datahub.configuration.common import ( IgnorableError, redact_raw_config, ) +from datahub.emitter.aspect import JSON_CONTENT_TYPE from datahub.emitter.mce_builder import datahub_guid from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.mcp_builder import make_data_platform_urn @@ -205,7 +206,7 @@ class DatahubIngestionRunSummaryProvider(PipelineRunListener): structured_report = StructuredExecutionReportClass( type="CLI_INGEST", serializedValue=structured_report_str, - contentType="application/json", + contentType=JSON_CONTENT_TYPE, ) execution_result_aspect = ExecutionRequestResultClass( status=status, diff --git a/metadata-ingestion/src/datahub/ingestion/sink/file.py b/metadata-ingestion/src/datahub/ingestion/sink/file.py index 4d7b881cf7..c4f34d780f 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/file.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/file.py @@ -4,6 +4,7 @@ import pathlib from typing import Iterable, Union from datahub.configuration.common import ConfigModel +from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import RecordEnvelope from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback @@ -27,6 +28,14 @@ def _to_obj_for_file( ) -> dict: if isinstance(obj, MetadataChangeProposalWrapper): return obj.to_obj(simplified_structure=simplified_structure) + elif isinstance(obj, MetadataChangeProposal) and simplified_structure: + serialized = obj.to_obj() + if serialized.get("aspect") and serialized["aspect"].get("contentType") in [ + JSON_CONTENT_TYPE, + JSON_PATCH_CONTENT_TYPE, + ]: + serialized["aspect"] = {"json": json.loads(serialized["aspect"]["value"])} + return serialized return obj.to_obj() @@ -82,6 +91,7 @@ def write_metadata_file( MetadataChangeProposal, MetadataChangeProposalWrapper, UsageAggregation, + dict, # Serialized MCE or MCP ] ], ) -> None: @@ -91,6 +101,7 @@ def write_metadata_file( for i, record in enumerate(records): if i > 0: f.write(",\n") - obj = _to_obj_for_file(record) - json.dump(obj, f, indent=4) + if not isinstance(record, dict): + record = _to_obj_for_file(record) + json.dump(record, f, indent=4) f.write("\n]") diff --git a/metadata-ingestion/src/datahub/testing/compare_metadata_json.py b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py new file mode 100644 index 0000000000..30a6db6871 --- /dev/null +++ b/metadata-ingestion/src/datahub/testing/compare_metadata_json.py @@ -0,0 +1,109 @@ +"""Utilities for comparing MCE and MCP files.""" + +import json +import logging +import os +import pathlib +import pprint +import shutil +import tempfile +from typing import Any, Dict, List, Sequence, Union + +import pytest +from deepdiff import DeepDiff + +from datahub.ingestion.sink.file import write_metadata_file +from datahub.ingestion.source.file import read_metadata_file +from datahub.testing.mcp_diff import MCPDiff, get_aspects_by_urn + +logger = logging.getLogger(__name__) + +MetadataJson = List[Dict[str, Any]] + +default_exclude_paths = [ + r"root\[\d+\]\['systemMetadata'\]\['lastObserved'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['timestampMillis'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]", +] + + +def load_json_file(filename: Union[str, os.PathLike]) -> MetadataJson: + with open(str(filename)) as f: + return json.load(f) + + +def assert_metadata_files_equal( + output_path: Union[str, os.PathLike], + golden_path: Union[str, os.PathLike], + update_golden: bool, + copy_output: bool, + ignore_paths: Sequence[str] = (), +) -> None: + golden_exists = os.path.isfile(golden_path) + + if copy_output: + shutil.copyfile(str(output_path), str(golden_path) + ".output") + print(f"Copied output file to {golden_path}.output") + + if not update_golden and not golden_exists: + raise FileNotFoundError( + "Golden file does not exist. Please run with the --update-golden-files option to create." + ) + + output = load_json_file(output_path) + + if update_golden and not golden_exists: + golden = load_json_file(output_path) + shutil.copyfile(str(output_path), str(golden_path)) + return + else: + # We have to "normalize" the golden file by reading and writing it back out. + # This will clean up nulls, double serialization, and other formatting issues. + with tempfile.NamedTemporaryFile() as temp: + golden_metadata = read_metadata_file(pathlib.Path(golden_path)) + write_metadata_file(pathlib.Path(temp.name), golden_metadata) + golden = load_json_file(temp.name) + + diff = diff_metadata_json(output, golden, ignore_paths) + if diff and update_golden: + if isinstance(diff, MCPDiff): + diff.apply_delta(golden) + write_metadata_file(pathlib.Path(golden_path), golden) + else: + shutil.copyfile(str(output_path), str(golden_path)) + return + + if diff: + # Call pytest.fail rather than raise an exception to omit stack trace + if isinstance(diff, MCPDiff): + print(diff.pretty(verbose=True)) + pytest.fail(diff.pretty(), pytrace=False) + else: + pytest.fail(pprint.pformat(diff), pytrace=False) + + +def diff_metadata_json( + output: MetadataJson, + golden: MetadataJson, + ignore_paths: Sequence[str] = (), +) -> Union[DeepDiff, MCPDiff]: + ignore_paths = (*ignore_paths, *default_exclude_paths) + try: + golden_map = get_aspects_by_urn(golden) + output_map = get_aspects_by_urn(output) + return MCPDiff.create( + golden=golden_map, + output=output_map, + ignore_paths=ignore_paths, + ) + except AssertionError as e: + logger.warning(f"Reverting to old diff method: {e}") + logger.debug("Error with new diff method", exc_info=True) + return DeepDiff( + golden, + output, + exclude_regex_paths=ignore_paths, + ignore_order=True, + ) diff --git a/metadata-ingestion/src/datahub/testing/mcp_diff.py b/metadata-ingestion/src/datahub/testing/mcp_diff.py new file mode 100644 index 0000000000..f7aeb6f829 --- /dev/null +++ b/metadata-ingestion/src/datahub/testing/mcp_diff.py @@ -0,0 +1,266 @@ +import re +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Any, Dict, List, Sequence, Set, Tuple, Union + +import deepdiff.serialization +import yaml +from deepdiff import DeepDiff +from deepdiff.model import DiffLevel +from deepdiff.operator import BaseOperator +from typing_extensions import Literal + +ReportType = Literal[ + "type_changes", + "dictionary_item_added", + "dictionary_item_removed", + "values_changed", + "unprocessed", + "iterable_item_added", + "iterable_item_removed", + "iterable_item_moved", + "attribute_added", + "attribute_removed", + "set_item_added", + "set_item_removed", + "repetition_change", +] + + +@dataclass(frozen=True) +class AspectForDiff: + urn: str + change_type: str + aspect_name: str + aspect: Dict[str, Any] = field(hash=False) + delta_info: "DeltaInfo" = field(hash=False) + + @classmethod + def create_from_mcp(cls, idx: int, obj: Dict[str, Any]) -> "AspectForDiff": + aspect = obj["aspect"] + return cls( + urn=obj["entityUrn"], + change_type=obj["changeType"], + aspect_name=obj["aspectName"], + aspect=aspect.get("json", aspect), + delta_info=DeltaInfo(idx=idx, original=obj), + ) + + +@dataclass +class DeltaInfo: + """Information about an MCP used to construct a diff delta. + + In a separate class so it can be ignored by DeepDiff via MCPDeltaInfoOperator. + """ + + idx: int # Location in list of MCEs in golden file + original: Dict[str, Any] # Original json-serialized MCP + + +class DeltaInfoOperator(BaseOperator): + def __init__(self): + super().__init__(types=[DeltaInfo]) + + def give_up_diffing(self, *args: Any, **kwargs: Any) -> bool: + return True + + +AspectsByUrn = Dict[str, Dict[str, List[AspectForDiff]]] + + +def get_aspects_by_urn(obj: object) -> AspectsByUrn: + """Restructure a list of serialized MCPs by urn and aspect. + Retains information like the original dict and index to facilitate `apply_delta` later. + + Raises: + AssertionError: If the input is not purely a list of MCPs. + """ + d: AspectsByUrn = defaultdict(dict) + assert isinstance(obj, list), obj + for i, entry in enumerate(obj): + assert isinstance(entry, dict), entry + if "proposedSnapshot" in entry: + raise AssertionError("Found MCEs in output") + elif "entityUrn" in entry and "aspectName" in entry and "aspect" in entry: + urn = entry["entityUrn"] + aspect_name = entry["aspectName"] + aspect = AspectForDiff.create_from_mcp(i, entry) + d[urn].setdefault(aspect_name, []).append(aspect) + else: + raise AssertionError(f"Unrecognized MCE: {entry}") + + return d + + +@dataclass +class MCPAspectDiff: + aspects_added: Dict[int, AspectForDiff] + aspects_removed: Dict[int, AspectForDiff] + aspects_changed: Dict[Tuple[int, AspectForDiff, AspectForDiff], List[DiffLevel]] + + @classmethod + def create(cls, diff: DeepDiff) -> "MCPAspectDiff": + # Parse DeepDiff to distinguish between aspects that were added, removed, or changed + aspects_added = {} + aspects_removed = {} + aspects_changed = defaultdict(list) + for key, diff_levels in diff.tree.items(): + for diff_level in diff_levels: + path = diff_level.path(output_format="list") + idx = int(path[0]) + if len(path) == 1 and key == "iterable_item_added": + aspects_added[idx] = diff_level.t2 + elif len(path) == 1 and key == "iterable_item_removed": + aspects_removed[idx] = diff_level.t1 + else: + level = diff_level + while not isinstance(level.t1, AspectForDiff): + level = level.up + aspects_changed[(idx, level.t1, level.t2)].append(diff_level) + + return cls( + aspects_added=aspects_added, + aspects_removed=aspects_removed, + aspects_changed=aspects_changed, + ) + + +@dataclass +class MCPDiff: + aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] # urn -> aspect -> diff + urns_added: Set[str] + urns_removed: Set[str] + + def __bool__(self) -> bool: + return bool(self.aspect_changes) + + @classmethod + def create( + cls, + golden: AspectsByUrn, + output: AspectsByUrn, + ignore_paths: Sequence[str], + ) -> "MCPDiff": + ignore_paths = [cls.convert_path(path) for path in ignore_paths] + + aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] = defaultdict(dict) + for urn in golden.keys() | output.keys(): + golden_map = golden.get(urn, {}) + output_map = output.get(urn, {}) + for aspect_name in golden_map.keys() | output_map.keys(): + diff = DeepDiff( + t1=golden_map.get(aspect_name, []), + t2=output_map.get(aspect_name, []), + exclude_regex_paths=ignore_paths, + ignore_order=True, + custom_operators=[DeltaInfoOperator()], + ) + if diff: + aspect_changes[urn][aspect_name] = MCPAspectDiff.create(diff) + + return cls( + urns_added=output.keys() - golden.keys(), + urns_removed=golden.keys() - output.keys(), + aspect_changes=aspect_changes, + ) + + @staticmethod + def convert_path(path: str) -> str: + # Attempt to use paths intended for the root golden... sorry for the regex + return re.sub( + r"root\\?\[([0-9]+|\\d\+)\\?]\\?\['aspect'\\?](\\?\['(json|value)'\\?])?", + r"root\[\\d+].aspect", + path, + ) + + def apply_delta(self, golden: List[Dict[str, Any]]) -> None: + aspect_diffs = [v for d in self.aspect_changes.values() for v in d.values()] + for aspect_diff in aspect_diffs: + for (_, old, new), diffs in aspect_diff.aspects_changed.items(): + golden[old.delta_info.idx] = new.delta_info.original + + indices_to_remove = set() + for aspect_diff in aspect_diffs: + for ga in aspect_diff.aspects_removed.values(): + indices_to_remove.add(ga.delta_info.idx) + for idx in sorted(indices_to_remove, reverse=True): + del golden[idx] + + for aspect_diff in aspect_diffs: # Ideally would have smarter way to do this + for ga in aspect_diff.aspects_added.values(): + golden.insert(ga.delta_info.idx, ga.delta_info.original) + + def pretty(self, verbose: bool = False) -> str: + """The pretty human-readable string output of the diff between golden and output.""" + s = [] + for urn in self.urns_added: + s.append(f"Urn added, {urn}{' with aspects:' if verbose else ''}") + if verbose: + for aspect_diff in self.aspect_changes[urn].values(): + for i, ga in aspect_diff.aspects_added.items(): + s.append(self.report_aspect(ga, i)) + s.append(serialize_aspect(ga.aspect)) + if self.urns_added: + s.append("") + + for urn in self.urns_removed: + s.append(f"Urn removed, {urn}{' with aspects:' if verbose else ''}") + if verbose: + for aspect_diff in self.aspect_changes[urn].values(): + for i, ga in aspect_diff.aspects_removed.items(): + s.append(self.report_aspect(ga, i)) + s.append(serialize_aspect(ga.aspect)) + if self.urns_removed: + s.append("") + + for urn in self.aspect_changes.keys() - self.urns_added - self.urns_removed: + aspect_map = self.aspect_changes[urn] + s.append(f"Urn changed, {urn}:") + for aspect_name, aspect_diffs in aspect_map.items(): + for i, ga in aspect_diffs.aspects_added.items(): + s.append(self.report_aspect(ga, i, "added")) + if verbose: + s.append(serialize_aspect(ga.aspect)) + for i, ga in aspect_diffs.aspects_removed.items(): + s.append(self.report_aspect(ga, i, "removed")) + if verbose: + s.append(serialize_aspect(ga.aspect)) + for (i, old, new), diffs in aspect_diffs.aspects_changed.items(): + s.append(self.report_aspect(old, i, "changed") + ":") + for diff_level in diffs: + s.append(self.report_diff_level(diff_level, i)) + if verbose: + s.append(f"Old aspect:\n{serialize_aspect(old.aspect)}") + s.append(f"New aspect:\n{serialize_aspect(new.aspect)}") + + s.append("") + + return "\n".join(s) + + @staticmethod + def report_aspect(ga: AspectForDiff, idx: int, msg: str = "") -> str: + # Describe as "nth " if n > 1 + base = (idx + 1) % 10 + if base == 1: + suffix = "st" + elif base == 2: + suffix = "nd" + elif base == 3: + suffix = "rd" + else: + suffix = "th" + ordinal = f"{(idx+1)}{suffix} " if idx else "" + return f"{ordinal}<{ga.aspect_name}> {msg}" + + @staticmethod + def report_diff_level(diff: DiffLevel, idx: int) -> str: + return "\t" + deepdiff.serialization.pretty_print_diff(diff).replace( + f"root[{idx}].", "" + ) + + +def serialize_aspect(aspect: Union[AspectForDiff, Dict[str, Any]]) -> str: + if isinstance(aspect, AspectForDiff): # Unpack aspect + aspect = aspect.aspect + return " " + yaml.dump(aspect, sort_keys=False).replace("\n", "\n ").strip() diff --git a/metadata-ingestion/tests/test_helpers/mce_helpers.py b/metadata-ingestion/tests/test_helpers/mce_helpers.py index 3914a7f77a..46bf5cbea6 100644 --- a/metadata-ingestion/tests/test_helpers/mce_helpers.py +++ b/metadata-ingestion/tests/test_helpers/mce_helpers.py @@ -1,19 +1,26 @@ import json import logging import os -import pathlib -import pprint import re -import shutil -import tempfile -from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union - -import deepdiff +from typing import ( + Any, + Callable, + Dict, + List, + Optional, + Sequence, + Set, + Tuple, + Type, + Union, +) from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.sink.file import write_metadata_file -from datahub.ingestion.source.file import read_metadata_file from datahub.metadata.schema_classes import MetadataChangeEventClass +from datahub.testing.compare_metadata_json import ( + assert_metadata_files_equal, + load_json_file, +) from datahub.utilities.urns.urn import Urn from tests.test_helpers.type_helpers import PytestConfig @@ -21,9 +28,6 @@ logger = logging.getLogger(__name__) IGNORE_PATH_TIMESTAMPS = [ # Ignore timestamps from the ETL pipeline. A couple examples: - # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.common.Ownership']['lastModified']['time'] - # root[69]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['lastModified']['time']" - # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.UpstreamLineage']['upstreams'][0]['auditStamp']['time'] r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]", r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]", @@ -56,12 +60,6 @@ class EntityType: GROUP = "corpGroup" -def load_json_file(filename: Union[str, os.PathLike]) -> object: - with open(str(filename)) as f: - a = json.load(f) - return a - - def clean_nones(value): """ Recursively remove all None values from dictionaries and lists, and returns @@ -75,92 +73,21 @@ def clean_nones(value): return value -def assert_mces_equal( - output: object, golden: object, ignore_paths: Optional[List[str]] = None -) -> None: - # This method assumes we're given a list of MCE json objects. - diff = deepdiff.DeepDiff( - golden, output, exclude_regex_paths=ignore_paths, ignore_order=True - ) - if diff: - # Attempt a clean diff (removing None-s) - assert isinstance(output, list) - assert isinstance(golden, list) - clean_output = [clean_nones(o) for o in output] - clean_golden = [clean_nones(g) for g in golden] - clean_diff = deepdiff.DeepDiff( - clean_golden, - clean_output, - exclude_regex_paths=ignore_paths, - ignore_order=True, - ) - if not clean_diff: - logger.debug(f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}") - diff = clean_diff - if diff: - # do some additional processing to emit helpful messages - output_urns = _get_entity_urns(output) - golden_urns = _get_entity_urns(golden) - in_golden_but_not_in_output = golden_urns - output_urns - in_output_but_not_in_golden = output_urns - golden_urns - if in_golden_but_not_in_output: - logger.info( - f"Golden file has {len(in_golden_but_not_in_output)} more urns: {in_golden_but_not_in_output}" - ) - if in_output_but_not_in_golden: - logger.info( - f"Golden file has {len(in_output_but_not_in_golden)} more urns: {in_output_but_not_in_golden}" - ) - - assert ( - not diff - ), f"MCEs differ\n{pprint.pformat(diff)} \n output was: {json.dumps(output)}" - - def check_golden_file( pytestconfig: PytestConfig, output_path: Union[str, os.PathLike], golden_path: Union[str, os.PathLike], - ignore_paths: Optional[List[str]] = None, + ignore_paths: Sequence[str] = (), ) -> None: update_golden = pytestconfig.getoption("--update-golden-files") copy_output = pytestconfig.getoption("--copy-output-files") - golden_exists = os.path.isfile(golden_path) - - if copy_output: - shutil.copyfile(str(output_path), str(golden_path) + ".output") - print(f"Copied output file to {golden_path}.output") - - if not update_golden and not golden_exists: - raise FileNotFoundError( - "Golden file does not exist. Please run with the --update-golden-files option to create." - ) - - output = load_json_file(output_path) - - # if updating a golden file that doesn't exist yet, load the output again - if update_golden and not golden_exists: - golden = load_json_file(output_path) - shutil.copyfile(str(output_path), str(golden_path)) - else: - # We have to "normalize" the golden file by reading and writing it back out. - # This will clean up nulls, double serialization, and other formatting issues. - with tempfile.NamedTemporaryFile() as temp: - golden_metadata = read_metadata_file(pathlib.Path(golden_path)) - write_metadata_file(pathlib.Path(temp.name), golden_metadata) - golden = load_json_file(temp.name) - - try: - assert_mces_equal(output, golden, ignore_paths) - - except AssertionError as e: - # only update golden files if the diffs are not empty - if update_golden: - shutil.copyfile(str(output_path), str(golden_path)) - - # raise the error if we're just running the test - else: - raise e + assert_metadata_files_equal( + output_path=output_path, + golden_path=golden_path, + update_golden=update_golden, + copy_output=copy_output, + ignore_paths=ignore_paths, + ) def _get_field_for_entity_type_in_mce(entity_type: str) -> str: diff --git a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json index 08e100140c..d5dfe12594 100644 --- a/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json +++ b/metadata-ingestion/tests/unit/patch/complex_dataset_patch.json @@ -1,42 +1,83 @@ [ - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "datasetProperties", - "aspect": { - "value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]", - "contentType": "application/json-patch+json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "globalTags", - "aspect": { - "value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]", - "contentType": "application/json-patch+json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "upstreamLineage", - "aspect": { - "value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]", - "contentType": "application/json-patch+json" - } - }, - { - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", - "changeType": "PATCH", - "aspectName": "editableSchemaMetadata", - "aspect": { - "value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]", - "contentType": "application/json-patch+json" - } - } +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "datasetProperties", + "aspect": { + "json": [ + { + "op": "replace", + "path": "/description", + "value": "test description" + }, + { + "op": "add", + "path": "/customProperties/test_key_1", + "value": "test_value_1" + }, + { + "op": "add", + "path": "/customProperties/test_key_2", + "value": "test_value_2" + } + ] + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "globalTags", + "aspect": { + "json": [ + { + "op": "add", + "path": "/tags/urn:li:tag:test_tag", + "value": { + "tag": "urn:li:tag:test_tag" + } + } + ] + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "upstreamLineage", + "aspect": { + "json": [ + { + "op": "add", + "path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29", + "value": { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)", + "type": "TRANSFORMED" + } + } + ] + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", + "changeType": "PATCH", + "aspectName": "editableSchemaMetadata", + "aspect": { + "json": [ + { + "op": "add", + "path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1", + "value": { + "tag": "urn:li:tag:tag1" + } + } + ] + } +} ] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_bigquery_usage.py b/metadata-ingestion/tests/unit/test_bigquery_usage.py index 01a7b34f3d..e06c6fb3fe 100644 --- a/metadata-ingestion/tests/unit/test_bigquery_usage.py +++ b/metadata-ingestion/tests/unit/test_bigquery_usage.py @@ -34,10 +34,10 @@ from datahub.metadata.schema_classes import ( OperationClass, TimeWindowSizeClass, ) +from datahub.testing.compare_metadata_json import diff_metadata_json from tests.performance.bigquery import generate_events, ref_from_table from tests.performance.data_generation import generate_data, generate_queries from tests.performance.data_model import Container, FieldAccess, Query, Table, View -from tests.test_helpers.mce_helpers import assert_mces_equal PROJECT_1 = "project-1" PROJECT_2 = "project-2" @@ -224,7 +224,7 @@ def make_zero_usage_workunit( def compare_workunits( output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit] ) -> None: - assert_mces_equal( + assert not diff_metadata_json( [wu.metadata.to_obj() for wu in output], [wu.metadata.to_obj() for wu in expected], ) diff --git a/metadata-ingestion/tests/unit/test_mce_helpers.py b/metadata-ingestion/tests/unit/test_compare_metadata.py similarity index 96% rename from metadata-ingestion/tests/unit/test_mce_helpers.py rename to metadata-ingestion/tests/unit/test_compare_metadata.py index 5af991755f..8316e226d4 100644 --- a/metadata-ingestion/tests/unit/test_mce_helpers.py +++ b/metadata-ingestion/tests/unit/test_compare_metadata.py @@ -2,6 +2,7 @@ import json import pytest +from datahub.testing.compare_metadata_json import diff_metadata_json from tests.test_helpers import mce_helpers basic_1 = json.loads( @@ -149,18 +150,18 @@ basic_3 = json.loads( def test_basic_diff_same() -> None: - mce_helpers.assert_mces_equal(basic_1, basic_2, mce_helpers.IGNORE_PATH_TIMESTAMPS) + assert not diff_metadata_json(basic_1, basic_2, mce_helpers.IGNORE_PATH_TIMESTAMPS) def test_basic_diff_only_owner_change() -> None: with pytest.raises(AssertionError): - mce_helpers.assert_mces_equal( + assert not diff_metadata_json( basic_2, basic_3, mce_helpers.IGNORE_PATH_TIMESTAMPS ) def test_basic_diff_owner_change() -> None: with pytest.raises(AssertionError): - mce_helpers.assert_mces_equal( + assert not diff_metadata_json( basic_1, basic_3, mce_helpers.IGNORE_PATH_TIMESTAMPS ) diff --git a/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py b/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py index fe05ac49a9..18b0d9fd40 100644 --- a/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py +++ b/metadata-ingestion/tests/unit/test_generic_aspect_transformer.py @@ -2,6 +2,7 @@ import json import unittest from typing import Any, List, Optional +from datahub.emitter.aspect import JSON_CONTENT_TYPE from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope from datahub.ingestion.transformer.generic_aspect_transformer import ( @@ -94,7 +95,7 @@ class DummyGenericAspectTransformer(GenericAspectTransformer): aspect.value if aspect else json.dumps({"customAspect": 10}).encode("utf-8") ) result_aspect = GenericAspectClass( - contentType="application/json", + contentType=JSON_CONTENT_TYPE, value=value, ) return result_aspect @@ -183,7 +184,7 @@ class TestDummyGenericAspectTransformer(unittest.TestCase): entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)", aspect_name="customAspect", aspect=GenericAspectClass( - contentType="application/json", + contentType=JSON_CONTENT_TYPE, value=json.dumps({"customAspect": 5}).encode("utf-8"), ), ) @@ -251,7 +252,7 @@ class TestDummyRemoveGenericAspectTransformer(unittest.TestCase): entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)", aspect_name="customAspect", aspect=GenericAspectClass( - contentType="application/json", + contentType=JSON_CONTENT_TYPE, value=json.dumps({"customAspect": 5}).encode("utf-8"), ), ) diff --git a/metadata-ingestion/tests/unit/test_workunit.py b/metadata-ingestion/tests/unit/test_workunit.py index 9d31b3fc60..5a4fbf315e 100644 --- a/metadata-ingestion/tests/unit/test_workunit.py +++ b/metadata-ingestion/tests/unit/test_workunit.py @@ -1,5 +1,6 @@ import json +from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.metadata.schema_classes import ( @@ -48,7 +49,7 @@ def test_get_aspects_of_type_mcpc(): aspectName=StatusClass.ASPECT_NAME, aspect=GenericAspectClass( value=json.dumps(aspect.to_obj()).encode(), - contentType="application/json", + contentType=JSON_CONTENT_TYPE, ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) @@ -63,7 +64,7 @@ def test_get_aspects_of_type_mcpc(): aspectName="not status", aspect=GenericAspectClass( value=json.dumps(aspect.to_obj()).encode(), - contentType="application/json", + contentType=JSON_CONTENT_TYPE, ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) @@ -76,7 +77,7 @@ def test_get_aspects_of_type_mcpc(): aspectName=StatusClass.ASPECT_NAME, aspect=GenericAspectClass( value=json.dumps({"not_status": True}).encode(), - contentType="application/json-patch+json", + contentType=JSON_PATCH_CONTENT_TYPE, ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) @@ -89,7 +90,7 @@ def test_get_aspects_of_type_mcpc(): aspectName=StatusClass.ASPECT_NAME, aspect=GenericAspectClass( value=(json.dumps(aspect.to_obj()) + "aaa").encode(), - contentType="application/json", + contentType=JSON_CONTENT_TYPE, ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc) @@ -102,7 +103,7 @@ def test_get_aspects_of_type_mcpc(): aspectName=StatusClass.ASPECT_NAME, aspect=GenericAspectClass( value='{"ß": 2}'.encode("latin_1"), - contentType="application/json", + contentType=JSON_CONTENT_TYPE, ), ) wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)