import json
import logging
import os
import pathlib
import re
import tempfile
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)

import pytest

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.metadata.urns import Urn
from datahub.testing.compare_metadata_json import (
    assert_metadata_files_equal,
    load_json_file,
)

logger = logging.getLogger(__name__)

IGNORE_PATH_TIMESTAMPS = [
    # Ignore timestamps from the ETL pipeline. A few examples:
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['auditStamp'\]\['time'\]",
]
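
# A sketch of the kind of diff path these regexes are meant to match, assuming
# the deepdiff-style path syntax consumed by assert_metadata_files_equal
# (the indices and snapshot class are illustrative):
#
#     root[0]['proposedSnapshot']['com.linkedin...DatasetSnapshot']['aspects'][2]['created']['time']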


class MCEConstants:
    PROPOSED_SNAPSHOT = "proposedSnapshot"
    DATASET_SNAPSHOT_CLASS = (
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"
    )


class MCPConstants:
    CHANGE_TYPE = "changeType"
    ENTITY_URN = "entityUrn"
    ENTITY_TYPE = "entityType"
    ASPECT_NAME = "aspectName"
    ASPECT_VALUE = "aspect"


class EntityType:
    DATASET = "dataset"
    PIPELINE = "dataFlow"
    FLOW = "dataFlow"
    TASK = "dataJob"
    JOB = "dataJob"
    USER = "corpuser"
    GROUP = "corpGroup"


def clean_nones(value):
    """
    Recursively remove all None values from dictionaries and lists, and return
    the result as a new dictionary or list.
    """
    if isinstance(value, list):
        return [clean_nones(x) for x in value if x is not None]
    elif isinstance(value, dict):
        return {key: clean_nones(val) for key, val in value.items() if val is not None}
    else:
        return value
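
# Example (hypothetical input, shown for illustration):
#
#     clean_nones({"a": 1, "b": None, "c": [1, None, 2]})
#     # -> {"a": 1, "c": [1, 2]}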


def check_golden_file(
    pytestconfig: pytest.Config,
    output_path: Union[str, os.PathLike],
    golden_path: Union[str, os.PathLike],
    ignore_paths: Sequence[str] = (),
    ignore_paths_v2: Sequence[str] = (),
    ignore_order: bool = True,
) -> None:
    # TODO: Remove the pytestconfig parameter since it's redundant.
    # Or, more straightforwardly, we could remove the `check_golden_file` method
    # and use assert_metadata_files_equal directly. Maybe call it "check_golden_metadata"?
    # In a lot of cases, the output_path is also just annoying - our pytest setup
    # should be responsible for figuring out where to put the temp file.
    assert_metadata_files_equal(
        output_path=output_path,
        golden_path=golden_path,
        ignore_paths=ignore_paths,
        ignore_paths_v2=ignore_paths_v2,
        ignore_order=ignore_order,
    )
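
# A typical call from a pytest test (the paths here are illustrative):
#
#     check_golden_file(
#         pytestconfig,
#         output_path=tmp_path / "output_events.json",
#         golden_path="tests/golden/my_source_golden.json",
#         ignore_paths=IGNORE_PATH_TIMESTAMPS,
#     )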


def check_goldens_stream(
    outputs: List,
    golden_path: Union[str, os.PathLike],
    ignore_paths: Sequence[str] = (),
    ignore_order: bool = True,
) -> None:
    # Write the events to a temp file, then compare it against the golden file
    # while the temp file still exists, i.e. inside the `with` block.
    with tempfile.NamedTemporaryFile() as f:
        write_metadata_file(pathlib.Path(f.name), outputs)

        assert_metadata_files_equal(
            output_path=f.name,
            golden_path=golden_path,
            ignore_paths=ignore_paths,
            ignore_order=ignore_order,
        )
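
# Typical usage (the events and path are illustrative):
#
#     check_goldens_stream(
#         outputs=[MetadataChangeProposalWrapper(entityUrn=urn, aspect=aspect)],
#         golden_path="tests/golden/my_source_golden.json",
#     )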


def _get_field_for_entity_type_in_mce(entity_type: str) -> str:
    """Returns the field to look for depending on the type of entity in the MCE"""
    if entity_type == EntityType.DATASET:
        return MCEConstants.DATASET_SNAPSHOT_CLASS
    raise Exception(f"Not implemented for entity_type {entity_type}")


def _get_filter(
    mce: bool = False, mcp: bool = False, entity_type: Optional[str] = None
) -> Callable[[Dict], bool]:
    if mce:
        # cheap way to determine if we are working with an MCE for the appropriate entity_type
        if entity_type:
            return (
                lambda x: MCEConstants.PROPOSED_SNAPSHOT in x
                and _get_field_for_entity_type_in_mce(str(entity_type))
                in x[MCEConstants.PROPOSED_SNAPSHOT]
            )
        else:
            return lambda x: MCEConstants.PROPOSED_SNAPSHOT in x
    if mcp:
        # cheap way to determine if we are working with an MCP
        return lambda x: MCPConstants.CHANGE_TYPE in x and (
            x[MCPConstants.ENTITY_TYPE] == entity_type if entity_type else True
        )
    return lambda _: False
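
# Sketch of the event shapes these predicates distinguish (field values are
# hypothetical):
#
#     is_dataset_mce = _get_filter(mce=True, entity_type=EntityType.DATASET)
#     # True for {"proposedSnapshot": {MCEConstants.DATASET_SNAPSHOT_CLASS: {...}}}
#
#     is_dataset_mcp = _get_filter(mcp=True, entity_type=EntityType.DATASET)
#     # True for {"changeType": "UPSERT", "entityType": "dataset", ...}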


def _get_element(event: Dict[str, Any], path_spec: List[str]) -> Any:
    """Walk `event` along the keys in `path_spec`, returning None if any key is missing."""
    try:
        for p in path_spec:
            if p not in event:
                return None
            else:
                event = event.get(p, {})
        return event
    except Exception as e:
        print(event)
        raise e
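
# Example (hypothetical event):
#
#     _get_element({"a": {"b": 1}}, ["a", "b"])
#     # -> 1
#     _get_element({"a": {"b": 1}}, ["a", "missing"])
#     # -> None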


def _element_matches_pattern(
    event: Dict[str, Any], path_spec: List[str], pattern: str
) -> Tuple[bool, bool]:
    """Returns an (element_exists, pattern_matched) tuple."""
    element = _get_element(event, path_spec)
    if element is None:
        return (False, False)
    else:
        return (True, re.search(pattern, str(element)) is not None)


def get_entity_urns(events_file: str) -> Set[str]:
    events = load_json_file(events_file)
    assert isinstance(events, list)
    return _get_entity_urns(events)


def _get_entity_urns(events_list: List[Dict]) -> Set[str]:
    entity_type = "dataset"
    # mce urns
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in events_list
        if _get_filter(mce=True, entity_type=entity_type)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in events_list
        if _get_filter(mcp=True, entity_type=entity_type)(x)
    }
    all_urns = mce_urns.union(mcp_urns)
    return all_urns
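
# Example (the file path and urn are illustrative):
#
#     urns = get_entity_urns("tests/golden/my_source_golden.json")
#     assert "urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)" in urns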


def assert_mcp_entity_urn(
    filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
    """Assert that all MCP entity urns match the regex pattern passed in. Returns the number of events matched"""

    def get_path_spec_for_urn() -> List[str]:
        return [MCPConstants.ENTITY_URN]

    test_output = load_json_file(file)
    if isinstance(test_output, list):
        path_spec = get_path_spec_for_urn()
        filter_operator = _get_filter(mcp=True, entity_type=entity_type)
        filtered_events = [
            (x, _element_matches_pattern(x, path_spec, regex_pattern))
            for x in test_output
            if filter_operator(x)
        ]
        failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
        if failed_events:
            raise Exception("Failed to match events", failed_events)
        return len(filtered_events)
    else:
        raise Exception(f"Expected the file {file} to contain a list of items")
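
# Example (pattern and path are illustrative; "ALL" is a placeholder for the
# unused `filter` argument):
#
#     assert_mcp_entity_urn(
#         "ALL",
#         entity_type=EntityType.DATASET,
#         regex_pattern=r"urn:li:dataset:.*",
#         file="tests/golden/my_source_golden.json",
#     )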


def _get_mce_urn_path_spec(entity_type: str) -> List[str]:
    if entity_type == EntityType.DATASET:
        return [
            MCEConstants.PROPOSED_SNAPSHOT,
            MCEConstants.DATASET_SNAPSHOT_CLASS,
            "urn",
        ]
    raise Exception(f"Not implemented for entity_type: {entity_type}")


def _get_mcp_urn_path_spec() -> List[str]:
    return [MCPConstants.ENTITY_URN]


def assert_mce_entity_urn(
    filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
    """Assert that all MCE entity urns match the regex pattern passed in. Returns the number of events matched"""

    test_output = load_json_file(file)
    if isinstance(test_output, list):
        path_spec = _get_mce_urn_path_spec(entity_type)
        filter_operator = _get_filter(mce=True)
        filtered_events = [
            (x, _element_matches_pattern(x, path_spec, regex_pattern))
            for x in test_output
            if filter_operator(x)
        ]
        failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
        if failed_events:
            raise Exception(
                f"Failed to match events: {json.dumps(failed_events, indent=2)}"
            )
        return len(filtered_events)
    else:
        raise Exception(f"Expected the file {file} to contain a list of items")
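
# Example (pattern illustrative; mirrors the MCP variant above):
#
#     assert_mce_entity_urn(
#         "ALL",
#         entity_type=EntityType.DATASET,
#         regex_pattern=r"urn:li:dataset:\(urn:li:dataPlatform:mysql,.*",
#         file="tests/golden/my_source_golden.json",
#     )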


def assert_for_each_entity(
    entity_type: str,
    aspect_name: str,
    aspect_field_matcher: Dict[str, Any],
    file: str,
    exception_urns: Optional[List[str]] = None,
) -> int:
    """Assert that an aspect name with the desired fields exists for each entity urn"""
    if exception_urns is None:
        exception_urns = []
    test_output = load_json_file(file)
    assert isinstance(test_output, list)
    # mce urns
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in test_output
        if _get_filter(mce=True, entity_type=entity_type)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(x)
    }
    all_urns = mce_urns.union(mcp_urns)
    # there should not be any None urns
    assert None not in all_urns
    aspect_map = {urn: None for urn in all_urns}
    # iterate over all mcps
    for o in [
        mcp
        for mcp in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(mcp)
    ]:
        if o.get(MCPConstants.ASPECT_NAME) == aspect_name:
            # load the inner aspect payload and assign to this urn
            aspect_map[o[MCPConstants.ENTITY_URN]] = o.get(
                MCPConstants.ASPECT_VALUE, {}
            ).get("json")

    success: List[str] = []
    failures: List[str] = []
    for urn, aspect_val in aspect_map.items():
        if aspect_val is not None:
            for f in aspect_field_matcher:
                assert aspect_field_matcher[f] == _get_element(aspect_val, [f]), (
                    f"urn: {urn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
                )
            success.append(urn)
        elif urn not in exception_urns:
            print(f"Adding {urn} to failures")
            failures.append(urn)

    if success:
        print(f"Succeeded on assertion for urns {success}")
    if failures:
        raise AssertionError(
            f"Failed to find aspect_name {aspect_name} for urns {json.dumps(failures, indent=2)}"
        )

    return len(success)
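
# Example (the aspect and field values are illustrative):
#
#     assert_for_each_entity(
#         entity_type=EntityType.DATASET,
#         aspect_name="status",
#         aspect_field_matcher={"removed": False},
#         file="tests/golden/my_source_golden.json",
#     )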


def assert_entity_mce_aspect(
    entity_urn: str, aspect: Any, aspect_type: Type, file: str
) -> int:
    # TODO: Replace with read_metadata_file()
    test_output = load_json_file(file)
    entity_type = Urn.from_string(entity_urn).entity_type
    assert isinstance(test_output, list)
    # mce urns
    mces: List[MetadataChangeEventClass] = [
        MetadataChangeEventClass.from_obj(x)
        for x in test_output
        if _get_filter(mce=True, entity_type=entity_type)(x)
        and _get_element(x, _get_mce_urn_path_spec(entity_type)) == entity_urn
    ]
    matches = 0
    for mce in mces:
        for a in mce.proposedSnapshot.aspects:
            if isinstance(a, aspect_type):
                assert a == aspect
                matches += 1
    return matches


def assert_entity_mcp_aspect(
    entity_urn: str, aspect_field_matcher: Dict[str, Any], aspect_name: str, file: str
) -> int:
    # TODO: Replace with read_metadata_file()
    test_output = load_json_file(file)
    entity_type = Urn.from_string(entity_urn).entity_type
    assert isinstance(test_output, list)
    # mcps that match entity_urn
    mcps: List[MetadataChangeProposalWrapper] = [
        MetadataChangeProposalWrapper.from_obj_require_wrapper(x)
        for x in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(x)
        and _get_element(x, _get_mcp_urn_path_spec()) == entity_urn
    ]
    matches = 0
    for mcp in mcps:
        if mcp.aspectName == aspect_name:
            assert mcp.aspect
            aspect_val = mcp.aspect.to_obj()
            for f in aspect_field_matcher:
                assert aspect_field_matcher[f] == _get_element(aspect_val, [f]), (
                    f"urn: {mcp.entityUrn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
                )
            matches += 1
    return matches
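
# Example (the urn and field values are illustrative):
#
#     assert_entity_mcp_aspect(
#         entity_urn="urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)",
#         aspect_field_matcher={"removed": False},
#         aspect_name="status",
#         file="tests/golden/my_source_golden.json",
#     )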


def assert_entity_urn_not_like(entity_type: str, regex_pattern: str, file: str) -> int:
    """Assert that there are no entity urns that match the regex pattern passed in. Returns the total number of events in the file"""

    # TODO: Refactor common code with assert_entity_urn_like.
    test_output = load_json_file(file)
    assert isinstance(test_output, list)
    # mce urns
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in test_output
        if _get_filter(mce=True, entity_type=entity_type)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(x)
    }
    all_urns = mce_urns.union(mcp_urns)
    print(all_urns)
    matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
    if matched_urns:
        raise AssertionError(f"urns found that match the deny list {matched_urns}")
    return len(test_output)


def assert_entity_urn_like(entity_type: str, regex_pattern: str, file: str) -> int:
    """Assert that there exist entity urns that match the regex pattern passed in. Returns the number of matching urns"""

    test_output = load_json_file(file)
    assert isinstance(test_output, list)
    # mce urns
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in test_output
        if _get_filter(mce=True, entity_type=entity_type)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(x)
    }
    all_urns = mce_urns.union(mcp_urns)
    print(all_urns)
    matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
    if matched_urns:
        return len(matched_urns)
    else:
        raise AssertionError(
            f"No urns found that match the pattern {regex_pattern}. Full list is {all_urns}"
        )
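
# Examples (patterns and path illustrative):
#
#     # At least one dataset urn should be in the PROD environment.
#     assert_entity_urn_like(EntityType.DATASET, r".*,PROD\)", "golden.json")
#
#     # No dataset urn should mention a staging platform.
#     assert_entity_urn_not_like(EntityType.DATASET, r".*staging.*", "golden.json")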