import json
import logging
import os
import pathlib
import re
import tempfile
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)

import pytest

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.metadata.urns import Urn
from datahub.testing.compare_metadata_json import (
    assert_metadata_files_equal,
    load_json_file,
)

logger = logging.getLogger(__name__)

IGNORE_PATH_TIMESTAMPS = [
    # Ignore timestamps from the ETL pipeline. A couple examples:
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['created'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['lastModified'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['createStamp'\]\['time'\]",
    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['auditStamp'\]\['time'\]",
]


class MCEConstants:
    # JSON keys/values used to recognize MCE (MetadataChangeEvent) payloads.
    PROPOSED_SNAPSHOT = "proposedSnapshot"
    DATASET_SNAPSHOT_CLASS = (
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot"
    )


class MCPConstants:
    # JSON keys used to recognize and navigate MCP (MetadataChangeProposal) payloads.
    CHANGE_TYPE = "changeType"
    ENTITY_URN = "entityUrn"
    ENTITY_TYPE = "entityType"
    ASPECT_NAME = "aspectName"
    ASPECT_VALUE = "aspect"


class EntityType:
    # Canonical entity-type strings; some aliases map to the same value.
    DATASET = "dataset"
    PIPELINE = "dataFlow"
    FLOW = "dataFlow"
    TASK = "dataJob"
    JOB = "dataJob"
    USER = "corpuser"
    GROUP = "corpGroup"


def clean_nones(value):
    """
    Recursively remove all None values from dictionaries and lists, and returns
    the result as a new dictionary or list.
    """
    if isinstance(value, list):
        return [clean_nones(x) for x in value if x is not None]
    elif isinstance(value, dict):
        return {key: clean_nones(val) for key, val in value.items() if val is not None}
    else:
        return value


def check_golden_file(
    pytestconfig: pytest.Config,
    output_path: Union[str, os.PathLike],
    golden_path: Union[str, os.PathLike],
    ignore_paths: Sequence[str] = (),
    ignore_paths_v2: Sequence[str] = (),
    ignore_order: bool = True,
) -> None:
    """Assert that the metadata file at ``output_path`` matches the golden file."""
    # TODO: Remove the pytestconfig parameter since it's redundant.
    # Or more straightforward - we can remove the `check_golden_file` method
    # and use assert_metadata_files_equal directly. Maybe call it "check_golden_metadata"?
    # In a lot of cases, the output_path is also just annoying - our pytest setup
    # should be responsible for figuring out where to put the temp file.

    assert_metadata_files_equal(
        output_path=output_path,
        golden_path=golden_path,
        ignore_paths=ignore_paths,
        ignore_paths_v2=ignore_paths_v2,
        ignore_order=ignore_order,
    )


def check_goldens_stream(
    outputs: List,
    golden_path: Union[str, os.PathLike],
    ignore_paths: Sequence[str] = (),
    ignore_order: bool = True,
) -> None:
    """Write in-memory metadata events to a temp file and compare against a golden file."""
    with tempfile.NamedTemporaryFile() as f:
        write_metadata_file(pathlib.Path(f.name), outputs)

        assert_metadata_files_equal(
            output_path=f.name,
            golden_path=golden_path,
            ignore_paths=ignore_paths,
            ignore_order=ignore_order,
        )


def _get_field_for_entity_type_in_mce(entity_type: str) -> str:
    """Returns the field to look for depending on the type of entity in the MCE"""
    if entity_type == EntityType.DATASET:
        return MCEConstants.DATASET_SNAPSHOT_CLASS
    raise Exception(f"Not implemented for entity_type {entity_type}")


def _get_filter(
    mce: bool = False, mcp: bool = False, entity_type: Optional[str] = None
) -> Callable[[Dict], bool]:
    """Build a predicate that recognizes MCE or MCP event dicts.

    When ``entity_type`` is given, the predicate additionally requires the
    event to be for that entity type. If neither ``mce`` nor ``mcp`` is set,
    the predicate matches nothing.
    """
    if mce:
        # cheap way to determine if we are working with an MCE for the
        # appropriate entity_type
        if entity_type:
            return (
                lambda x: MCEConstants.PROPOSED_SNAPSHOT in x
                and _get_field_for_entity_type_in_mce(str(entity_type))
                in x[MCEConstants.PROPOSED_SNAPSHOT]
            )
        else:
            return lambda x: MCEConstants.PROPOSED_SNAPSHOT in x
    if mcp:
        # cheap way to determine if we are working with an MCP
        return lambda x: MCPConstants.CHANGE_TYPE in x and (
            x[MCPConstants.ENTITY_TYPE] == entity_type if entity_type else True
        )
    return lambda _: False


def _get_element(event: Dict[str, Any], path_spec: List[str]) -> Any:
    """Walk ``event`` along the keys in ``path_spec``; return None if any key is missing."""
    try:
        for p in path_spec:
            if p not in event:
                return None
            else:
                event = event.get(p, {})
        return event
    except Exception as e:
        print(event)
        raise e


def _element_matches_pattern(
    event: Dict[str, Any], path_spec: List[str], pattern: str
) -> Tuple[bool, bool]:
    """Return (element_exists, element_matches_regex) for the element at ``path_spec``."""
    element = _get_element(event, path_spec)
    if element is None:
        return (False, False)
    else:
        return (True, re.search(pattern, str(element)) is not None)


def get_entity_urns(events_file: str) -> Set[str]:
    """Return the set of dataset entity urns found in the events file."""
    events = load_json_file(events_file)
    assert isinstance(events, list)
    return _get_entity_urns(events)


def _get_urns_for_entity_type(entity_type: str, events_list: List[Dict]) -> Set[str]:
    """Collect entity urns of ``entity_type`` from both MCE and MCP events."""
    # mce urns
    mce_urns = {
        _get_element(x, _get_mce_urn_path_spec(entity_type))
        for x in events_list
        if _get_filter(mce=True, entity_type=entity_type)(x)
    }
    mcp_urns = {
        _get_element(x, _get_mcp_urn_path_spec())
        for x in events_list
        if _get_filter(mcp=True, entity_type=entity_type)(x)
    }
    return mce_urns.union(mcp_urns)


def _get_entity_urns(events_list: List[Dict]) -> Set[str]:
    # Only dataset urns are supported here; other entity types raise in
    # _get_mce_urn_path_spec.
    return _get_urns_for_entity_type("dataset", events_list)


def assert_mcp_entity_urn(
    filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
    """Assert that all MCP entity urns of ``entity_type`` in ``file`` match
    ``regex_pattern``. Returns the number of events matched.

    NOTE(review): the ``filter`` parameter is unused (and shadows the builtin);
    kept for call-site compatibility.
    """
    test_output = load_json_file(file)
    if isinstance(test_output, list):
        path_spec = _get_mcp_urn_path_spec()
        filter_operator = _get_filter(mcp=True, entity_type=entity_type)
        filtered_events = [
            (x, _element_matches_pattern(x, path_spec, regex_pattern))
            for x in test_output
            if filter_operator(x)
        ]
        failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
        if failed_events:
            raise Exception("Failed to match events", failed_events)
        return len(filtered_events)
    else:
        raise Exception(
            f"Did not expect the file {file} to not contain a list of items"
        )


def _get_mce_urn_path_spec(entity_type: str) -> List[str]:
    """Path to the urn inside an MCE event dict; only datasets are supported."""
    if entity_type == EntityType.DATASET:
        return [
            MCEConstants.PROPOSED_SNAPSHOT,
            MCEConstants.DATASET_SNAPSHOT_CLASS,
            "urn",
        ]
    raise Exception(f"Not implemented for entity_type: {entity_type}")


def _get_mcp_urn_path_spec() -> List[str]:
    """Path to the urn inside an MCP event dict."""
    return [MCPConstants.ENTITY_URN]


def assert_mce_entity_urn(
    filter: str, entity_type: str, regex_pattern: str, file: str
) -> int:
    """Assert that all mce entity urns must match the regex pattern passed in.
    Return the number of events matched"""
    test_output = load_json_file(file)
    if isinstance(test_output, list):
        path_spec = _get_mce_urn_path_spec(entity_type)
        filter_operator = _get_filter(mce=True)
        filtered_events = [
            (x, _element_matches_pattern(x, path_spec, regex_pattern))
            for x in test_output
            if filter_operator(x)
        ]
        failed_events = [y for y in filtered_events if not y[1][0] or not y[1][1]]
        if failed_events:
            # BUG FIX: this was a plain string missing the `f` prefix, so the
            # failed events were never interpolated into the message.
            raise Exception(
                f"Failed to match events: {json.dumps(failed_events, indent=2)}"
            )
        return len(filtered_events)
    else:
        raise Exception(
            f"Did not expect the file {file} to not contain a list of items"
        )


def assert_for_each_entity(
    entity_type: str,
    aspect_name: str,
    aspect_field_matcher: Dict[str, Any],
    file: str,
    exception_urns: Optional[List[str]] = None,
) -> int:
    """Assert that an aspect name with the desired fields exists for each entity urn"""
    if exception_urns is None:
        exception_urns = []
    test_output = load_json_file(file)
    assert isinstance(test_output, list)

    all_urns = _get_urns_for_entity_type(entity_type, test_output)
    # there should not be any None urns
    assert None not in all_urns
    aspect_map = {urn: None for urn in all_urns}

    # iterate over all mcps
    for o in [
        mcp
        for mcp in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(mcp)
    ]:
        if o.get(MCPConstants.ASPECT_NAME) == aspect_name:
            # load the inner aspect payload and assign to this urn
            aspect_map[o[MCPConstants.ENTITY_URN]] = o.get(
                MCPConstants.ASPECT_VALUE, {}
            ).get("json")

    success: List[str] = []
    failures: List[str] = []
    for urn, aspect_val in aspect_map.items():
        if aspect_val is not None:
            for f in aspect_field_matcher:
                assert aspect_field_matcher[f] == _get_element(aspect_val, [f]), (
                    f"urn: {urn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
                )
            success.append(urn)
        elif urn not in exception_urns:
            print(f"Adding {urn} to failures")
            failures.append(urn)

    if success:
        print(f"Succeeded on assertion for urns {success}")
    if failures:
        raise AssertionError(
            f"Failed to find aspect_name {aspect_name} for urns {json.dumps(failures, indent=2)}"
        )

    return len(success)


def assert_entity_mce_aspect(
    entity_urn: str, aspect: Any, aspect_type: Type, file: str
) -> int:
    """Assert that every ``aspect_type`` aspect on MCEs for ``entity_urn``
    equals ``aspect``; return the number of matches."""
    # TODO: Replace with read_metadata_file()
    test_output = load_json_file(file)
    entity_type = Urn.from_string(entity_urn).entity_type
    assert isinstance(test_output, list)
    # mce urns
    mces: List[MetadataChangeEventClass] = [
        MetadataChangeEventClass.from_obj(x)
        for x in test_output
        if _get_filter(mce=True, entity_type=entity_type)(x)
        and _get_element(x, _get_mce_urn_path_spec(entity_type)) == entity_urn
    ]
    matches = 0
    for mce in mces:
        for a in mce.proposedSnapshot.aspects:
            if isinstance(a, aspect_type):
                assert a == aspect
                matches = matches + 1
    return matches


def assert_entity_mcp_aspect(
    entity_urn: str, aspect_field_matcher: Dict[str, Any], aspect_name: str, file: str
) -> int:
    """Assert that each ``aspect_name`` MCP for ``entity_urn`` has fields
    matching ``aspect_field_matcher``; return the number of matches."""
    # TODO: Replace with read_metadata_file()
    test_output = load_json_file(file)
    entity_type = Urn.from_string(entity_urn).entity_type
    assert isinstance(test_output, list)
    # mcps that match entity_urn
    mcps: List[MetadataChangeProposalWrapper] = [
        MetadataChangeProposalWrapper.from_obj_require_wrapper(x)
        for x in test_output
        if _get_filter(mcp=True, entity_type=entity_type)(x)
        and _get_element(x, _get_mcp_urn_path_spec()) == entity_urn
    ]
    matches = 0
    for mcp in mcps:
        if mcp.aspectName == aspect_name:
            assert mcp.aspect
            aspect_val = mcp.aspect.to_obj()
            for f in aspect_field_matcher:
                assert aspect_field_matcher[f] == _get_element(aspect_val, [f]), (
                    f"urn: {mcp.entityUrn} -> Field {f} must match value {aspect_field_matcher[f]}, found {_get_element(aspect_val, [f])}"
                )
                matches = matches + 1
    return matches


def assert_entity_urn_not_like(entity_type: str, regex_pattern: str, file: str) -> int:
    """Assert that there are no entity urns that match the regex pattern passed in.
    Returns the total number of events in the file"""
    test_output = load_json_file(file)
    assert isinstance(test_output, list)
    all_urns = _get_urns_for_entity_type(entity_type, test_output)
    print(all_urns)
    matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
    if matched_urns:
        raise AssertionError(f"urns found that match the deny list {matched_urns}")
    return len(test_output)


def assert_entity_urn_like(entity_type: str, regex_pattern: str, file: str) -> int:
    """Assert that there exist entity urns that match the regex pattern passed in.
    Returns the total number of events in the file"""
    test_output = load_json_file(file)
    assert isinstance(test_output, list)
    all_urns = _get_urns_for_entity_type(entity_type, test_output)
    print(all_urns)
    matched_urns = [u for u in all_urns if re.match(regex_pattern, u)]
    if matched_urns:
        return len(matched_urns)
    else:
        raise AssertionError(
            f"No urns found that match the pattern {regex_pattern}. Full list is {all_urns}"
        )