import json
import logging
import os
import re
from collections import Counter
from pathlib import Path
from typing import Dict, List, Type

import pytest
from freezegun import freeze_time

from datahub.emitter.mce_builder import (
    make_global_tag_aspect_with_tag_list,
    make_glossary_terms_aspect_from_urn_list,
)
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    DateTypeClass,
    NumberTypeClass,
    SchemaField,
    StringTypeClass,
    TimeTypeClass,
)
from datahub.utilities.mapping import OperationProcessor

logger = logging.getLogger(__name__)

SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE = """
{
    "type": "record",
    "name": "TestRecord",
    "namespace": "some.event.namespace",
    "fields": [
        {
            "name": "my.field",
            "type": ["null", "string"],
            "doc": "some.doc"
        }
    ]
}
"""

SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION = """
{
    "type": "record",
    "name": "TestRecord",
    "namespace": "some.event.namespace",
    "fields": [
        {
            "name": "my.field",
            "type": ["string", "null"],
            "doc": "some.doc"
        }
    ]
}
"""

SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE = """
{
    "type": "record",
    "name": "TestRecord",
    "namespace": "some.event.namespace",
    "fields": [
        {
            "name": "my.field",
            "type": "null",
            "doc": "some.doc"
        }
    ]
}
"""

SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE: str = json.dumps(
    {
        "type": "record",
        "name": "__struct_",
        "fields": [
            {
                "name": "value",
                "type": {
                    "type": "fixed",
                    "name": "__fixed_d9d2d051916045d9975d6c573aaabb89",
                    "size": 4,
                    "native_data_type": "fixed[4]",
                    # Nullability is carried as a custom attribute on the type.
                    "_nullable": True,
                },
            },
        ],
    }
)


def log_field_paths(fields: List[SchemaField]) -> None:
    """Log the field path of every field at DEBUG level, one quoted path per line."""
    logger.debug('FieldPaths=\n"%s"', '",\n"'.join(f.fieldPath for f in fields))


def assert_field_paths_are_unique(fields: List[SchemaField]) -> None:
    """Assert that no two field paths in ``fields`` that name an AVRO field are equal.

    Paths ending in ``]`` are skipped; in these tests such paths are type-only
    entries like ``[version=2.0].[type=union]`` or ``[version=2.0].[type=string]``.
    """
    avro_fields_paths = [
        f.fieldPath for f in fields if re.match(r".*[^]]$", f.fieldPath)
    ]
    # Report the offending paths instead of just a length mismatch.
    duplicates = sorted(
        path for path, count in Counter(avro_fields_paths).items() if count > 1
    )
    assert not duplicates, f"Duplicate field paths: {duplicates}"


def assert_field_paths_match(
    fields: List[SchemaField], expected_field_paths: List[str]
) -> None:
    """Assert that ``fields`` have exactly ``expected_field_paths``, in order.

    Also checks that the AVRO-field paths among them are unique.
    """
    log_field_paths(fields)
    # Comparing whole lists makes pytest print a full diff on mismatch.
    assert [f.fieldPath for f in fields] == expected_field_paths
    assert_field_paths_are_unique(fields)


@pytest.mark.parametrize(
    "schema",
    [
        SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE,
        SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION,
        SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE,
        SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE,
    ],
    ids=[
        "optional_field_via_union_type",
        "optional_field_via_union_null_not_first",
        "optional_field_via_primitive",
        "optional_field_via_fixed",
    ],
)
def test_avro_schema_to_mce_fields_events_with_nullable_fields(schema):
    """Each schema declares one optional field, which must be reported as nullable."""
    fields = avro_schema_to_mce_fields(schema)
    assert len(fields) == 1
    assert fields[0].nullable


def test_avro_schema_to_mce_fields_sample_events_with_different_field_types():
    """A map-of-longs field gets both the map and value type in its path."""
    schema = """
{
    "type": "record",
    "name": "R",
    "namespace": "some.namespace",
    "fields": [
        {
            "name": "a_map_of_longs_field",
            "type": {
                "type": "map",
                "values": "long"
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=R].[type=map].[type=long].a_map_of_longs_field",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_avro_schema_to_mce_fields_record_with_two_fields():
    """A dotted record name contributes only its last segment to the path."""
    schema = """
{
    "type": "record",
    "name": "some.event.name",
    "namespace": "not.relevant.namespace",
    "fields": [
        {
            "name": "a",
            "type": "string",
            "doc": "some.doc"
        },
        {
            "name": "b",
            "type": "string",
            "doc": "some.doc"
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=name].[type=string].a",
        "[version=2.0].[type=name].[type=string].b",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_avro_schema_to_mce_fields_toplevel_isnt_a_record():
    """A bare primitive top-level schema yields a single type-only path."""
    schema = """
{
    "type": "string"
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = ["[version=2.0].[type=string]"]
    assert_field_paths_match(fields, expected_field_paths)


def test_avro_schema_namespacing():
    """The record namespace must not appear in generated field paths."""
    schema = """
{
    "type": "record",
    "name": "name",
    "namespace": "should.not.show.up.namespace",
    "fields": [
        {
            "name": "aStringField",
            "type": "string",
            "doc": "some docs",
            "default": "this is custom, default value"
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=name].[type=string].aStringField",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_avro_schema_to_mce_fields_with_default():
    """A field's default value is included in its description."""
    schema = """
{
    "type": "record",
    "name": "some.event.name",
    "namespace": "not.relevant.namespace",
    "fields": [
        {
            "name": "aStringField",
            "type": "string",
            "doc": "some docs",
            "default": "this is custom, default value"
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=name].[type=string].aStringField",
    ]
    assert_field_paths_match(fields, expected_field_paths)
    description = fields[0].description
    assert description and "custom, default value" in description


def test_avro_schema_with_recursion():
    """A self-referencing array item type is emitted once, not expanded forever."""
    schema = """
{
    "type": "record",
    "name": "TreeNode",
    "fields": [
        {
            "name": "value",
            "type": "long"
        },
        {
            "name": "children",
            "type": {
                "type": "array",
                "items": "TreeNode"
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=TreeNode].[type=long].value",
        "[version=2.0].[type=TreeNode].[type=array].[type=TreeNode].children",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
    """Nested optional records produce paths and descriptions for every level."""
    schema = """
{
    "type": "record",
    "name": "Payment",
    "namespace": "some.event.namespace",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double", "doc": "amountDoc"},
        {"name": "name", "type": "string", "default": ""},
        {
            "name": "phoneNumber",
            "type": [
                {
                    "type": "record",
                    "name": "PhoneNumber",
                    "doc": "testDoc",
                    "fields": [
                        {"name": "areaCode", "type": "string", "doc": "areaCodeDoc", "default": ""},
                        {"name": "countryCode", "type": "string", "default": ""},
                        {"name": "prefix", "type": "string", "default": ""},
                        {"name": "number", "type": "string", "default": ""}
                    ]
                },
                "null"
            ],
            "default": "null"
        },
        {
            "name": "address",
            "type": [
                {
                    "type": "record",
                    "name": "Address",
                    "fields": [
                        {"name": "street", "type": "string", "default": ""}
                    ]
                },
                "null"
            ],
            "doc": "addressDoc",
            "default": "null"
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=Payment].[type=string].id",
        "[version=2.0].[type=Payment].[type=double].amount",
        "[version=2.0].[type=Payment].[type=string].name",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
        "[version=2.0].[type=Payment].[type=Address].address",
        "[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
    ]
    assert_field_paths_match(fields, expected_field_paths)
    assert fields[1].description == "amountDoc"
    # phoneNumber has no field-level doc, so the record type's doc is used.
    assert fields[3].description == "testDoc\nField default value: null"
    assert fields[4].description == "areaCodeDoc\nField default value: "
    assert fields[8].description == "addressDoc\nField default value: null"


def test_avro_schema_to_mce_fields_with_nesting_across_records():
    """A top-level union of records that reference each other."""
    schema = """
[
    {
        "type": "record",
        "name": "Address",
        "fields": [
            {"name": "streetAddress", "type": "string"},
            {"name": "city", "type": "string"}
        ]
    },
    {
        "type": "record",
        "name": "Person",
        "fields": [
            {"name": "firstname", "type": "string"},
            {"name": "lastname", "type": "string"},
            {"name": "address", "type": "Address"}
        ]
    }
]
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=union]",
        "[version=2.0].[type=union].[type=Address].[type=string].streetAddress",
        "[version=2.0].[type=union].[type=Address].[type=string].city",
        "[version=2.0].[type=union].[type=Person].[type=string].firstname",
        "[version=2.0].[type=union].[type=Person].[type=string].lastname",
        "[version=2.0].[type=union].[type=Person].[type=Address].address",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_simple_record_with_primitive_types():
    """Primitive and enum fields each get a single path with their type."""
    schema = """
{
    "type": "record",
    "name": "Simple",
    "namespace": "com.linkedin",
    "fields": [
        {"name": "stringField", "type": "string", "doc": "string field"},
        {"name": "booleanField", "type": "boolean"},
        {"name": "intField", "type": "int"},
        {
            "name": "enumField",
            "type": {
                "type": "enum",
                "name": "MyTestEnumField",
                "symbols": ["TEST", "TEST1"],
                "symbolDoc": {
                    "TEST": "test enum",
                    "TEST1": "test1 enum"
                }
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=Simple].[type=string].stringField",
        "[version=2.0].[type=Simple].[type=boolean].booleanField",
        "[version=2.0].[type=Simple].[type=int].intField",
        "[version=2.0].[type=Simple].[type=enum].enumField",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_simple_nested_record_with_a_string_field_for_key_schema():
    """Key schemas get a ``[key=True]`` path segment."""
    schema = """
{
    "type": "record",
    "name": "SimpleNested",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "nestedRcd",
            "type": {
                "type": "record",
                "name": "InnerRcd",
                "fields": [
                    {
                        "name": "aStringField",
                        "type": "string"
                    }
                ]
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema, True)
    expected_field_paths: List[str] = [
        "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd",
        "[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd.[type=string].aStringField",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_union_with_nested_record_of_union():
    """A union field yields a path for the union and one per member type."""
    schema = """
{
    "type": "record",
    "name": "UnionSample",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "aUnion",
            "type": [
                "boolean",
                {
                    "type": "record",
                    "name": "Rcd",
                    "fields": [
                        {
                            "name": "aNullableStringField",
                            "type": ["null", "string"]
                        }
                    ]
                }
            ]
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=UnionSample].[type=union].aUnion",
        "[version=2.0].[type=UnionSample].[type=union].[type=boolean].aUnion",
        "[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion",
        "[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion.[type=string].aNullableStringField",
    ]
    assert_field_paths_match(fields, expected_field_paths)
    assert isinstance(fields[3].type.type, StringTypeClass)
    assert fields[0].nativeDataType == "union"
    assert fields[1].nativeDataType == "boolean"
    assert fields[2].nativeDataType == "Rcd"
    assert fields[3].nativeDataType == "string"


def test_nested_arrays():
    """An array of arrays of an optional record."""
    schema = """
{
    "type": "record",
    "name": "NestedArray",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "ar",
            "type": {
                "type": "array",
                "items": {
                    "type": "array",
                    "items": [
                        "null",
                        {
                            "type": "record",
                            "name": "Foo",
                            "fields": [
                                {
                                    "name": "a",
                                    "type": "long"
                                }
                            ]
                        }
                    ]
                }
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths: List[str] = [
        "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar",
        "[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar.[type=long].a",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_map_of_union_of_int_and_record_of_union():
    """A map whose values are a union containing a record that itself has a union."""
    schema = """
{
    "type": "record",
    "name": "MapSample",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "aMap",
            "type": {
                "type": "map",
                "values": [
                    "int",
                    {
                        "type": "record",
                        "name": "Rcd",
                        "fields": [
                            {
                                "name": "aUnion",
                                "type": ["null", "string", "int"]
                            }
                        ]
                    }
                ]
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=MapSample].[type=map].[type=union].aMap",
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=int].aMap",
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap",
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].aUnion",
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=string].aUnion",
        "[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=int].aUnion",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_recursive_avro():
    """A record field referring back to its own (fully-qualified) type."""
    schema = """
{
    "type": "record",
    "name": "Recursive",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "r",
            "type": {
                "type": "record",
                "name": "R",
                "fields": [
                    {"name": "anIntegerField", "type": "int"},
                    {"name": "aRecursiveField", "type": "com.linkedin.R"}
                ]
            }
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths = [
        "[version=2.0].[type=Recursive].[type=R].r",
        "[version=2.0].[type=Recursive].[type=R].r.[type=int].anIntegerField",
        "[version=2.0].[type=Recursive].[type=R].r.[type=R].aRecursiveField",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_needs_disambiguation_nested_union_of_records_with_same_field_name():
    """Same-named fields in different union members get distinct paths."""
    schema = """
{
    "type": "record",
    "name": "ABFooUnion",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "a",
            "type": [
                {
                    "type": "record",
                    "name": "A",
                    "fields": [{"name": "f", "type": "string"}]
                },
                {
                    "type": "record",
                    "name": "B",
                    "fields": [{"name": "f", "type": "string"}]
                },
                {
                    "type": "array",
                    "items": {
                        "type": "array",
                        "items": [
                            "null",
                            {
                                "type": "record",
                                "name": "Foo",
                                "fields": [{"name": "f", "type": "long"}]
                            }
                        ]
                    }
                }
            ]
        }
    ]
}
"""
    fields = avro_schema_to_mce_fields(schema)
    expected_field_paths: List[str] = [
        "[version=2.0].[type=ABFooUnion].[type=union].a",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
        "[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
    ]
    assert_field_paths_match(fields, expected_field_paths)


def test_mce_avro_parses_okay():
    """This test helps to exercise the complexity in parsing and catch unexpected regressions."""
    schema = Path(
        os.path.join(
            os.path.dirname(__file__),
            "..",
            "..",
            "src",
            "datahub",
            "metadata",
            "schema.avsc",
        )
    ).read_text()
    fields = avro_schema_to_mce_fields(schema)
    assert len(fields)
    # Ensure that all the paths corresponding to the AVRO fields are unique.
    assert_field_paths_are_unique(fields)
    log_field_paths(fields)


def test_key_schema_handling():
    """Tests key schema handling"""
    schema = """
{
    "type": "record",
    "name": "ABFooUnion",
    "namespace": "com.linkedin",
    "fields": [
        {
            "name": "a",
            "type": [
                {
                    "type": "record",
                    "name": "A",
                    "fields": [{"name": "f", "type": "string"}]
                },
                {
                    "type": "record",
                    "name": "B",
                    "fields": [{"name": "f", "type": "string"}]
                },
                {
                    "type": "array",
                    "items": {
                        "type": "array",
                        "items": [
                            "null",
                            {
                                "type": "record",
                                "name": "Foo",
                                "fields": [{"name": "f", "type": "long"}]
                            }
                        ]
                    }
                }
            ]
        }
    ]
}
"""
    fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=True)
    expected_field_paths: List[str] = [
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].a",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
        "[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
    ]
    assert_field_paths_match(fields, expected_field_paths)
    for f in fields:
        assert f.isPartOfKey


def test_logical_types_bare():
    """Logical types declared on the field (not nested in ``type``) map to types."""
    schema: str = """
{
    "type": "record",
    "name": "test_logical_types",
    "fields": [
        {"name": "decimal_logical", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2},
        {"name": "uuid_logical", "type": "string", "logicalType": "uuid"},
        {"name": "date_logical", "type": "int", "logicalType": "date"},
        {"name": "time_millis_logical", "type": "int", "logicalType": "time-millis"},
        {"name": "time_micros_logical", "type": "long", "logicalType": "time-micros"},
        {"name": "timestamp_millis_logical", "type": "long", "logicalType": "timestamp-millis"},
        {"name": "timestamp_micros_logical", "type": "long", "logicalType": "timestamp-micros"}
    ]
}
"""
    fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=False)
    # validate field paths
    expected_field_paths: List[str] = [
        "[version=2.0].[type=test_logical_types].[type=bytes].decimal_logical",
        "[version=2.0].[type=test_logical_types].[type=string].uuid_logical",
        "[version=2.0].[type=test_logical_types].[type=int].date_logical",
        "[version=2.0].[type=test_logical_types].[type=int].time_millis_logical",
        "[version=2.0].[type=test_logical_types].[type=long].time_micros_logical",
        "[version=2.0].[type=test_logical_types].[type=long].timestamp_millis_logical",
        "[version=2.0].[type=test_logical_types].[type=long].timestamp_micros_logical",
    ]
    assert_field_paths_match(fields, expected_field_paths)

    # validate field types.
    expected_types: List[Type] = [
        NumberTypeClass,
        StringTypeClass,
        DateTypeClass,
        TimeTypeClass,
        TimeTypeClass,
        TimeTypeClass,
        TimeTypeClass,
    ]
    assert expected_types == [type(field.type.type) for field in fields]


def test_logical_types_fully_specified_in_type():
    """A decimal logical type declared inside ``type`` maps to NumberTypeClass."""
    schema: Dict = {
        "type": "record",
        "name": "test",
        "fields": [
            {
                "name": "name",
                "type": {
                    "type": "bytes",
                    "logicalType": "decimal",
                    "precision": 3,
                    "scale": 2,
                    "native_data_type": "decimal(3, 2)",
                    "_nullable": True,
                },
            }
        ],
    }
    fields: List[SchemaField] = avro_schema_to_mce_fields(
        json.dumps(schema), default_nullable=True
    )
    assert len(fields) == 1
    assert fields[0].fieldPath == "[version=2.0].[type=test].[type=bytes].name"
    assert isinstance(fields[0].type.type, NumberTypeClass)


def test_ignore_exceptions():
    """A schema string that is not valid JSON yields no fields instead of raising."""
    malformed_schema: str = """
  "name": "event_ts",
  "type": "long",
  "logicalType": "timestamp-millis",
  "tags": [
    "business-timestamp"
  ]
"""
    fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema)
    assert not fields


@freeze_time("2023-09-12")
def test_avro_schema_to_mce_fields_with_field_meta_mapping():
    """Custom field attributes are turned into tags/terms by an OperationProcessor."""
    schema = """
{
    "type": "record",
    "name": "Payment",
    "namespace": "some.event.namespace",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "amount", "type": "double", "doc": "amountDoc", "has_pii": "False"},
        {"name": "name", "type": "string", "default": "", "has_pii": "True"},
        {
            "name": "phoneNumber",
            "type": [
                {
                    "type": "record",
                    "name": "PhoneNumber",
                    "doc": "testDoc",
                    "fields": [
                        {"name": "areaCode", "type": "string", "doc": "areaCodeDoc", "default": ""},
                        {"name": "countryCode", "type": "string", "default": ""},
                        {"name": "prefix", "type": "string", "default": ""},
                        {"name": "number", "type": "string", "default": ""}
                    ]
                },
                "null"
            ],
            "default": "null",
            "has_pii": "True",
            "glossary_field": "TERM_PhoneNumber"
        },
        {
            "name": "address",
            "type": [
                {
                    "type": "record",
                    "name": "Address",
                    "fields": [
                        {"name": "street", "type": "string", "default": ""}
                    ]
                },
                "null"
            ],
            "doc": "addressDoc",
            "default": "null",
            "has_pii": "True",
            "glossary_field": "TERM_Address"
        }
    ]
}
"""
    processor = OperationProcessor(
        operation_defs={
            "has_pii": {
                "match": "True",
                "operation": "add_tag",
                "config": {"tag": "has_pii_test"},
            },
            "glossary_field": {
                "match": "TERM_(.*)",
                "operation": "add_term",
                "config": {"term": "{{ $match }}"},
            },
        }
    )
    fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor)
    expected_field_paths = [
        "[version=2.0].[type=Payment].[type=string].id",
        "[version=2.0].[type=Payment].[type=double].amount",
        "[version=2.0].[type=Payment].[type=string].name",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
        "[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
        "[version=2.0].[type=Payment].[type=Address].address",
        "[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
    ]
    assert_field_paths_match(fields, expected_field_paths)

    pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"])
    # "has_pii": "False" does not match the "True" rule, so no tag is added.
    assert fields[1].globalTags is None
    assert fields[2].globalTags == pii_tag_aspect
    assert fields[3].globalTags == pii_tag_aspect
    assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
        ["urn:li:glossaryTerm:PhoneNumber"]
    )
    assert fields[8].globalTags == pii_tag_aspect
    assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
        ["urn:li:glossaryTerm:Address"]
    )


def test_jsonProps_propagation():
    """Logical-type attributes end up in ``jsonProps`` for required and optional fields."""
    jsonProps_schema = """
{
    "type": "record",
    "name": "record.name",
    "namespace": "namespace",
    "fields": [
        {
            "name": "non_optional_field",
            "type": {
                "type": "bytes",
                "scale": 0,
                "precision": 8,
                "logicalType": "decimal"
            }
        },
        {
            "name": "optional_field",
            "type": [
                "null",
                {
                    "type": "long",
                    "logicalType": "timestamp-millis"
                }
            ],
            "default": null
        }
    ]
}
"""
    fields: List[SchemaField] = avro_schema_to_mce_fields(jsonProps_schema)
    assert len(fields) == 2

    # required field
    assert fields[0].jsonProps is not None
    required_props = json.loads(fields[0].jsonProps)
    assert "logicalType" in required_props
    assert required_props["logicalType"] == "decimal"

    # optional field
    assert fields[1].jsonProps is not None
    optional_props = json.loads(fields[1].jsonProps)
    assert "logicalType" in optional_props
    assert optional_props["logicalType"] == "timestamp-millis"