datahub/metadata-ingestion/tests/unit/test_schema_util.py
Günther Hackl f48b8dd6c0
fix(ingestion): schema-metadata - fix jsonProps not being ingested for optional fields (#12927)
Co-authored-by: Sergio Gómez Villamor <sgomezvillamor@gmail.com>
2025-03-22 08:41:11 -07:00

926 lines
28 KiB
Python

import json
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Type
import pytest
from freezegun import freeze_time
from datahub.emitter.mce_builder import (
make_global_tag_aspect_with_tag_list,
make_glossary_terms_aspect_from_urn_list,
)
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
DateTypeClass,
NumberTypeClass,
SchemaField,
StringTypeClass,
TimeTypeClass,
)
from datahub.utilities.mapping import OperationProcessor
logger = logging.getLogger(__name__)
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["null", "string"],
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["string", "null"],
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": "null",
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE: str = json.dumps(
{
"type": "record",
"name": "__struct_",
"fields": [
{
"name": "value",
"type": {
"type": "fixed",
"name": "__fixed_d9d2d051916045d9975d6c573aaabb89",
"size": 4,
"native_data_type": "fixed[4]",
"_nullable": True,
},
},
],
}
)
def log_field_paths(fields: List[SchemaField]) -> None:
logger.debug('FieldPaths=\n"' + '",\n"'.join(f.fieldPath for f in fields) + '"')
def assert_field_paths_are_unique(fields: List[SchemaField]) -> None:
avro_fields_paths = [
f.fieldPath for f in fields if re.match(".*[^]]$", f.fieldPath)
]
if avro_fields_paths:
assert len(avro_fields_paths) == len(set(avro_fields_paths))
def assert_field_paths_match(
fields: List[SchemaField], expected_field_paths: List[str]
) -> None:
log_field_paths(fields)
assert len(fields) == len(expected_field_paths)
for f, efp in zip(fields, expected_field_paths):
assert f.fieldPath == efp
assert_field_paths_are_unique(fields)
@pytest.mark.parametrize(
"schema",
[
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE,
],
ids=[
"optional_field_via_union_type",
"optional_field_via_union_null_not_first",
"optional_field_via_primitive",
"optional_field_via_fixed",
],
)
def test_avro_schema_to_mce_fields_events_with_nullable_fields(schema):
fields = avro_schema_to_mce_fields(schema)
assert len(fields) == 1
assert fields[0].nullable
def test_avro_schema_to_mce_fields_sample_events_with_different_field_types():
schema = """
{
"type": "record",
"name": "R",
"namespace": "some.namespace",
"fields": [
{
"name": "a_map_of_longs_field",
"type": {
"type": "map",
"values": "long"
}
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=R].[type=map].[type=long].a_map_of_longs_field",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_record_with_two_fields():
schema = """
{
"type": "record",
"name": "some.event.name",
"namespace": "not.relevant.namespace",
"fields": [
{
"name": "a",
"type": "string",
"doc": "some.doc"
},
{
"name": "b",
"type": "string",
"doc": "some.doc"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].a",
"[version=2.0].[type=name].[type=string].b",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_toplevel_isnt_a_record():
schema = """
{
"type": "string"
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = ["[version=2.0].[type=string]"]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_namespacing():
schema = """
{
"type": "record",
"name": "name",
"namespace": "should.not.show.up.namespace",
"fields": [
{
"name": "aStringField",
"type": "string",
"doc": "some docs",
"default": "this is custom, default value"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_with_default():
schema = """
{
"type": "record",
"name": "some.event.name",
"namespace": "not.relevant.namespace",
"fields": [
{
"name": "aStringField",
"type": "string",
"doc": "some docs",
"default": "this is custom, default value"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
description = fields[0].description
assert description and "custom, default value" in description
def test_avro_schema_with_recursion():
schema = """
{
"type": "record",
"name": "TreeNode",
"fields": [
{
"name": "value",
"type": "long"
},
{
"name": "children",
"type": { "type": "array", "items": "TreeNode" }
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=TreeNode].[type=long].value",
"[version=2.0].[type=TreeNode].[type=array].[type=TreeNode].children",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
schema = """
{
"type": "record",
"name": "Payment",
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double", "doc": "amountDoc"},
{"name": "name","type": "string","default": ""},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
"type": "string",
"default": ""
}, {
"name": "prefix",
"type": "string",
"default": ""
}, {
"name": "number",
"type": "string",
"default": ""
}]
},
"null"
],
"default": "null"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Payment].[type=string].id",
"[version=2.0].[type=Payment].[type=double].amount",
"[version=2.0].[type=Payment].[type=string].name",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
assert fields[1].description == "amountDoc"
assert fields[3].description == "testDoc\nField default value: null"
assert fields[4].description == "areaCodeDoc\nField default value: "
assert fields[8].description == "addressDoc\nField default value: null"
def test_avro_schema_to_mce_fields_with_nesting_across_records():
schema = """
[
{
"type": "record",
"name": "Address",
"fields": [
{"name": "streetAddress", "type": "string"},
{"name": "city", "type": "string"}
]
},
{
"type": "record",
"name": "Person",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string" },
{"name": "address", "type": "Address"}
]
}
]
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=union]",
"[version=2.0].[type=union].[type=Address].[type=string].streetAddress",
"[version=2.0].[type=union].[type=Address].[type=string].city",
"[version=2.0].[type=union].[type=Person].[type=string].firstname",
"[version=2.0].[type=union].[type=Person].[type=string].lastname",
"[version=2.0].[type=union].[type=Person].[type=Address].address",
]
assert_field_paths_match(fields, expected_field_paths)
def test_simple_record_with_primitive_types():
schema = """
{
"type": "record",
"name": "Simple",
"namespace": "com.linkedin",
"fields": [
{"name": "stringField", "type": "string", "doc": "string field"},
{"name": "booleanField", "type": "boolean" },
{"name": "intField", "type": "int" },
{
"name": "enumField",
"type": {
"type": "enum",
"name": "MyTestEnumField",
"symbols": [ "TEST", "TEST1" ],
"symbolDoc": {
"TEST": "test enum",
"TEST1": "test1 enum"
}
}
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Simple].[type=string].stringField",
"[version=2.0].[type=Simple].[type=boolean].booleanField",
"[version=2.0].[type=Simple].[type=int].intField",
"[version=2.0].[type=Simple].[type=enum].enumField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_simple_nested_record_with_a_string_field_for_key_schema():
schema = """
{
"type": "record",
"name": "SimpleNested",
"namespace": "com.linkedin",
"fields": [{
"name": "nestedRcd",
"type": {
"type": "record",
"name": "InnerRcd",
"fields": [{
"name": "aStringField",
"type": "string"
} ]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema, True)
expected_field_paths: List[str] = [
"[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd",
"[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd.[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_union_with_nested_record_of_union():
schema = """
{
"type": "record",
"name": "UnionSample",
"namespace": "com.linkedin",
"fields": [
{
"name": "aUnion",
"type": [
"boolean",
{
"type": "record",
"name": "Rcd",
"fields": [
{
"name": "aNullableStringField",
"type": ["null", "string"]
}
]
}
]
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=UnionSample].[type=union].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=boolean].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion.[type=string].aNullableStringField",
]
assert_field_paths_match(fields, expected_field_paths)
assert isinstance(fields[3].type.type, StringTypeClass)
assert fields[0].nativeDataType == "union"
assert fields[1].nativeDataType == "boolean"
assert fields[2].nativeDataType == "Rcd"
assert fields[3].nativeDataType == "string"
def test_nested_arrays():
schema = """
{
"type": "record",
"name": "NestedArray",
"namespace": "com.linkedin",
"fields": [{
"name": "ar",
"type": {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [ {
"name": "a",
"type": "long"
} ]
}
]
}
}
} ]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths: List[str] = [
"[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar",
"[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar.[type=long].a",
]
assert_field_paths_match(fields, expected_field_paths)
def test_map_of_union_of_int_and_record_of_union():
schema = """
{
"type": "record",
"name": "MapSample",
"namespace": "com.linkedin",
"fields": [{
"name": "aMap",
"type": {
"type": "map",
"values": [
"int",
{
"type": "record",
"name": "Rcd",
"fields": [{
"name": "aUnion",
"type": ["null", "string", "int"]
}]
}
]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=MapSample].[type=map].[type=union].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=int].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].aUnion",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=string].aUnion",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=int].aUnion",
]
assert_field_paths_match(fields, expected_field_paths)
def test_recursive_avro():
schema = """
{
"type": "record",
"name": "Recursive",
"namespace": "com.linkedin",
"fields": [{
"name": "r",
"type": {
"type": "record",
"name": "R",
"fields": [
{ "name" : "anIntegerField", "type" : "int" },
{ "name": "aRecursiveField", "type": "com.linkedin.R"}
]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Recursive].[type=R].r",
"[version=2.0].[type=Recursive].[type=R].r.[type=int].anIntegerField",
"[version=2.0].[type=Recursive].[type=R].r.[type=R].aRecursiveField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_needs_disambiguation_nested_union_of_records_with_same_field_name():
schema = """
{
"type": "record",
"name": "ABFooUnion",
"namespace": "com.linkedin",
"fields": [{
"name": "a",
"type": [ {
"type": "record",
"name": "A",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "record",
"name": "B",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [{ "name": "f", "type": "long" }]
}
]
}
}]
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths: List[str] = [
"[version=2.0].[type=ABFooUnion].[type=union].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=A].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
"[version=2.0].[type=ABFooUnion].[type=union].[type=B].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
"[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
]
assert_field_paths_match(fields, expected_field_paths)
def test_mce_avro_parses_okay():
"""This test helps to exercise the complexity in parsing and catch unexpected regressions."""
schema = Path(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"src",
"datahub",
"metadata",
"schema.avsc",
)
).read_text()
fields = avro_schema_to_mce_fields(schema)
assert len(fields)
# Ensure that all the paths corresponding to the AVRO fields are unique.
assert_field_paths_are_unique(fields)
log_field_paths(fields)
def test_key_schema_handling():
"""Tests key schema handling"""
schema = """
{
"type": "record",
"name": "ABFooUnion",
"namespace": "com.linkedin",
"fields": [{
"name": "a",
"type": [ {
"type": "record",
"name": "A",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "record",
"name": "B",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [{ "name": "f", "type": "long" }]
}
]
}
}]
}]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=True)
expected_field_paths: List[str] = [
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
]
assert_field_paths_match(fields, expected_field_paths)
for f in fields:
assert f.isPartOfKey
def test_logical_types_bare():
schema: str = """
{
"type": "record",
"name": "test_logical_types",
"fields": [
{"name": "decimal_logical", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2},
{"name": "uuid_logical", "type": "string", "logicalType": "uuid"},
{"name": "date_logical", "type": "int", "logicalType": "date"},
{"name": "time_millis_logical", "type": "int", "logicalType": "time-millis"},
{"name": "time_micros_logical", "type": "long", "logicalType": "time-micros"},
{"name": "timestamp_millis_logical", "type": "long", "logicalType": "timestamp-millis"},
{"name": "timestamp_micros_logical", "type": "long", "logicalType": "timestamp-micros"}
]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=False)
# validate field paths
expected_field_paths: List[str] = [
"[version=2.0].[type=test_logical_types].[type=bytes].decimal_logical",
"[version=2.0].[type=test_logical_types].[type=string].uuid_logical",
"[version=2.0].[type=test_logical_types].[type=int].date_logical",
"[version=2.0].[type=test_logical_types].[type=int].time_millis_logical",
"[version=2.0].[type=test_logical_types].[type=long].time_micros_logical",
"[version=2.0].[type=test_logical_types].[type=long].timestamp_millis_logical",
"[version=2.0].[type=test_logical_types].[type=long].timestamp_micros_logical",
]
assert_field_paths_match(fields, expected_field_paths)
# validate field types.
expected_types: List[Type] = [
NumberTypeClass,
StringTypeClass,
DateTypeClass,
TimeTypeClass,
TimeTypeClass,
TimeTypeClass,
TimeTypeClass,
]
assert expected_types == [type(field.type.type) for field in fields]
def test_logical_types_fully_specified_in_type():
schema: Dict = {
"type": "record",
"name": "test",
"fields": [
{
"name": "name",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 3,
"scale": 2,
"native_data_type": "decimal(3, 2)",
"_nullable": True,
},
}
],
}
fields: List[SchemaField] = avro_schema_to_mce_fields(
json.dumps(schema), default_nullable=True
)
assert len(fields) == 1
assert fields[0].fieldPath == "[version=2.0].[type=test].[type=bytes].name"
assert isinstance(fields[0].type.type, NumberTypeClass)
def test_ignore_exceptions():
malformed_schema: str = """
"name": "event_ts",
"type": "long",
"logicalType": "timestamp-millis",
"tags": [
"business-timestamp"
]
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema)
assert not fields
@freeze_time("2023-09-12")
def test_avro_schema_to_mce_fields_with_field_meta_mapping():
schema = """
{
"type": "record",
"name": "Payment",
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double", "doc": "amountDoc","has_pii": "False"},
{"name": "name","type": "string","default": "","has_pii": "True"},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
"type": "string",
"default": ""
}, {
"name": "prefix",
"type": "string",
"default": ""
}, {
"name": "number",
"type": "string",
"default": ""
}]
},
"null"
],
"default": "null",
"has_pii": "True",
"glossary_field": "TERM_PhoneNumber"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null",
"has_pii": "True",
"glossary_field": "TERM_Address"
}
]
}
"""
processor = OperationProcessor(
operation_defs={
"has_pii": {
"match": "True",
"operation": "add_tag",
"config": {"tag": "has_pii_test"},
},
"glossary_field": {
"match": "TERM_(.*)",
"operation": "add_term",
"config": {"term": "{{ $match }}"},
},
}
)
fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor)
expected_field_paths = [
"[version=2.0].[type=Payment].[type=string].id",
"[version=2.0].[type=Payment].[type=double].amount",
"[version=2.0].[type=Payment].[type=string].name",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"])
assert fields[1].globalTags is None
assert fields[2].globalTags == pii_tag_aspect
assert fields[3].globalTags == pii_tag_aspect
assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
["urn:li:glossaryTerm:PhoneNumber"]
)
assert fields[8].globalTags == pii_tag_aspect
assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
["urn:li:glossaryTerm:Address"]
)
def test_jsonProps_propagation():
jsonProps_schema = """
{
"type": "record",
"name": "record.name",
"namespace": "namespace",
"fields": [
{
"name": "non_optional_field",
"type": {
"type": "bytes",
"scale": 0,
"precision": 8,
"logicalType": "decimal"
}
},
{
"name": "optional_field",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(jsonProps_schema)
assert len(fields) == 2
# required field
assert fields[0].jsonProps is not None
assert "logicalType" in json.loads(fields[0].jsonProps)
assert json.loads(fields[0].jsonProps)["logicalType"] == "decimal"
# optional field
assert fields[1].jsonProps is not None
assert "logicalType" in json.loads(fields[1].jsonProps)
assert json.loads(fields[1].jsonProps)["logicalType"] == "timestamp-millis"