mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-25 18:30:00 +00:00
fixed avro recursive record (#13856)
This commit is contained in:
parent
f89b52cb11
commit
c7834e74cc
@ -37,7 +37,9 @@ def _parse_array_children(
|
|||||||
return f"ARRAY<{display_type}>", children
|
return f"ARRAY<{display_type}>", children
|
||||||
|
|
||||||
if isinstance(arr_item, UnionSchema):
|
if isinstance(arr_item, UnionSchema):
|
||||||
display_type, children = _parse_union_children(arr_item, cls=cls)
|
display_type, children = _parse_union_children(
|
||||||
|
parent=None, union_field=arr_item, cls=cls
|
||||||
|
)
|
||||||
return f"UNION<{display_type}>", children
|
return f"UNION<{display_type}>", children
|
||||||
|
|
||||||
if isinstance(arr_item, RecordSchema):
|
if isinstance(arr_item, RecordSchema):
|
||||||
@ -104,7 +106,7 @@ def parse_array_fields(
|
|||||||
|
|
||||||
|
|
||||||
def _parse_union_children(
|
def _parse_union_children(
|
||||||
union_field: UnionSchema, cls: ModelMetaclass = FieldModel
|
parent: Optional[Schema], union_field: UnionSchema, cls: ModelMetaclass = FieldModel
|
||||||
) -> Tuple[str, Optional[Union[FieldModel, Column]]]:
|
) -> Tuple[str, Optional[Union[FieldModel, Column]]]:
|
||||||
non_null_schema = [
|
non_null_schema = [
|
||||||
(i, schema)
|
(i, schema)
|
||||||
@ -122,11 +124,12 @@ def _parse_union_children(
|
|||||||
sub_type[non_null_schema[0][0] ^ 1] = "null"
|
sub_type[non_null_schema[0][0] ^ 1] = "null"
|
||||||
return ",".join(sub_type), children
|
return ",".join(sub_type), children
|
||||||
|
|
||||||
|
# If the child record is the same schema as its parent (a recursive reference), emit it once with children=None instead of expanding its fields again
|
||||||
if isinstance(field, RecordSchema):
|
if isinstance(field, RecordSchema):
|
||||||
children = cls(
|
children = cls(
|
||||||
name=field.name,
|
name=field.name,
|
||||||
dataType=str(field.type).upper(),
|
dataType=str(field.type).upper(),
|
||||||
children=get_avro_fields(field, cls),
|
children=None if field == parent else get_avro_fields(field, cls),
|
||||||
description=field.doc,
|
description=field.doc,
|
||||||
)
|
)
|
||||||
return sub_type, children
|
return sub_type, children
|
||||||
@ -155,7 +158,9 @@ def parse_record_fields(field: RecordSchema, cls: ModelMetaclass = FieldModel):
|
|||||||
|
|
||||||
|
|
||||||
def parse_union_fields(
|
def parse_union_fields(
|
||||||
union_field: Schema, cls: ModelMetaclass = FieldModel
|
parent: Optional[Schema],
|
||||||
|
union_field: Schema,
|
||||||
|
cls: ModelMetaclass = FieldModel,
|
||||||
) -> Optional[List[Union[FieldModel, Column]]]:
|
) -> Optional[List[Union[FieldModel, Column]]]:
|
||||||
"""
|
"""
|
||||||
Parse union field for avro schema
|
Parse union field for avro schema
|
||||||
@ -194,7 +199,9 @@ def parse_union_fields(
|
|||||||
dataType=str(field_type.type).upper(),
|
dataType=str(field_type.type).upper(),
|
||||||
description=union_field.doc,
|
description=union_field.doc,
|
||||||
)
|
)
|
||||||
sub_type, children = _parse_union_children(field_type, cls)
|
sub_type, children = _parse_union_children(
|
||||||
|
union_field=field_type, cls=cls, parent=parent
|
||||||
|
)
|
||||||
obj.dataTypeDisplay = f"UNION<{sub_type}>"
|
obj.dataTypeDisplay = f"UNION<{sub_type}>"
|
||||||
if children and cls == FieldModel:
|
if children and cls == FieldModel:
|
||||||
obj.children = [children]
|
obj.children = [children]
|
||||||
@ -252,7 +259,9 @@ def get_avro_fields(
|
|||||||
if isinstance(field.type, ArraySchema):
|
if isinstance(field.type, ArraySchema):
|
||||||
field_models.append(parse_array_fields(field, cls=cls))
|
field_models.append(parse_array_fields(field, cls=cls))
|
||||||
elif isinstance(field.type, UnionSchema):
|
elif isinstance(field.type, UnionSchema):
|
||||||
field_models.append(parse_union_fields(field, cls=cls))
|
field_models.append(
|
||||||
|
parse_union_fields(union_field=field, cls=cls, parent=parsed_schema)
|
||||||
|
)
|
||||||
elif isinstance(field.type, RecordSchema):
|
elif isinstance(field.type, RecordSchema):
|
||||||
field_models.append(parse_record_fields(field, cls=cls))
|
field_models.append(parse_record_fields(field, cls=cls))
|
||||||
else:
|
else:
|
||||||
|
@ -76,6 +76,65 @@ SAMPLE_AVRO_SCHEMA = """
|
|||||||
}
|
}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
RECURSIVE_AVRO_SCHEMA = """
|
||||||
|
{
|
||||||
|
"name":"MainRecord",
|
||||||
|
"type":"record",
|
||||||
|
"fields":[
|
||||||
|
{
|
||||||
|
"default":"None",
|
||||||
|
"name":"NestedRecord",
|
||||||
|
"type":[
|
||||||
|
"null",
|
||||||
|
{
|
||||||
|
"fields":[
|
||||||
|
{
|
||||||
|
"default":"None",
|
||||||
|
"name":"FieldA",
|
||||||
|
"type":[
|
||||||
|
"null",
|
||||||
|
{
|
||||||
|
"items":{
|
||||||
|
"fields":[
|
||||||
|
{
|
||||||
|
"name":"FieldAA",
|
||||||
|
"type":"string"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"default":"None",
|
||||||
|
"name":"FieldBB",
|
||||||
|
"type":[
|
||||||
|
"null",
|
||||||
|
"string"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"default":"None",
|
||||||
|
"name":"FieldCC",
|
||||||
|
"type":[
|
||||||
|
"null",
|
||||||
|
"RecursionIssueRecord"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"name":"RecursionIssueRecord",
|
||||||
|
"type":"record"
|
||||||
|
},
|
||||||
|
"type":"array"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"name":"FieldInNestedRecord",
|
||||||
|
"type":"record"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
ARRAY_OF_STR = """
|
ARRAY_OF_STR = """
|
||||||
{
|
{
|
||||||
"type": "record",
|
"type": "record",
|
||||||
@ -647,3 +706,48 @@ class AvroParserTests(TestCase):
|
|||||||
parsed_record_schema[0].children[2].children[0].children[1].dataType.name,
|
parsed_record_schema[0].children[2].children[0].children[1].dataType.name,
|
||||||
"ARRAY",
|
"ARRAY",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_recursive_record_parsing(self):
|
||||||
|
parsed_recursive_schema = parse_avro_schema(RECURSIVE_AVRO_SCHEMA)
|
||||||
|
|
||||||
|
# Verify that a recursive record is expanded only at its first occurrence: the nested self-reference keeps its name but has no children
|
||||||
|
self.assertEqual(
|
||||||
|
parsed_recursive_schema[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.name.__root__,
|
||||||
|
"RecursionIssueRecord",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
parsed_recursive_schema[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[2]
|
||||||
|
.name.__root__,
|
||||||
|
"FieldCC",
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
parsed_recursive_schema[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[2]
|
||||||
|
.children[0]
|
||||||
|
.name.__root__,
|
||||||
|
"RecursionIssueRecord",
|
||||||
|
)
|
||||||
|
self.assertIsNone(
|
||||||
|
parsed_recursive_schema[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[0]
|
||||||
|
.children[2]
|
||||||
|
.children[0]
|
||||||
|
.children
|
||||||
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user