datahub/metadata-ingestion/tests/unit/test_schema_util.py

926 lines
28 KiB
Python
Raw Permalink Normal View History

import json
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Type
import pytest
from freezegun import freeze_time
from datahub.emitter.mce_builder import (
make_global_tag_aspect_with_tag_list,
make_glossary_terms_aspect_from_urn_list,
)
from datahub.ingestion.extractor.schema_util import avro_schema_to_mce_fields
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
DateTypeClass,
NumberTypeClass,
SchemaField,
StringTypeClass,
TimeTypeClass,
)
from datahub.utilities.mapping import OperationProcessor
logger = logging.getLogger(__name__)
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["null", "string"],
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": ["string", "null"],
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE = """
{
"type": "record",
"name": "TestRecord",
"namespace": "some.event.namespace",
"fields": [
{
"name": "my.field",
"type": "null",
"doc": "some.doc"
}
]
}
"""
SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE: str = json.dumps(
{
"type": "record",
"name": "__struct_",
"fields": [
{
"name": "value",
"type": {
"type": "fixed",
"name": "__fixed_d9d2d051916045d9975d6c573aaabb89",
"size": 4,
"native_data_type": "fixed[4]",
"_nullable": True,
},
},
],
}
)
def log_field_paths(fields: List[SchemaField]) -> None:
logger.debug('FieldPaths=\n"' + '",\n"'.join(f.fieldPath for f in fields) + '"')
def assert_field_paths_are_unique(fields: List[SchemaField]) -> None:
avro_fields_paths = [
f.fieldPath for f in fields if re.match(".*[^]]$", f.fieldPath)
]
if avro_fields_paths:
assert len(avro_fields_paths) == len(set(avro_fields_paths))
def assert_field_paths_match(
fields: List[SchemaField], expected_field_paths: List[str]
) -> None:
log_field_paths(fields)
assert len(fields) == len(expected_field_paths)
for f, efp in zip(fields, expected_field_paths):
assert f.fieldPath == efp
assert_field_paths_are_unique(fields)
@pytest.mark.parametrize(
"schema",
[
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_UNION_TYPE_NULL_ISNT_FIRST_IN_UNION,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_PRIMITIVE_TYPE,
SCHEMA_WITH_OPTIONAL_FIELD_VIA_FIXED_TYPE,
],
ids=[
"optional_field_via_union_type",
"optional_field_via_union_null_not_first",
"optional_field_via_primitive",
"optional_field_via_fixed",
],
)
def test_avro_schema_to_mce_fields_events_with_nullable_fields(schema):
fields = avro_schema_to_mce_fields(schema)
2025-02-28 17:49:52 +05:30
assert len(fields) == 1
assert fields[0].nullable
def test_avro_schema_to_mce_fields_sample_events_with_different_field_types():
schema = """
{
"type": "record",
"name": "R",
"namespace": "some.namespace",
"fields": [
{
"name": "a_map_of_longs_field",
"type": {
"type": "map",
"values": "long"
}
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=R].[type=map].[type=long].a_map_of_longs_field",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_record_with_two_fields():
schema = """
{
"type": "record",
"name": "some.event.name",
"namespace": "not.relevant.namespace",
"fields": [
{
"name": "a",
"type": "string",
"doc": "some.doc"
},
{
"name": "b",
"type": "string",
"doc": "some.doc"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].a",
"[version=2.0].[type=name].[type=string].b",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_toplevel_isnt_a_record():
schema = """
{
"type": "string"
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = ["[version=2.0].[type=string]"]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_namespacing():
schema = """
{
"type": "record",
"name": "name",
"namespace": "should.not.show.up.namespace",
"fields": [
{
"name": "aStringField",
"type": "string",
"doc": "some docs",
"default": "this is custom, default value"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_schema_to_mce_fields_with_default():
schema = """
{
"type": "record",
"name": "some.event.name",
"namespace": "not.relevant.namespace",
"fields": [
{
"name": "aStringField",
"type": "string",
"doc": "some docs",
"default": "this is custom, default value"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=name].[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
description = fields[0].description
assert description and "custom, default value" in description
def test_avro_schema_with_recursion():
schema = """
{
"type": "record",
"name": "TreeNode",
"fields": [
{
"name": "value",
"type": "long"
},
{
"name": "children",
"type": { "type": "array", "items": "TreeNode" }
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=TreeNode].[type=long].value",
"[version=2.0].[type=TreeNode].[type=array].[type=TreeNode].children",
]
assert_field_paths_match(fields, expected_field_paths)
def test_avro_sample_payment_schema_to_mce_fields_with_nesting():
schema = """
{
"type": "record",
"name": "Payment",
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double", "doc": "amountDoc"},
{"name": "name","type": "string","default": ""},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
"type": "string",
"default": ""
}, {
"name": "prefix",
"type": "string",
"default": ""
}, {
"name": "number",
"type": "string",
"default": ""
}]
},
"null"
],
"default": "null"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null"
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Payment].[type=string].id",
"[version=2.0].[type=Payment].[type=double].amount",
"[version=2.0].[type=Payment].[type=string].name",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
assert fields[1].description == "amountDoc"
assert fields[3].description == "testDoc\nField default value: null"
assert fields[4].description == "areaCodeDoc\nField default value: "
assert fields[8].description == "addressDoc\nField default value: null"
def test_avro_schema_to_mce_fields_with_nesting_across_records():
schema = """
[
{
"type": "record",
"name": "Address",
"fields": [
{"name": "streetAddress", "type": "string"},
{"name": "city", "type": "string"}
]
},
{
"type": "record",
"name": "Person",
"fields": [
{"name": "firstname", "type": "string"},
{"name": "lastname", "type": "string" },
{"name": "address", "type": "Address"}
]
}
]
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=union]",
"[version=2.0].[type=union].[type=Address].[type=string].streetAddress",
"[version=2.0].[type=union].[type=Address].[type=string].city",
"[version=2.0].[type=union].[type=Person].[type=string].firstname",
"[version=2.0].[type=union].[type=Person].[type=string].lastname",
"[version=2.0].[type=union].[type=Person].[type=Address].address",
]
assert_field_paths_match(fields, expected_field_paths)
def test_simple_record_with_primitive_types():
schema = """
{
"type": "record",
"name": "Simple",
"namespace": "com.linkedin",
"fields": [
{"name": "stringField", "type": "string", "doc": "string field"},
{"name": "booleanField", "type": "boolean" },
{"name": "intField", "type": "int" },
{
"name": "enumField",
"type": {
"type": "enum",
"name": "MyTestEnumField",
"symbols": [ "TEST", "TEST1" ],
"symbolDoc": {
"TEST": "test enum",
"TEST1": "test1 enum"
}
}
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Simple].[type=string].stringField",
"[version=2.0].[type=Simple].[type=boolean].booleanField",
"[version=2.0].[type=Simple].[type=int].intField",
"[version=2.0].[type=Simple].[type=enum].enumField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_simple_nested_record_with_a_string_field_for_key_schema():
schema = """
{
"type": "record",
"name": "SimpleNested",
"namespace": "com.linkedin",
"fields": [{
"name": "nestedRcd",
"type": {
"type": "record",
"name": "InnerRcd",
"fields": [{
"name": "aStringField",
"type": "string"
} ]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema, True)
expected_field_paths: List[str] = [
"[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd",
"[version=2.0].[key=True].[type=SimpleNested].[type=InnerRcd].nestedRcd.[type=string].aStringField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_union_with_nested_record_of_union():
schema = """
{
"type": "record",
"name": "UnionSample",
"namespace": "com.linkedin",
"fields": [
{
"name": "aUnion",
"type": [
"boolean",
{
"type": "record",
"name": "Rcd",
"fields": [
{
"name": "aNullableStringField",
"type": ["null", "string"]
}
]
}
]
}
]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=UnionSample].[type=union].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=boolean].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion",
"[version=2.0].[type=UnionSample].[type=union].[type=Rcd].aUnion.[type=string].aNullableStringField",
]
assert_field_paths_match(fields, expected_field_paths)
assert isinstance(fields[3].type.type, StringTypeClass)
assert fields[0].nativeDataType == "union"
assert fields[1].nativeDataType == "boolean"
assert fields[2].nativeDataType == "Rcd"
assert fields[3].nativeDataType == "string"
def test_nested_arrays():
schema = """
{
"type": "record",
"name": "NestedArray",
"namespace": "com.linkedin",
"fields": [{
"name": "ar",
"type": {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [ {
"name": "a",
"type": "long"
} ]
}
]
}
}
} ]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths: List[str] = [
"[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar",
"[version=2.0].[type=NestedArray].[type=array].[type=array].[type=Foo].ar.[type=long].a",
]
assert_field_paths_match(fields, expected_field_paths)
def test_map_of_union_of_int_and_record_of_union():
schema = """
{
"type": "record",
"name": "MapSample",
"namespace": "com.linkedin",
"fields": [{
"name": "aMap",
"type": {
"type": "map",
"values": [
"int",
{
"type": "record",
"name": "Rcd",
"fields": [{
"name": "aUnion",
"type": ["null", "string", "int"]
}]
}
]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=MapSample].[type=map].[type=union].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=int].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].aUnion",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=string].aUnion",
"[version=2.0].[type=MapSample].[type=map].[type=union].[type=Rcd].aMap.[type=union].[type=int].aUnion",
]
assert_field_paths_match(fields, expected_field_paths)
def test_recursive_avro():
schema = """
{
"type": "record",
"name": "Recursive",
"namespace": "com.linkedin",
"fields": [{
"name": "r",
"type": {
"type": "record",
"name": "R",
"fields": [
{ "name" : "anIntegerField", "type" : "int" },
{ "name": "aRecursiveField", "type": "com.linkedin.R"}
]
}
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths = [
"[version=2.0].[type=Recursive].[type=R].r",
"[version=2.0].[type=Recursive].[type=R].r.[type=int].anIntegerField",
"[version=2.0].[type=Recursive].[type=R].r.[type=R].aRecursiveField",
]
assert_field_paths_match(fields, expected_field_paths)
def test_needs_disambiguation_nested_union_of_records_with_same_field_name():
schema = """
{
"type": "record",
"name": "ABFooUnion",
"namespace": "com.linkedin",
"fields": [{
"name": "a",
"type": [ {
"type": "record",
"name": "A",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "record",
"name": "B",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [{ "name": "f", "type": "long" }]
}
]
}
}]
}]
}
"""
fields = avro_schema_to_mce_fields(schema)
expected_field_paths: List[str] = [
"[version=2.0].[type=ABFooUnion].[type=union].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=A].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
"[version=2.0].[type=ABFooUnion].[type=union].[type=B].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
"[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
"[version=2.0].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
]
assert_field_paths_match(fields, expected_field_paths)
def test_mce_avro_parses_okay():
"""This test helps to exercise the complexity in parsing and catch unexpected regressions."""
schema = Path(
os.path.join(
os.path.dirname(__file__),
"..",
"..",
"src",
"datahub",
"metadata",
"schema.avsc",
)
).read_text()
fields = avro_schema_to_mce_fields(schema)
assert len(fields)
# Ensure that all the paths corresponding to the AVRO fields are unique.
assert_field_paths_are_unique(fields)
log_field_paths(fields)
def test_key_schema_handling():
"""Tests key schema handling"""
schema = """
{
"type": "record",
"name": "ABFooUnion",
"namespace": "com.linkedin",
"fields": [{
"name": "a",
"type": [ {
"type": "record",
"name": "A",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "record",
"name": "B",
"fields": [{ "name": "f", "type": "string" } ]
}, {
"type": "array",
"items": {
"type": "array",
"items": [
"null",
{
"type": "record",
"name": "Foo",
"fields": [{ "name": "f", "type": "long" }]
}
]
}
}]
}]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=True)
expected_field_paths: List[str] = [
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=A].a.[type=string].f",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=B].a.[type=string].f",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a",
"[version=2.0].[key=True].[type=ABFooUnion].[type=union].[type=array].[type=array].[type=Foo].a.[type=long].f",
]
assert_field_paths_match(fields, expected_field_paths)
for f in fields:
assert f.isPartOfKey
def test_logical_types_bare():
schema: str = """
{
"type": "record",
"name": "test_logical_types",
"fields": [
{"name": "decimal_logical", "type": "bytes", "logicalType": "decimal", "precision": 4, "scale": 2},
{"name": "uuid_logical", "type": "string", "logicalType": "uuid"},
{"name": "date_logical", "type": "int", "logicalType": "date"},
{"name": "time_millis_logical", "type": "int", "logicalType": "time-millis"},
{"name": "time_micros_logical", "type": "long", "logicalType": "time-micros"},
{"name": "timestamp_millis_logical", "type": "long", "logicalType": "timestamp-millis"},
{"name": "timestamp_micros_logical", "type": "long", "logicalType": "timestamp-micros"}
]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(schema, is_key_schema=False)
# validate field paths
expected_field_paths: List[str] = [
"[version=2.0].[type=test_logical_types].[type=bytes].decimal_logical",
"[version=2.0].[type=test_logical_types].[type=string].uuid_logical",
"[version=2.0].[type=test_logical_types].[type=int].date_logical",
"[version=2.0].[type=test_logical_types].[type=int].time_millis_logical",
"[version=2.0].[type=test_logical_types].[type=long].time_micros_logical",
"[version=2.0].[type=test_logical_types].[type=long].timestamp_millis_logical",
"[version=2.0].[type=test_logical_types].[type=long].timestamp_micros_logical",
]
assert_field_paths_match(fields, expected_field_paths)
# validate field types.
expected_types: List[Type] = [
NumberTypeClass,
StringTypeClass,
DateTypeClass,
TimeTypeClass,
TimeTypeClass,
TimeTypeClass,
TimeTypeClass,
]
assert expected_types == [type(field.type.type) for field in fields]
def test_logical_types_fully_specified_in_type():
schema: Dict = {
"type": "record",
"name": "test",
"fields": [
{
"name": "name",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 3,
"scale": 2,
"native_data_type": "decimal(3, 2)",
"_nullable": True,
},
}
],
}
fields: List[SchemaField] = avro_schema_to_mce_fields(
json.dumps(schema), default_nullable=True
)
assert len(fields) == 1
2025-02-28 17:49:52 +05:30
assert fields[0].fieldPath == "[version=2.0].[type=test].[type=bytes].name"
assert isinstance(fields[0].type.type, NumberTypeClass)
def test_ignore_exceptions():
malformed_schema: str = """
"name": "event_ts",
"type": "long",
"logicalType": "timestamp-millis",
"tags": [
"business-timestamp"
]
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(malformed_schema)
assert not fields
@freeze_time("2023-09-12")
def test_avro_schema_to_mce_fields_with_field_meta_mapping():
schema = """
{
"type": "record",
"name": "Payment",
"namespace": "some.event.namespace",
"fields": [
{"name": "id", "type": "string"},
{"name": "amount", "type": "double", "doc": "amountDoc","has_pii": "False"},
{"name": "name","type": "string","default": "","has_pii": "True"},
{"name": "phoneNumber",
"type": [{
"type": "record",
"name": "PhoneNumber",
"doc": "testDoc",
"fields": [{
"name": "areaCode",
"type": "string",
"doc": "areaCodeDoc",
"default": ""
}, {
"name": "countryCode",
"type": "string",
"default": ""
}, {
"name": "prefix",
"type": "string",
"default": ""
}, {
"name": "number",
"type": "string",
"default": ""
}]
},
"null"
],
"default": "null",
"has_pii": "True",
"glossary_field": "TERM_PhoneNumber"
},
{"name": "address",
"type": [{
"type": "record",
"name": "Address",
"fields": [{
"name": "street",
"type": "string",
"default": ""
}]
},
"null"
],
"doc": "addressDoc",
"default": "null",
"has_pii": "True",
"glossary_field": "TERM_Address"
}
]
}
"""
processor = OperationProcessor(
operation_defs={
"has_pii": {
"match": "True",
"operation": "add_tag",
"config": {"tag": "has_pii_test"},
},
"glossary_field": {
"match": "TERM_(.*)",
"operation": "add_term",
"config": {"term": "{{ $match }}"},
},
}
)
fields = avro_schema_to_mce_fields(schema, meta_mapping_processor=processor)
expected_field_paths = [
"[version=2.0].[type=Payment].[type=string].id",
"[version=2.0].[type=Payment].[type=double].amount",
"[version=2.0].[type=Payment].[type=string].name",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].areaCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].countryCode",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].prefix",
"[version=2.0].[type=Payment].[type=PhoneNumber].phoneNumber.[type=string].number",
"[version=2.0].[type=Payment].[type=Address].address",
"[version=2.0].[type=Payment].[type=Address].address.[type=string].street",
]
assert_field_paths_match(fields, expected_field_paths)
pii_tag_aspect = make_global_tag_aspect_with_tag_list(["has_pii_test"])
assert fields[1].globalTags is None
assert fields[2].globalTags == pii_tag_aspect
assert fields[3].globalTags == pii_tag_aspect
assert fields[3].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
["urn:li:glossaryTerm:PhoneNumber"]
)
assert fields[8].globalTags == pii_tag_aspect
assert fields[8].glossaryTerms == make_glossary_terms_aspect_from_urn_list(
["urn:li:glossaryTerm:Address"]
)
def test_jsonProps_propagation():
jsonProps_schema = """
{
"type": "record",
"name": "record.name",
"namespace": "namespace",
"fields": [
{
"name": "non_optional_field",
"type": {
"type": "bytes",
"scale": 0,
"precision": 8,
"logicalType": "decimal"
}
},
{
"name": "optional_field",
"type": [
"null",
{
"type": "long",
"logicalType": "timestamp-millis"
}
],
"default": null
}
]
}
"""
fields: List[SchemaField] = avro_schema_to_mce_fields(jsonProps_schema)
assert len(fields) == 2
# required field
assert fields[0].jsonProps is not None
assert "logicalType" in json.loads(fields[0].jsonProps)
assert json.loads(fields[0].jsonProps)["logicalType"] == "decimal"
# optional field
assert fields[1].jsonProps is not None
assert "logicalType" in json.loads(fields[1].jsonProps)
assert json.loads(fields[1].jsonProps)["logicalType"] == "timestamp-millis"