test(ingest): Aspect level golden file comparison (#8310)

This commit is contained in:
Andrew Sikowitz 2023-07-11 10:39:47 -04:00 committed by GitHub
parent 18c1f12436
commit 2261531e31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 576 additions and 164 deletions

View File

@ -424,6 +424,11 @@ mypy_stubs = {
"types-protobuf>=4.21.0.1",
}
pytest_dep = "pytest>=6.2.2"
deepdiff_dep = "deepdiff"
test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"}
base_dev_requirements = {
*base_requirements,
*framework_common,
@ -442,11 +447,12 @@ base_dev_requirements = {
# pydantic 1.8.2 is incompatible with mypy 0.910.
# See https://github.com/samuelcolvin/pydantic/pull/3175#issuecomment-995382910.
"pydantic>=1.9.0",
"pytest>=6.2.2",
*test_api_requirements,
pytest_dep,
"pytest-asyncio>=0.16.0",
"pytest-cov>=2.8.1",
"pytest-docker>=1.0.1",
"deepdiff",
deepdiff_dep,
"requests-mock",
"freezegun",
"jsonpickle",
@ -697,6 +703,7 @@ setuptools.setup(
)
),
"dev": list(dev_requirements),
"testing-utils": list(test_api_requirements), # To import `datahub.testing`
"integration-tests": list(full_test_dev_requirements),
},
)

View File

@ -1,7 +1,8 @@
import logging
import pprint
import shutil
import tempfile
from typing import Optional
from typing import List, Optional
import click
@ -14,6 +15,8 @@ from datahub.ingestion.sink.sink_registry import sink_registry
from datahub.ingestion.source.source_registry import source_registry
from datahub.ingestion.transformer.transform_registry import transform_registry
from datahub.telemetry import telemetry
from datahub.testing.compare_metadata_json import diff_metadata_json, load_json_file
from datahub.testing.mcp_diff import MCPDiff
logger = logging.getLogger(__name__)
@ -69,6 +72,48 @@ def metadata_file(json_file: str, rewrite: bool, unpack_mces: bool) -> None:
shutil.copy(out_file.name, json_file)
@check.command(no_args_is_help=True)
@click.argument(
    "actual-file",
    type=click.Path(exists=True, dir_okay=False, readable=True),
)
@click.argument(
    "expected-file",
    type=click.Path(exists=True, dir_okay=False, readable=True),
)
@click.option(
    "--verbose",
    "-v",
    # A boolean switch must be declared as a flag; with `type=bool` click would
    # demand an explicit value (e.g. `--verbose true`).
    is_flag=True,
    default=False,
    help="Print full aspects that were changed, when comparing MCPs",
)
@click.option(
    "--ignore-path",
    multiple=True,
    type=str,
    default=(),
    help="[Advanced] Paths in the deepdiff object to ignore",
)
@telemetry.with_telemetry()
def metadata_diff(
    actual_file: str, expected_file: str, verbose: bool, ignore_path: List[str]
) -> None:
    """Compare two metadata (MCE or MCP) JSON files.

    Comparison is more sophisticated for files composed solely of MCPs.
    """
    actual = load_json_file(actual_file)
    expected = load_json_file(expected_file)

    # MCP-only files get a structured aspect-level diff (MCPDiff); anything
    # else falls back to a plain DeepDiff, pretty-printed.
    diff = diff_metadata_json(output=actual, golden=expected, ignore_paths=ignore_path)
    if isinstance(diff, MCPDiff):
        click.echo(diff.pretty(verbose=verbose))
    else:
        click.echo(pprint.pformat(diff))
@check.command()
@click.option(
"--verbose",

View File

@ -9,3 +9,6 @@ TIMESERIES_ASPECT_MAP = {
for name, klass in ASPECT_MAP.items()
if klass.get_aspect_type() == "timeseries"
}
JSON_CONTENT_TYPE = "application/json"
JSON_PATCH_CONTENT_TYPE = "application/json-patch+json"

View File

@ -2,7 +2,7 @@ import dataclasses
import json
from typing import TYPE_CHECKING, List, Optional, Tuple, Union
from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP
from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE, TIMESERIES_ASPECT_MAP
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.metadata.schema_classes import (
ChangeTypeClass,
@ -20,14 +20,12 @@ if TYPE_CHECKING:
_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"
_ASPECT_CONTENT_TYPE = "application/json"
def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
serialized = json.dumps(pre_json_transform(codegen_obj.to_obj()))
return GenericAspectClass(
value=serialized.encode(),
contentType=_ASPECT_CONTENT_TYPE,
contentType=JSON_CONTENT_TYPE,
)
@ -42,7 +40,7 @@ def _try_from_generic_aspect(
return True, None
assert aspectName is not None, "aspectName must be set if aspect is set"
if aspect.contentType != _ASPECT_CONTENT_TYPE:
if aspect.contentType != JSON_CONTENT_TYPE:
return False, None
if aspectName not in ASPECT_MAP:
@ -155,7 +153,7 @@ class MetadataChangeProposalWrapper:
# Undo the double JSON serialization that happens in the MCP aspect.
if (
obj.get("aspect")
and obj["aspect"].get("contentType") == _ASPECT_CONTENT_TYPE
and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE
):
obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
return obj
@ -174,7 +172,7 @@ class MetadataChangeProposalWrapper:
# routine works.
if obj.get("aspect") and obj["aspect"].get("json"):
obj["aspect"] = {
"contentType": _ASPECT_CONTENT_TYPE,
"contentType": JSON_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]),
}

View File

@ -3,6 +3,7 @@ from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional
from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import (
ChangeTypeClass,
@ -72,7 +73,7 @@ class MetadataPatchProposal:
value=json.dumps(
pre_json_transform(_recursive_to_obj(patches))
).encode(),
contentType="application/json-patch+json",
contentType=JSON_PATCH_CONTENT_TYPE,
),
auditHeader=self.audit_header,
systemMetadata=self.system_metadata,

View File

@ -10,6 +10,7 @@ from datahub.configuration.common import (
IgnorableError,
redact_raw_config,
)
from datahub.emitter.aspect import JSON_CONTENT_TYPE
from datahub.emitter.mce_builder import datahub_guid
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import make_data_platform_urn
@ -205,7 +206,7 @@ class DatahubIngestionRunSummaryProvider(PipelineRunListener):
structured_report = StructuredExecutionReportClass(
type="CLI_INGEST",
serializedValue=structured_report_str,
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
)
execution_result_aspect = ExecutionRequestResultClass(
status=status,

View File

@ -4,6 +4,7 @@ import pathlib
from typing import Iterable, Union
from datahub.configuration.common import ConfigModel
from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import RecordEnvelope
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
@ -27,6 +28,14 @@ def _to_obj_for_file(
) -> dict:
if isinstance(obj, MetadataChangeProposalWrapper):
return obj.to_obj(simplified_structure=simplified_structure)
elif isinstance(obj, MetadataChangeProposal) and simplified_structure:
serialized = obj.to_obj()
if serialized.get("aspect") and serialized["aspect"].get("contentType") in [
JSON_CONTENT_TYPE,
JSON_PATCH_CONTENT_TYPE,
]:
serialized["aspect"] = {"json": json.loads(serialized["aspect"]["value"])}
return serialized
return obj.to_obj()
@ -82,6 +91,7 @@ def write_metadata_file(
MetadataChangeProposal,
MetadataChangeProposalWrapper,
UsageAggregation,
dict, # Serialized MCE or MCP
]
],
) -> None:
@ -91,6 +101,7 @@ def write_metadata_file(
for i, record in enumerate(records):
if i > 0:
f.write(",\n")
obj = _to_obj_for_file(record)
json.dump(obj, f, indent=4)
if not isinstance(record, dict):
record = _to_obj_for_file(record)
json.dump(record, f, indent=4)
f.write("\n]")

View File

@ -0,0 +1,109 @@
"""Utilities for comparing MCE and MCP files."""
import json
import logging
import os
import pathlib
import pprint
import shutil
import tempfile
from typing import Any, Dict, List, Sequence, Union
import pytest
from deepdiff import DeepDiff
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.testing.mcp_diff import MCPDiff, get_aspects_by_urn
logger = logging.getLogger(__name__)
MetadataJson = List[Dict[str, Any]]
default_exclude_paths = [
r"root\[\d+\]\['systemMetadata'\]\['lastObserved'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['timestampMillis'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['lastUpdatedTimestamp'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['created'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]",
]
def load_json_file(filename: Union[str, os.PathLike]) -> MetadataJson:
    """Read *filename* and return its parsed JSON content."""
    path = str(filename)
    with open(path) as fp:
        return json.load(fp)
def assert_metadata_files_equal(
    output_path: Union[str, os.PathLike],
    golden_path: Union[str, os.PathLike],
    update_golden: bool,
    copy_output: bool,
    ignore_paths: Sequence[str] = (),
) -> None:
    """Assert that an output metadata file matches its golden file.

    Args:
        output_path: Path of the freshly generated metadata JSON file.
        golden_path: Path of the golden (expected) metadata JSON file.
        update_golden: If True, rewrite the golden file instead of failing.
        copy_output: If True, copy the output next to the golden file
            (as ``<golden_path>.output``) for manual inspection.
        ignore_paths: Extra deepdiff regex paths to exclude from comparison.

    Raises:
        FileNotFoundError: If the golden file is missing and we are not updating.
    """
    golden_exists = os.path.isfile(golden_path)

    if copy_output:
        shutil.copyfile(str(output_path), str(golden_path) + ".output")
        print(f"Copied output file to {golden_path}.output")

    if not update_golden and not golden_exists:
        raise FileNotFoundError(
            "Golden file does not exist. Please run with the --update-golden-files option to create."
        )

    output = load_json_file(output_path)

    if update_golden and not golden_exists:
        # No golden file yet: the output becomes the golden file verbatim.
        # (The original code also loaded output_path into an unused local here.)
        shutil.copyfile(str(output_path), str(golden_path))
        return

    # We have to "normalize" the golden file by reading and writing it back out.
    # This will clean up nulls, double serialization, and other formatting issues.
    with tempfile.NamedTemporaryFile() as temp:
        golden_metadata = read_metadata_file(pathlib.Path(golden_path))
        write_metadata_file(pathlib.Path(temp.name), golden_metadata)
        golden = load_json_file(temp.name)

    diff = diff_metadata_json(output, golden, ignore_paths)
    if diff and update_golden:
        if isinstance(diff, MCPDiff):
            # Apply only the changed aspects so golden file ordering is preserved.
            diff.apply_delta(golden)
            write_metadata_file(pathlib.Path(golden_path), golden)
        else:
            # Non-MCP diff: we can't patch selectively, so replace wholesale.
            shutil.copyfile(str(output_path), str(golden_path))
        return

    if diff:
        # Call pytest.fail rather than raise an exception to omit stack trace
        if isinstance(diff, MCPDiff):
            print(diff.pretty(verbose=True))
            pytest.fail(diff.pretty(), pytrace=False)
        else:
            pytest.fail(pprint.pformat(diff), pytrace=False)
def diff_metadata_json(
    output: MetadataJson,
    golden: MetadataJson,
    ignore_paths: Sequence[str] = (),
) -> Union[DeepDiff, MCPDiff]:
    """Diff two metadata JSON documents, preferring an aspect-level MCP diff.

    Falls back to a plain DeepDiff when either side is not purely MCPs.
    """
    all_ignored = (*ignore_paths, *default_exclude_paths)
    try:
        # Aspect-level comparison; raises AssertionError if the input
        # contains anything other than MCPs.
        return MCPDiff.create(
            golden=get_aspects_by_urn(golden),
            output=get_aspects_by_urn(output),
            ignore_paths=all_ignored,
        )
    except AssertionError as e:
        logger.warning(f"Reverting to old diff method: {e}")
        logger.debug("Error with new diff method", exc_info=True)
        return DeepDiff(
            golden,
            output,
            exclude_regex_paths=all_ignored,
            ignore_order=True,
        )

View File

@ -0,0 +1,266 @@
import re
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Sequence, Set, Tuple, Union
import deepdiff.serialization
import yaml
from deepdiff import DeepDiff
from deepdiff.model import DiffLevel
from deepdiff.operator import BaseOperator
from typing_extensions import Literal
ReportType = Literal[
"type_changes",
"dictionary_item_added",
"dictionary_item_removed",
"values_changed",
"unprocessed",
"iterable_item_added",
"iterable_item_removed",
"iterable_item_moved",
"attribute_added",
"attribute_removed",
"set_item_added",
"set_item_removed",
"repetition_change",
]
@dataclass(frozen=True)
class AspectForDiff:
    """A single serialized MCP, keyed for diffing by urn/change type/aspect name.

    The aspect body and delta bookkeeping are excluded from the hash so that
    equal-identity aspects with different contents hash together.
    """

    urn: str
    change_type: str
    aspect_name: str
    aspect: Dict[str, Any] = field(hash=False)
    delta_info: "DeltaInfo" = field(hash=False)

    @classmethod
    def create_from_mcp(cls, idx: int, obj: Dict[str, Any]) -> "AspectForDiff":
        """Build an AspectForDiff from the json-serialized MCP at list index *idx*."""
        raw_aspect = obj["aspect"]
        return cls(
            urn=obj["entityUrn"],
            change_type=obj["changeType"],
            aspect_name=obj["aspectName"],
            # Prefer the simplified {"json": ...} form; otherwise take the aspect as-is.
            aspect=raw_aspect.get("json", raw_aspect),
            delta_info=DeltaInfo(idx=idx, original=obj),
        )
@dataclass
class DeltaInfo:
    """Bookkeeping about an MCP, used to construct a diff delta.

    Lives in its own class so DeepDiff can skip it entirely via DeltaInfoOperator.
    """

    # Position of this MCP in the golden file's top-level list.
    idx: int
    # The original json-serialized MCP, unmodified.
    original: Dict[str, Any]
class DeltaInfoOperator(BaseOperator):
    """Custom DeepDiff operator that treats all DeltaInfo pairs as equal."""

    def __init__(self) -> None:
        super().__init__(types=[DeltaInfo])

    def give_up_diffing(self, *args: Any, **kwargs: Any) -> bool:
        # Returning True tells DeepDiff not to descend into DeltaInfo objects,
        # so delta bookkeeping never shows up as a difference.
        return True
AspectsByUrn = Dict[str, Dict[str, List[AspectForDiff]]]
def get_aspects_by_urn(obj: object) -> AspectsByUrn:
    """Restructure a list of serialized MCPs by urn and aspect.

    Retains information like the original dict and index to facilitate `apply_delta` later.

    Raises:
        AssertionError: If the input is not purely a list of MCPs.
    """
    result: AspectsByUrn = defaultdict(dict)
    assert isinstance(obj, list), obj
    for idx, entry in enumerate(obj):
        assert isinstance(entry, dict), entry
        if "proposedSnapshot" in entry:
            raise AssertionError("Found MCEs in output")
        if not ("entityUrn" in entry and "aspectName" in entry and "aspect" in entry):
            raise AssertionError(f"Unrecognized MCE: {entry}")
        aspects = result[entry["entityUrn"]].setdefault(entry["aspectName"], [])
        aspects.append(AspectForDiff.create_from_mcp(idx, entry))
    return result
@dataclass
class MCPAspectDiff:
    """Diff of the MCP list for a single (urn, aspect name) pair."""

    # List index -> the aspect that was added/removed at that index.
    aspects_added: Dict[int, AspectForDiff]
    aspects_removed: Dict[int, AspectForDiff]
    # (index, old aspect, new aspect) -> DeepDiff levels describing each change.
    aspects_changed: Dict[Tuple[int, AspectForDiff, AspectForDiff], List[DiffLevel]]

    @classmethod
    def create(cls, diff: DeepDiff) -> "MCPAspectDiff":
        # Parse DeepDiff to distinguish between aspects that were added, removed, or changed
        aspects_added = {}
        aspects_removed = {}
        aspects_changed = defaultdict(list)
        for key, diff_levels in diff.tree.items():
            for diff_level in diff_levels:
                # path(output_format="list") yields path segments; segment 0 is
                # the index within the list of AspectForDiff objects.
                path = diff_level.path(output_format="list")
                idx = int(path[0])
                if len(path) == 1 and key == "iterable_item_added":
                    aspects_added[idx] = diff_level.t2
                elif len(path) == 1 and key == "iterable_item_removed":
                    aspects_removed[idx] = diff_level.t1
                else:
                    # A change below the top level: walk up the tree until we
                    # reach the enclosing AspectForDiff pair (t1 = old, t2 = new).
                    level = diff_level
                    while not isinstance(level.t1, AspectForDiff):
                        level = level.up
                    aspects_changed[(idx, level.t1, level.t2)].append(diff_level)

        return cls(
            aspects_added=aspects_added,
            aspects_removed=aspects_removed,
            aspects_changed=aspects_changed,
        )
@dataclass
class MCPDiff:
    """Aspect-level diff between a golden file and an output file of MCPs."""

    aspect_changes: Dict[str, Dict[str, MCPAspectDiff]]  # urn -> aspect -> diff
    urns_added: Set[str]
    urns_removed: Set[str]

    def __bool__(self) -> bool:
        # Urn additions/removals also produce per-aspect diffs in `create`,
        # so aspect_changes alone determines whether anything differs.
        return bool(self.aspect_changes)

    @classmethod
    def create(
        cls,
        golden: AspectsByUrn,
        output: AspectsByUrn,
        ignore_paths: Sequence[str],
    ) -> "MCPDiff":
        """Diff two AspectsByUrn maps pairwise by (urn, aspect name)."""
        ignore_paths = [cls.convert_path(path) for path in ignore_paths]
        aspect_changes: Dict[str, Dict[str, MCPAspectDiff]] = defaultdict(dict)
        for urn in golden.keys() | output.keys():
            golden_map = golden.get(urn, {})
            output_map = output.get(urn, {})
            for aspect_name in golden_map.keys() | output_map.keys():
                diff = DeepDiff(
                    t1=golden_map.get(aspect_name, []),
                    t2=output_map.get(aspect_name, []),
                    exclude_regex_paths=ignore_paths,
                    ignore_order=True,
                    custom_operators=[DeltaInfoOperator()],
                )
                if diff:
                    aspect_changes[urn][aspect_name] = MCPAspectDiff.create(diff)

        return cls(
            urns_added=output.keys() - golden.keys(),
            urns_removed=golden.keys() - output.keys(),
            aspect_changes=aspect_changes,
        )

    @staticmethod
    def convert_path(path: str) -> str:
        # Attempt to use paths intended for the root golden... sorry for the regex
        return re.sub(
            r"root\\?\[([0-9]+|\\d\+)\\?]\\?\['aspect'\\?](\\?\['(json|value)'\\?])?",
            r"root\[\\d+].aspect",
            path,
        )

    def apply_delta(self, golden: List[Dict[str, Any]]) -> None:
        """Mutate *golden* in place so it matches the output this diff was built from."""
        aspect_diffs = [v for d in self.aspect_changes.values() for v in d.values()]
        # 1. Replace changed MCPs in place at their original indices.
        for aspect_diff in aspect_diffs:
            for _, old, new in aspect_diff.aspects_changed:
                golden[old.delta_info.idx] = new.delta_info.original

        # 2. Remove deleted MCPs, from the end so earlier indices stay valid.
        indices_to_remove = set()
        for aspect_diff in aspect_diffs:
            for ga in aspect_diff.aspects_removed.values():
                indices_to_remove.add(ga.delta_info.idx)
        for idx in sorted(indices_to_remove, reverse=True):
            del golden[idx]

        # 3. Insert added MCPs at their output positions.
        for aspect_diff in aspect_diffs:  # Ideally would have smarter way to do this
            for ga in aspect_diff.aspects_added.values():
                golden.insert(ga.delta_info.idx, ga.delta_info.original)

    def pretty(self, verbose: bool = False) -> str:
        """The pretty human-readable string output of the diff between golden and output."""
        s = []
        for urn in self.urns_added:
            s.append(f"Urn added, {urn}{' with aspects:' if verbose else ''}")
            if verbose:
                for aspect_diff in self.aspect_changes[urn].values():
                    for i, ga in aspect_diff.aspects_added.items():
                        s.append(self.report_aspect(ga, i))
                        s.append(serialize_aspect(ga.aspect))
        if self.urns_added:
            s.append("")

        for urn in self.urns_removed:
            s.append(f"Urn removed, {urn}{' with aspects:' if verbose else ''}")
            if verbose:
                for aspect_diff in self.aspect_changes[urn].values():
                    for i, ga in aspect_diff.aspects_removed.items():
                        s.append(self.report_aspect(ga, i))
                        s.append(serialize_aspect(ga.aspect))
        if self.urns_removed:
            s.append("")

        for urn in self.aspect_changes.keys() - self.urns_added - self.urns_removed:
            aspect_map = self.aspect_changes[urn]
            s.append(f"Urn changed, {urn}:")
            for aspect_name, aspect_diffs in aspect_map.items():
                for i, ga in aspect_diffs.aspects_added.items():
                    s.append(self.report_aspect(ga, i, "added"))
                    if verbose:
                        s.append(serialize_aspect(ga.aspect))
                for i, ga in aspect_diffs.aspects_removed.items():
                    s.append(self.report_aspect(ga, i, "removed"))
                    if verbose:
                        s.append(serialize_aspect(ga.aspect))
                for (i, old, new), diffs in aspect_diffs.aspects_changed.items():
                    s.append(self.report_aspect(old, i, "changed") + ":")
                    for diff_level in diffs:
                        s.append(self.report_diff_level(diff_level, i))
                    if verbose:
                        s.append(f"Old aspect:\n{serialize_aspect(old.aspect)}")
                        s.append(f"New aspect:\n{serialize_aspect(new.aspect)}")
            s.append("")

        return "\n".join(s)

    @staticmethod
    def report_aspect(ga: AspectForDiff, idx: int, msg: str = "") -> str:
        # Describe as "nth <aspect>" if n > 1
        n = idx + 1
        # 11, 12, 13 (and 111, 112, ...) take "th", not "st"/"nd"/"rd" --
        # the previous `n % 10` check produced "11st", "12nd", "13rd".
        if 10 <= n % 100 <= 13:
            suffix = "th"
        elif n % 10 == 1:
            suffix = "st"
        elif n % 10 == 2:
            suffix = "nd"
        elif n % 10 == 3:
            suffix = "rd"
        else:
            suffix = "th"
        ordinal = f"{n}{suffix} " if idx else ""
        return f"{ordinal}<{ga.aspect_name}> {msg}"

    @staticmethod
    def report_diff_level(diff: DiffLevel, idx: int) -> str:
        # Strip the "root[<idx>]." prefix so the path reads relative to the aspect.
        return "\t" + deepdiff.serialization.pretty_print_diff(diff).replace(
            f"root[{idx}].", ""
        )
def serialize_aspect(aspect: Union[AspectForDiff, Dict[str, Any]]) -> str:
    """Render an aspect dict as an indented YAML snippet for diff reports."""
    if isinstance(aspect, AspectForDiff):  # Unpack aspect
        aspect = aspect.aspect
    dumped = yaml.dump(aspect, sort_keys=False)
    return " " + dumped.replace("\n", "\n ").strip()

View File

@ -1,19 +1,26 @@
import json
import logging
import os
import pathlib
import pprint
import re
import shutil
import tempfile
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Type, Union
import deepdiff
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.ingestion.source.file import read_metadata_file
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.testing.compare_metadata_json import (
assert_metadata_files_equal,
load_json_file,
)
from datahub.utilities.urns.urn import Urn
from tests.test_helpers.type_helpers import PytestConfig
@ -21,9 +28,6 @@ logger = logging.getLogger(__name__)
IGNORE_PATH_TIMESTAMPS = [
# Ignore timestamps from the ETL pipeline. A couple examples:
# root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.common.Ownership']['lastModified']['time']
# root[69]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['lastModified']['time']"
# root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.UpstreamLineage']['upstreams'][0]['auditStamp']['time']
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]",
r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]",
@ -56,12 +60,6 @@ class EntityType:
GROUP = "corpGroup"
def load_json_file(filename: Union[str, os.PathLike]) -> object:
with open(str(filename)) as f:
a = json.load(f)
return a
def clean_nones(value):
"""
Recursively remove all None values from dictionaries and lists, and returns
@ -75,92 +73,21 @@ def clean_nones(value):
return value
def assert_mces_equal(
output: object, golden: object, ignore_paths: Optional[List[str]] = None
) -> None:
# This method assumes we're given a list of MCE json objects.
diff = deepdiff.DeepDiff(
golden, output, exclude_regex_paths=ignore_paths, ignore_order=True
)
if diff:
# Attempt a clean diff (removing None-s)
assert isinstance(output, list)
assert isinstance(golden, list)
clean_output = [clean_nones(o) for o in output]
clean_golden = [clean_nones(g) for g in golden]
clean_diff = deepdiff.DeepDiff(
clean_golden,
clean_output,
exclude_regex_paths=ignore_paths,
ignore_order=True,
)
if not clean_diff:
logger.debug(f"MCE-s differ, clean MCE-s are fine\n{pprint.pformat(diff)}")
diff = clean_diff
if diff:
# do some additional processing to emit helpful messages
output_urns = _get_entity_urns(output)
golden_urns = _get_entity_urns(golden)
in_golden_but_not_in_output = golden_urns - output_urns
in_output_but_not_in_golden = output_urns - golden_urns
if in_golden_but_not_in_output:
logger.info(
f"Golden file has {len(in_golden_but_not_in_output)} more urns: {in_golden_but_not_in_output}"
)
if in_output_but_not_in_golden:
logger.info(
f"Golden file has {len(in_output_but_not_in_golden)} more urns: {in_output_but_not_in_golden}"
)
assert (
not diff
), f"MCEs differ\n{pprint.pformat(diff)} \n output was: {json.dumps(output)}"
def check_golden_file(
pytestconfig: PytestConfig,
output_path: Union[str, os.PathLike],
golden_path: Union[str, os.PathLike],
ignore_paths: Optional[List[str]] = None,
ignore_paths: Sequence[str] = (),
) -> None:
update_golden = pytestconfig.getoption("--update-golden-files")
copy_output = pytestconfig.getoption("--copy-output-files")
golden_exists = os.path.isfile(golden_path)
if copy_output:
shutil.copyfile(str(output_path), str(golden_path) + ".output")
print(f"Copied output file to {golden_path}.output")
if not update_golden and not golden_exists:
raise FileNotFoundError(
"Golden file does not exist. Please run with the --update-golden-files option to create."
)
output = load_json_file(output_path)
# if updating a golden file that doesn't exist yet, load the output again
if update_golden and not golden_exists:
golden = load_json_file(output_path)
shutil.copyfile(str(output_path), str(golden_path))
else:
# We have to "normalize" the golden file by reading and writing it back out.
# This will clean up nulls, double serialization, and other formatting issues.
with tempfile.NamedTemporaryFile() as temp:
golden_metadata = read_metadata_file(pathlib.Path(golden_path))
write_metadata_file(pathlib.Path(temp.name), golden_metadata)
golden = load_json_file(temp.name)
try:
assert_mces_equal(output, golden, ignore_paths)
except AssertionError as e:
# only update golden files if the diffs are not empty
if update_golden:
shutil.copyfile(str(output_path), str(golden_path))
# raise the error if we're just running the test
else:
raise e
assert_metadata_files_equal(
output_path=output_path,
golden_path=golden_path,
update_golden=update_golden,
copy_output=copy_output,
ignore_paths=ignore_paths,
)
def _get_field_for_entity_type_in_mce(entity_type: str) -> str:

View File

@ -1,42 +1,83 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"value": "[{\"op\": \"replace\", \"path\": \"/description\", \"value\": \"test description\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_1\", \"value\": \"test_value_1\"}, {\"op\": \"add\", \"path\": \"/customProperties/test_key_2\", \"value\": \"test_value_2\"}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/tags/urn:li:tag:test_tag\", \"value\": {\"tag\": \"urn:li:tag:test_tag\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29\", \"value\": {\"auditStamp\": {\"time\": 0, \"actor\": \"urn:li:corpuser:unknown\"}, \"dataset\": \"urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)\", \"type\": \"TRANSFORMED\"}}]",
"contentType": "application/json-patch+json"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"value": "[{\"op\": \"add\", \"path\": \"/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1\", \"value\": {\"tag\": \"urn:li:tag:tag1\"}}]",
"contentType": "application/json-patch+json"
}
}
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "datasetProperties",
"aspect": {
"json": [
{
"op": "replace",
"path": "/description",
"value": "test description"
},
{
"op": "add",
"path": "/customProperties/test_key_1",
"value": "test_value_1"
},
{
"op": "add",
"path": "/customProperties/test_key_2",
"value": "test_value_2"
}
]
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "globalTags",
"aspect": {
"json": [
{
"op": "add",
"path": "/tags/urn:li:tag:test_tag",
"value": {
"tag": "urn:li:tag:test_tag"
}
}
]
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "upstreamLineage",
"aspect": {
"json": [
{
"op": "add",
"path": "/upstreams/urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahive%2Cfct_users_created_upstream%2CPROD%29",
"value": {
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created_upstream,PROD)",
"type": "TRANSFORMED"
}
}
]
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
"changeType": "PATCH",
"aspectName": "editableSchemaMetadata",
"aspect": {
"json": [
{
"op": "add",
"path": "/editableSchemaFieldInfo/field1/globalTags/tags/urn:li:tag:tag1",
"value": {
"tag": "urn:li:tag:tag1"
}
}
]
}
}
]

View File

@ -34,10 +34,10 @@ from datahub.metadata.schema_classes import (
OperationClass,
TimeWindowSizeClass,
)
from datahub.testing.compare_metadata_json import diff_metadata_json
from tests.performance.bigquery import generate_events, ref_from_table
from tests.performance.data_generation import generate_data, generate_queries
from tests.performance.data_model import Container, FieldAccess, Query, Table, View
from tests.test_helpers.mce_helpers import assert_mces_equal
PROJECT_1 = "project-1"
PROJECT_2 = "project-2"
@ -224,7 +224,7 @@ def make_zero_usage_workunit(
def compare_workunits(
output: Iterable[MetadataWorkUnit], expected: Iterable[MetadataWorkUnit]
) -> None:
assert_mces_equal(
assert not diff_metadata_json(
[wu.metadata.to_obj() for wu in output],
[wu.metadata.to_obj() for wu in expected],
)

View File

@ -2,6 +2,7 @@ import json
import pytest
from datahub.testing.compare_metadata_json import diff_metadata_json
from tests.test_helpers import mce_helpers
basic_1 = json.loads(
@ -149,18 +150,18 @@ basic_3 = json.loads(
def test_basic_diff_same() -> None:
mce_helpers.assert_mces_equal(basic_1, basic_2, mce_helpers.IGNORE_PATH_TIMESTAMPS)
assert not diff_metadata_json(basic_1, basic_2, mce_helpers.IGNORE_PATH_TIMESTAMPS)
def test_basic_diff_only_owner_change() -> None:
with pytest.raises(AssertionError):
mce_helpers.assert_mces_equal(
assert not diff_metadata_json(
basic_2, basic_3, mce_helpers.IGNORE_PATH_TIMESTAMPS
)
def test_basic_diff_owner_change() -> None:
with pytest.raises(AssertionError):
mce_helpers.assert_mces_equal(
assert not diff_metadata_json(
basic_1, basic_3, mce_helpers.IGNORE_PATH_TIMESTAMPS
)

View File

@ -2,6 +2,7 @@ import json
import unittest
from typing import Any, List, Optional
from datahub.emitter.aspect import JSON_CONTENT_TYPE
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
from datahub.ingestion.transformer.generic_aspect_transformer import (
@ -94,7 +95,7 @@ class DummyGenericAspectTransformer(GenericAspectTransformer):
aspect.value if aspect else json.dumps({"customAspect": 10}).encode("utf-8")
)
result_aspect = GenericAspectClass(
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
value=value,
)
return result_aspect
@ -183,7 +184,7 @@ class TestDummyGenericAspectTransformer(unittest.TestCase):
entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)",
aspect_name="customAspect",
aspect=GenericAspectClass(
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
value=json.dumps({"customAspect": 5}).encode("utf-8"),
),
)
@ -251,7 +252,7 @@ class TestDummyRemoveGenericAspectTransformer(unittest.TestCase):
entity_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,example1,PROD)",
aspect_name="customAspect",
aspect=GenericAspectClass(
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
value=json.dumps({"customAspect": 5}).encode("utf-8"),
),
)

View File

@ -1,5 +1,6 @@
import json
from datahub.emitter.aspect import JSON_CONTENT_TYPE, JSON_PATCH_CONTENT_TYPE
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
@ -48,7 +49,7 @@ def test_get_aspects_of_type_mcpc():
aspectName=StatusClass.ASPECT_NAME,
aspect=GenericAspectClass(
value=json.dumps(aspect.to_obj()).encode(),
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
),
)
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
@ -63,7 +64,7 @@ def test_get_aspects_of_type_mcpc():
aspectName="not status",
aspect=GenericAspectClass(
value=json.dumps(aspect.to_obj()).encode(),
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
),
)
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
@ -76,7 +77,7 @@ def test_get_aspects_of_type_mcpc():
aspectName=StatusClass.ASPECT_NAME,
aspect=GenericAspectClass(
value=json.dumps({"not_status": True}).encode(),
contentType="application/json-patch+json",
contentType=JSON_PATCH_CONTENT_TYPE,
),
)
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
@ -89,7 +90,7 @@ def test_get_aspects_of_type_mcpc():
aspectName=StatusClass.ASPECT_NAME,
aspect=GenericAspectClass(
value=(json.dumps(aspect.to_obj()) + "aaa").encode(),
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
),
)
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)
@ -102,7 +103,7 @@ def test_get_aspects_of_type_mcpc():
aspectName=StatusClass.ASPECT_NAME,
aspect=GenericAspectClass(
value='{"ß": 2}'.encode("latin_1"),
contentType="application/json",
contentType=JSON_CONTENT_TYPE,
),
)
wu = MetadataWorkUnit(id="id", mcp_raw=mcpc)