import io
import json
import pathlib
import shutil
from unittest.mock import patch

import fastavro
import pytest
from avrogen import avrojson
from freezegun import freeze_time

import datahub.metadata.schema_classes as models
from datahub.cli.json_file import check_mce_file
from datahub.emitter import mce_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import (
    FileSourceConfig,
    GenericFileSource,
    read_metadata_file,
)
from datahub.metadata.schema_classes import MetadataChangeEventClass
from datahub.metadata.schemas import getMetadataChangeEventSchema
from datahub.testing.pytest_hooks import get_golden_settings
from tests.test_helpers import mce_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd

# Pinned wall-clock time so golden-file comparisons are deterministic.
FROZEN_TIME = "2021-07-22 18:54:06"


@freeze_time(FROZEN_TIME)
@pytest.mark.parametrize(
    "json_filename",
    [
        # Normal test.
        "tests/unit/serde/test_serde_large.json",
        # Ensure correct representation of chart info's input list.
        "tests/unit/serde/test_serde_chart_snapshot.json",
        # Check usage stats as well.
        "tests/unit/serde/test_serde_usage.json",
        # Profiles with the MetadataChangeProposal format.
        "tests/unit/serde/test_serde_profile.json",
        # Test one that uses patch.
        "tests/unit/serde/test_serde_patch.json",
    ],
)
def test_serde_to_json(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path, json_filename: str
) -> None:
    # Round-trip a golden file through a file-source -> file-sink pipeline and
    # verify the output matches the input golden file.
    golden_file = pytestconfig.rootpath / json_filename
    output_file = tmp_path / "output.json"

    pipeline = Pipeline.create(
        {
            "source": {"type": "file", "config": {"filename": str(golden_file)}},
            "sink": {"type": "file", "config": {"filename": str(output_file)}},
            "run_id": "serde_test",
        }
    )
    pipeline.run()
    pipeline.raise_from_status()

    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=f"{output_file}",
        golden_path=golden_file,
    )


@pytest.mark.parametrize(
    "json_filename",
    [
        "tests/unit/serde/test_serde_large.json",
        "tests/unit/serde/test_serde_chart_snapshot.json",
        "tests/unit/serde/test_serde_extra_field.json",
    ],
)
@freeze_time(FROZEN_TIME)
def test_serde_to_avro(
    pytestconfig: pytest.Config,
    json_filename: str,
) -> None:
    # In this test, we want to read in from JSON -> MCE object.
    # Next we serialize from MCE to Avro and then deserialize back to MCE.
    # Finally, we want to compare the two MCE objects.
    with patch("datahub.ingestion.api.common.PipelineContext") as mock_pipeline_context:
        json_path = pytestconfig.rootpath / json_filename
        source = GenericFileSource(
            ctx=mock_pipeline_context, config=FileSourceConfig(path=str(json_path))
        )
        mces = list(source.iterate_mce_file(str(json_path)))

        # Serialize to Avro.
        parsed_schema = fastavro.parse_schema(
            json.loads(getMetadataChangeEventSchema())
        )
        fo = io.BytesIO()
        out_records = [mce.to_obj(tuples=True) for mce in mces]
        fastavro.writer(fo, parsed_schema, out_records)

        # Deserialized from Avro.
        fo.seek(0)
        in_records = list(fastavro.reader(fo, return_record_name=True))
        in_mces = [
            MetadataChangeEventClass.from_obj(record, tuples=True)  # type: ignore
            for record in in_records
        ]

        # Check diff
        assert len(mces) == len(in_mces)
        for original_mce, roundtripped_mce in zip(mces, in_mces):
            assert original_mce == roundtripped_mce


@pytest.mark.parametrize(
    "json_filename",
    [
        # Normal test.
        "tests/unit/serde/test_serde_large.json",
        # Check for backwards compatibility with specifying all union types.
        "tests/unit/serde/test_serde_backwards_compat.json",
        # Usage stats.
        "tests/unit/serde/test_serde_usage.json",
        # Profiles with the MetadataChangeProposal format.
        "tests/unit/serde/test_serde_profile.json",
        # Ensure sample MCE files are valid.
        "examples/mce_files/single_mce.json",
        "examples/mce_files/mce_list.json",
        "examples/mce_files/bootstrap_mce.json",
    ],
)
@freeze_time(FROZEN_TIME)
def test_check_metadata_schema(pytestconfig: pytest.Config, json_filename: str) -> None:
    # The CLI schema check should accept all of these known-good files.
    json_file_path = pytestconfig.rootpath / json_filename
    run_datahub_cmd(["check", "metadata-file", f"{json_file_path}"])


def test_serde_paired(pytestconfig: pytest.Config) -> None:
    # Test with a pair of python object + json file.
    # Validates both deserialization and serialization.
    python_metadata = [
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:domain:marketing",
            aspect=models.DomainPropertiesClass(
                name="Marketing",
                description="Description of the marketing domain",
                parentDomain="urn:li:domain:gtm",
            ),
        )
    ]
    for metadata in python_metadata:
        assert metadata.validate()

    json_file_path = (
        pytestconfig.rootpath / "tests/unit/serde/test_domain_properties.json"
    )
    if not get_golden_settings().update_golden:
        json_metadata = list(read_metadata_file(json_file_path))
        assert python_metadata == json_metadata

    mce_helpers.check_goldens_stream(
        outputs=python_metadata,
        golden_path=json_file_path,
        ignore_order=False,
    )


def test_unknown_object_deser_error(pytestconfig: pytest.Config) -> None:
    # Deserializing a file with an unrecognized top-level object must raise.
    json_file_path = pytestconfig.rootpath / "tests/unit/serde/test_invalid_object.json"
    with pytest.raises(ValueError, match="Unknown object type"):
        list(read_metadata_file(json_file_path))


def test_check_metadata_rewrite(
    pytestconfig: pytest.Config, tmp_path: pathlib.Path
) -> None:
    # `check metadata-file --rewrite` should canonicalize the input file into
    # the expected reference output.
    json_input = (
        pytestconfig.rootpath / "tests/unit/serde/test_canonicalization_input.json"
    )
    json_output_reference = (
        pytestconfig.rootpath / "tests/unit/serde/test_canonicalization_output.json"
    )

    output_file_path = tmp_path / "output.json"
    shutil.copyfile(json_input, output_file_path)
    run_datahub_cmd(
        ["check", "metadata-file", f"{output_file_path}", "--rewrite", "--unpack-mces"]
    )

    mce_helpers.check_golden_file(
        pytestconfig, output_path=output_file_path, golden_path=json_output_reference
    )


@pytest.mark.parametrize(
    "json_filename",
    [
        # Missing fields.
        "tests/unit/serde/test_serde_missing_field.json",
    ],
)
def test_check_mce_schema_failure(
    pytestconfig: pytest.Config, json_filename: str
) -> None:
    # Validation must fail on a file with a missing required field, and the
    # error message must name the missing field.
    json_file_path = pytestconfig.rootpath / json_filename

    try:
        check_mce_file(str(json_file_path))
        raise AssertionError("MCE File validated successfully when it should not have")
    except Exception as e:
        assert "is missing required field: active" in str(e)


def test_field_discriminator() -> None:
    # Union-with-alias (Cost.cost) round-trips through to_obj/from_obj.
    cost_object = models.CostClass(
        costType=models.CostTypeClass.ORG_COST_TYPE,
        cost=models.CostCostClass(
            fieldDiscriminator=models.CostCostDiscriminatorClass.costCode,
            costCode="sampleCostCode",
        ),
    )

    assert cost_object.validate()

    redo = models.CostClass.from_obj(cost_object.to_obj())
    assert redo == cost_object


def test_type_error() -> None:
    # Serialization should reject values that do not match the Avro schema.
    dataflow = models.DataFlowSnapshotClass(
        urn=mce_builder.make_data_flow_urn(
            orchestrator="argo", flow_id="42", cluster="DEV"
        ),
        aspects=[
            models.DataFlowInfoClass(
                name="hello_datahub",
                description="Hello Datahub",
                externalUrl="http://example.com",
                # This is a type error - custom properties should be a Dict[str, str].
                customProperties={"x": 1},  # type: ignore
            )
        ],
    )

    with pytest.raises(avrojson.AvroTypeException):
        dataflow.to_obj()


def test_null_hiding() -> None:
    # Fields left as None should be omitted from the serialized object.
    schemaField = models.SchemaFieldClass(
        fieldPath="foo",
        type=models.SchemaFieldDataTypeClass(type=models.StringTypeClass()),
        nativeDataType="VARCHAR(50)",
    )
    assert schemaField.validate()

    ser = schemaField.to_obj()
    ser_without_ordered_fields = json.loads(json.dumps(ser))

    assert ser_without_ordered_fields == {
        "fieldPath": "foo",
        "isPartOfKey": False,
        "nativeDataType": "VARCHAR(50)",
        "nullable": False,
        "recursive": False,
        "type": {"type": {"com.linkedin.pegasus2avro.schema.StringType": {}}},
    }


def test_missing_optional_simple() -> None:
    original = models.DataHubResourceFilterClass.from_obj(
        {
            "allResources": False,
            "filter": {
                "criteria": [
                    {
                        "condition": "EQUALS",
                        "field": "TYPE",
                        "values": ["notebook", "dataset", "dashboard"],
                    }
                ]
            },
        }
    )

    # This one is missing the optional filters.allResources field.
    revised_obj = {
        "filter": {
            "criteria": [
                {
                    "condition": "EQUALS",
                    "field": "TYPE",
                    "values": ["notebook", "dataset", "dashboard"],
                }
            ]
        },
    }
    revised = models.DataHubResourceFilterClass.from_obj(revised_obj)
    assert revised.validate()

    assert original == revised


def test_missing_optional_in_union() -> None:
    # This one doesn't contain any optional fields and should work fine.
    revised_json = json.loads(
        '{"lastUpdatedTimestamp":1662356745807,"actors":{"groups":[],"resourceOwners":false,"allUsers":true,"allGroups":false,"users":[]},"privileges":["EDIT_ENTITY_ASSERTIONS","EDIT_DATASET_COL_GLOSSARY_TERMS","EDIT_DATASET_COL_TAGS","EDIT_DATASET_COL_DESCRIPTION"],"displayName":"customtest","resources":{"filter":{"criteria":[{"field":"TYPE","condition":"EQUALS","values":["notebook","dataset","dashboard"]}]},"allResources":false},"description":"","state":"ACTIVE","type":"METADATA"}'
    )
    revised = models.DataHubPolicyInfoClass.from_obj(revised_json)

    # This one is missing the optional filters.allResources field.
    original_json = json.loads(
        '{"privileges":["EDIT_ENTITY_ASSERTIONS","EDIT_DATASET_COL_GLOSSARY_TERMS","EDIT_DATASET_COL_TAGS","EDIT_DATASET_COL_DESCRIPTION"],"actors":{"resourceOwners":false,"groups":[],"allGroups":false,"allUsers":true,"users":[]},"lastUpdatedTimestamp":1662356745807,"displayName":"customtest","description":"","resources":{"filter":{"criteria":[{"field":"TYPE","condition":"EQUALS","values":["notebook","dataset","dashboard"]}]}},"state":"ACTIVE","type":"METADATA"}'
    )
    original = models.DataHubPolicyInfoClass.from_obj(original_json)

    assert revised == original


def test_reserved_keywords() -> None:
    # Fields named after Python keywords get a trailing underscore in the
    # generated classes ("or" -> "or_", "and" -> "and_") but serialize back to
    # the original keyword name.
    filter1 = models.FilterClass()
    assert filter1.or_ is None

    filter2 = models.FilterClass(
        or_=[
            models.ConjunctiveCriterionClass(
                and_=[
                    models.CriterionClass(field="foo", value="var", negated=True),
                ]
            )
        ]
    )
    assert "or" in filter2.to_obj()

    filter3 = models.FilterClass.from_obj(filter2.to_obj())
    assert filter2 == filter3


def test_read_empty_dict() -> None:
    # An explicitly-empty map must deserialize to {} and not be dropped.
    original = '{"type": "SUCCESS", "nativeResults": {}}'
    model = models.AssertionResultClass.from_obj(json.loads(original))
    assert model.nativeResults == {}
    assert model == models.AssertionResultClass(
        type=models.AssertionResultTypeClass.SUCCESS, nativeResults={}
    )


def test_write_optional_empty_dict() -> None:
    # An explicitly-empty optional map must survive serialization.
    model = models.AssertionResultClass(
        type=models.AssertionResultTypeClass.SUCCESS, nativeResults={}
    )
    assert model.nativeResults == {}

    out = json.dumps(model.to_obj())
    assert out == '{"type": "SUCCESS", "nativeResults": {}}'


@pytest.mark.parametrize(
    "model,ref_server_obj",
    [
        (
            models.MLModelSnapshotClass(
                urn="urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)",
                aspects=[
                    models.CostClass(
                        costType=models.CostTypeClass.ORG_COST_TYPE,
                        cost=models.CostCostClass(
                            fieldDiscriminator=models.CostCostDiscriminatorClass.costCode,
                            costCode="sampleCostCode",
                        ),
                    )
                ],
            ),
            {
                "urn": "urn:li:mlModel:(urn:li:dataPlatform:science,scienceModel,PROD)",
                "aspects": [
                    {
                        "com.linkedin.common.Cost": {
                            "costType": "ORG_COST_TYPE",
                            "cost": {"costCode": "sampleCostCode"},
                        }
                    }
                ],
            },
        ),
    ],
)
def test_json_transforms(model, ref_server_obj):
    # pre_json_transform must produce the server-side wire format, and
    # post_json_transform must invert it back to something from_obj accepts.
    server_obj = pre_json_transform(model.to_obj())
    assert server_obj == ref_server_obj

    post_obj = post_json_transform(server_obj)
    recovered = type(model).from_obj(post_obj)
    assert recovered == model


def test_unions_with_aliases_assumptions() -> None:
    # We have special handling for unions with aliases in our json serialization helpers.
    # Specifically, we assume that cost is the only instance of a union with alias.
    # This test validates that assumption.
    for cls in set(models.__SCHEMA_TYPES.values()):
        if cls is models.CostCostClass:
            continue
        if hasattr(cls, "fieldDiscriminator"):
            raise ValueError(f"{cls} has a fieldDiscriminator")

    assert set(models.CostClass.RECORD_SCHEMA.fields_dict.keys()) == {
        "cost",
        "costType",
    }
    assert set(models.CostCostClass.RECORD_SCHEMA.fields_dict.keys()) == {
        "fieldDiscriminator",
        "costId",
        "costCode",
    }