import json
import logging
import os
import pprint
import re
import shutil
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import deepdiff

from tests.test_helpers.type_helpers import PytestConfig

logger = logging.getLogger(__name__)

IGNORE_PATH_TIMESTAMPS = [
    # Ignore timestamps from the ETL pipeline. A couple of examples:
    # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.common.Ownership']['lastModified']['time']
    # root[69]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][0]['com.linkedin.pegasus2avro.schema.SchemaMetadata']['lastModified']['time']
    # root[0]['proposedSnapshot']['com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot']['aspects'][1]['com.linkedin.pegasus2avro.dataset.UpstreamLineage']['upstreams'][0]['auditStamp']['time']
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['auditStamp'\]\['time'\]",
]
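
# Example (hypothetical data): with the exclusions above, a diff that only
# touches a timestamp is suppressed entirely.
#
#   golden = [{"proposedSnapshot": {"Snap": {"aspects": [{"Ownership": {"lastModified": {"time": 1}}}]}}}]
#   output = [{"proposedSnapshot": {"Snap": {"aspects": [{"Ownership": {"lastModified": {"time": 2}}}]}}}]
#   deepdiff.DeepDiff(golden, output, exclude_regex_paths=IGNORE_PATH_TIMESTAMPS)  # -> {}
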
def load_json_file(filename: Union[str, os.PathLike]) -> object:
    with open(str(filename)) as f:
        return json.load(f)


def clean_nones(value: Any) -> Any:
    """
    Recursively remove all None values from dictionaries and lists, and return
    the result as a new dictionary or list.
    """
if isinstance(value, list):
return [clean_nones(x) for x in value if x is not None]
elif isinstance(value, dict):
return {key: clean_nones(val) for key, val in value.items() if val is not None}
else:
return value
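
# Example (hypothetical input): None values are stripped at every level, and
# keys mapped to None disappear entirely.
#
#   clean_nones({"a": None, "b": [1, None, {"c": None}]})  # -> {"b": [1, {}]}
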
def assert_mces_equal(
    output: object, golden: object, ignore_paths: Optional[List[str]] = None
) -> None:
    # This method assumes we're given a list of MCE json objects.
    diff = deepdiff.DeepDiff(
        golden, output, exclude_regex_paths=ignore_paths, ignore_order=True
    )
if diff:
# Attempt a clean diff (removing None-s)
assert isinstance(output, list)
assert isinstance(golden, list)
clean_output = [clean_nones(o) for o in output]
clean_golden = [clean_nones(g) for g in golden]
clean_diff = deepdiff.DeepDiff(
clean_golden,
clean_output,
exclude_regex_paths=ignore_paths,
ignore_order=True,
)
        if clean_diff != diff:
            logger.warning(
                f"Raw MCE-s differ, but the None-cleaned MCE-s may still match; raw diff:\n{pprint.pformat(diff)}"
            )
        diff = clean_diff
assert not diff, f"MCEs differ\n{pprint.pformat(diff)}"
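
# Example (hypothetical lists): compare two MCE lists, ignoring ordering and
# pipeline timestamps.
#
#   assert_mces_equal(output_mces, golden_mces, ignore_paths=IGNORE_PATH_TIMESTAMPS)
#   # passes silently when equal; raises AssertionError with the diff otherwise
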
def check_golden_file(
pytestconfig: PytestConfig,
output_path: Union[str, os.PathLike],
golden_path: Union[str, os.PathLike],
ignore_paths: Optional[List[str]] = None,
) -> None:
update_golden = pytestconfig.getoption("--update-golden-files")
golden_exists = os.path.isfile(golden_path)
    if not update_golden and not golden_exists:
        raise FileNotFoundError(
            "Golden file does not exist. Please run with the --update-golden-files option to create it."
        )
output = load_json_file(output_path)
    # If we're updating a golden file that doesn't exist yet, the current
    # output becomes the golden file.
    if update_golden and not golden_exists:
        golden = load_json_file(output_path)
shutil.copyfile(str(output_path), str(golden_path))
else:
golden = load_json_file(golden_path)
try:
assert_mces_equal(output, golden, ignore_paths)
except AssertionError as e:
# only update golden files if the diffs are not empty
if update_golden:
shutil.copyfile(str(output_path), str(golden_path))
# raise the error if we're just running the test
else:
raise e
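
# Example (hypothetical pytest usage): compare a source's output against its
# golden file; pass --update-golden-files to (re)generate the golden file.
#
#   def test_my_source(pytestconfig, tmp_path):
#       run_my_source(output_path=tmp_path / "out.json")  # hypothetical ingestion helper
#       check_golden_file(
#           pytestconfig,
#           output_path=tmp_path / "out.json",
#           golden_path="tests/golden/my_source_golden.json",  # hypothetical path
#           ignore_paths=IGNORE_PATH_TIMESTAMPS,
#       )
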
def _get_filter(mce: bool = False, mcp: bool = False) -> Callable[[Dict], bool]:
if mce:
# cheap way to determine if we are working with an MCE
return lambda x: "proposedSnapshot" in x
if mcp:
# cheap way to determine if we are working with an MCP
return lambda x: "changeType" in x
return lambda _: False
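
# Example: MCEs are recognized by a 'proposedSnapshot' key, MCPs by 'changeType'.
#
#   _get_filter(mce=True)({"proposedSnapshot": {}})  # -> True
#   _get_filter(mcp=True)({"changeType": "UPSERT"})  # -> True
#   _get_filter()({"anything": 1})                   # -> False
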
def _get_element(event: Dict[str, Any], path_spec: List[str]) -> Any:
try:
for p in path_spec:
event = event.get(p, {})
if not event:
return None
return event
    except Exception as e:
        logger.error(f"Failed to walk path {path_spec} in {event}")
        raise e
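
# Example: walk nested keys, returning None when the path dead-ends.
#
#   _get_element({"a": {"b": 1}}, ["a", "b"])  # -> 1
#   _get_element({"a": {"b": 1}}, ["a", "x"])  # -> None
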
def _element_matches_pattern(
    event: Dict[str, Any], path_spec: List[str], pattern: str
) -> Tuple[bool, bool]:
    # Returns an (element_exists, pattern_matches) pair.
    element = _get_element(event, path_spec)
    if element is None:
        return (False, False)
    else:
        return (True, re.search(pattern, str(element)) is not None)


def assert_mcp_entity_urn(
    filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
    test_output = load_json_file(file)
    if isinstance(test_output, list):
        # MCPs expose the urn directly under 'entityUrn', regardless of entity type.
        path_spec = _get_mcp_urn_path_spec()
        filter_operator = _get_filter(mcp=True)
filtered_events = [
(x, _element_matches_pattern(x, path_spec, regex_pattern))
for x in test_output
if filter_operator(x)
]
failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
if failed_events:
raise Exception("Failed to match events", failed_events)
return len(filtered_events)
    else:
        raise Exception(
            f"Expected the file {file} to contain a list of items"
        )
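
# Example (hypothetical call): count the MCPs in a file whose entityUrn looks
# like a dataset urn; any MCP that does not match raises an Exception.
#
#   n = assert_mcp_entity_urn(
#       filter="",
#       entity_type="dataset",
#       regex_pattern=r"^urn:li:dataset:",
#       file="out.json",  # hypothetical output file
#   )
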
def _get_mce_urn_path_spec(entity_type: str) -> List[str]:
    if entity_type == "dataset":
        return [
            "proposedSnapshot",
            "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot",
            "urn",
        ]
    raise Exception(f"Not implemented for entity_type: {entity_type}")


def _get_mcp_urn_path_spec() -> List[str]:
    return ["entityUrn"]


def assert_mce_entity_urn(
filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
test_output = load_json_file(file)
if isinstance(test_output, list):
path_spec = _get_mce_urn_path_spec(entity_type)
filter_operator = _get_filter(mce=True)
filtered_events = [
(x, _element_matches_pattern(x, path_spec, regex_pattern))
for x in test_output
if filter_operator(x)
]
failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
if failed_events:
raise Exception("Failed to match events", failed_events)
return len(filtered_events)
    else:
        raise Exception(
            f"Expected the file {file} to contain a list of items"
        )


def assert_for_each_entity(
entity_type: str, aspect_name: str, aspect_field_matcher: Dict[str, str], file: str
) -> int:
"""Assert that an aspect name with the desired fields exists for each entity urn"""
test_output = load_json_file(file)
assert isinstance(test_output, list)
    # Gather entity urns from MCEs and MCPs separately, then union them.
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in test_output
        if _get_filter(mce=True)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in test_output
        if _get_filter(mcp=True)(x)
    }
all_urns = mce_urns.union(mcp_urns)
# there should not be any None urns
assert None not in all_urns
    aspect_map: Dict[str, Any] = {urn: None for urn in all_urns}
    # Record the decoded aspect value for every MCP carrying the requested aspect.
    for o in [mcp for mcp in test_output if _get_filter(mcp=True)(mcp)]:
        if o.get("aspectName") == aspect_name:
            aspect_map[o["entityUrn"]] = json.loads(o.get("aspect", {}).get("value"))
success: List[str] = []
failures: List[str] = []
for urn, aspect_val in aspect_map.items():
if aspect_val is not None:
for f in aspect_field_matcher:
assert aspect_field_matcher[f] == _get_element(
aspect_val, [f]
), f"urn: {urn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
success.append(urn)
else:
print(f"Adding {urn} to failures")
failures.append(urn)
if success:
print(f"Succeeded on assertion for urns {success}")
if failures:
assert False, f"Failed to find aspect_name {aspect_name} for urns {failures}"
return len(success)
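
# Example (hypothetical values): require every entity urn in the file to carry
# a 'dataPlatformInstance' aspect whose 'platform' field is the expected urn.
#
#   n = assert_for_each_entity(
#       entity_type="dataset",
#       aspect_name="dataPlatformInstance",
#       aspect_field_matcher={"platform": "urn:li:dataPlatform:mysql"},
#       file="out.json",  # hypothetical output file
#   )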