mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-26 17:34:41 +00:00
feat-21984: REST service process nested objects inside array dtype in schema (#21984)
This commit is contained in:
parent
80aa8052e0
commit
6eaec954d6
@ -300,38 +300,79 @@ class RestSource(ApiServiceSource):
|
|||||||
logger.warning(f"Error while parsing response schema: {err}")
|
logger.warning(f"Error while parsing response schema: {err}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def process_schema_fields(self, schema_ref: str) -> Optional[List[FieldModel]]:
|
def process_schema_fields(
|
||||||
|
self, schema_ref: str, parent_refs: Optional[List[str]] = None
|
||||||
|
) -> Optional[List[FieldModel]]:
|
||||||
try:
|
try:
|
||||||
|
if parent_refs is None:
|
||||||
|
parent_refs = []
|
||||||
schema_name = schema_ref.split("/")[-1]
|
schema_name = schema_ref.split("/")[-1]
|
||||||
schema_fields = (
|
schema_fields = (
|
||||||
self.json_response.get("components").get("schemas").get(schema_name)
|
self.json_response.get("components").get("schemas").get(schema_name)
|
||||||
)
|
)
|
||||||
|
parent_refs.append(schema_ref)
|
||||||
fetched_fields = []
|
fetched_fields = []
|
||||||
for key, val in schema_fields.get("properties", {}).items():
|
for key, val in schema_fields.get("properties", {}).items():
|
||||||
dtype = val.get("type")
|
dtype = val.get("type")
|
||||||
if dtype:
|
if dtype:
|
||||||
|
dtype = "INT" if dtype.upper() == "INTEGER" else dtype
|
||||||
parsed_dtype = (
|
parsed_dtype = (
|
||||||
DataTypeTopic[dtype.upper()]
|
DataTypeTopic[dtype.upper()]
|
||||||
if dtype.upper() in DataTypeTopic.__members__
|
if dtype.upper() in DataTypeTopic.__members__
|
||||||
else DataTypeTopic.UNKNOWN
|
else DataTypeTopic.UNKNOWN
|
||||||
)
|
)
|
||||||
fetched_fields.append(FieldModel(name=key, dataType=parsed_dtype))
|
children = None
|
||||||
|
if parsed_dtype.value == DataTypeTopic.ARRAY.value:
|
||||||
|
# If field of array type then parse children
|
||||||
|
children_ref = val.get("items", {}).get("$ref")
|
||||||
|
if children_ref:
|
||||||
|
# check infinite recursion by checking pre-processed schemas(parent_refs)
|
||||||
|
if children_ref not in parent_refs:
|
||||||
|
logger.debug(
|
||||||
|
f"Processing array fields inside schema: {children_ref}"
|
||||||
|
)
|
||||||
|
children = self.process_schema_fields(
|
||||||
|
children_ref, parent_refs
|
||||||
|
)
|
||||||
|
logger.debug(
|
||||||
|
f"Completed processing array fields inside schema: {children_ref}"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
f"Skipping array fields inside schema: {children_ref} to avoid infinite recursion"
|
||||||
|
)
|
||||||
|
fetched_fields.append(
|
||||||
|
FieldModel(name=key, dataType=parsed_dtype, children=children)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
# If type of field is not defined then check for sub-schema
|
# If type of field is not defined then check for sub-schema
|
||||||
# Check if it's `object` type field
|
# Check if it's `object` type field
|
||||||
# check infinite recrusrion by comparing with parent(schema_ref)
|
children = None
|
||||||
object_children = None
|
if val.get("$ref"):
|
||||||
if val.get("$ref") and val.get("$ref") != schema_ref:
|
# check infinite recursion by checking pre-processed schemas(parent_refs)
|
||||||
object_children = self.process_schema_fields(val.get("$ref"))
|
if val.get("$ref") not in parent_refs:
|
||||||
|
children = self.process_schema_fields(
|
||||||
|
val.get("$ref"), parent_refs
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
logger.debug(
|
||||||
|
f"Skipping object fields inside schema: {val.get('$ref')} to avoid infinite recursion"
|
||||||
|
)
|
||||||
fetched_fields.append(
|
fetched_fields.append(
|
||||||
FieldModel(
|
FieldModel(
|
||||||
name=key,
|
name=key,
|
||||||
dataType=DataTypeTopic.UNKNOWN,
|
dataType=DataTypeTopic.UNKNOWN,
|
||||||
children=object_children,
|
children=children,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
if parent_refs and (schema_ref in parent_refs):
|
||||||
|
parent_refs.pop()
|
||||||
return fetched_fields
|
return fetched_fields
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.warning(f"Error while processing schema fields: {err}")
|
logger.warning(f"Error while processing schema fields: {err}")
|
||||||
|
if parent_refs and (schema_ref in parent_refs):
|
||||||
|
parent_refs.pop()
|
||||||
|
logger.debug(
|
||||||
|
f"Popping {schema_ref} from parent_refs due to processing error"
|
||||||
|
)
|
||||||
return None
|
return None
|
||||||
|
@ -35,6 +35,7 @@ from metadata.generated.schema.type.basic import (
|
|||||||
FullyQualifiedEntityName,
|
FullyQualifiedEntityName,
|
||||||
Markdown,
|
Markdown,
|
||||||
)
|
)
|
||||||
|
from metadata.generated.schema.type.schema import DataTypeTopic
|
||||||
from metadata.ingestion.api.models import Either
|
from metadata.ingestion.api.models import Either
|
||||||
from metadata.ingestion.source.api.rest.metadata import RestSource
|
from metadata.ingestion.source.api.rest.metadata import RestSource
|
||||||
from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint
|
from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint
|
||||||
@ -142,6 +143,106 @@ MOCK_JSON_RESPONSE = {
|
|||||||
],
|
],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# Mock data for testing process_schema_fields
|
||||||
|
MOCK_SCHEMA_RESPONSE_SIMPLE = {
|
||||||
|
"components": {
|
||||||
|
"schemas": {
|
||||||
|
"User": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"id": {"type": "integer"},
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"email": {"type": "string"},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MOCK_SCHEMA_RESPONSE_WITH_ARRAY = {
|
||||||
|
"components": {
|
||||||
|
"schemas": {
|
||||||
|
"User": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"id": {"type": "integer"},
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"tags": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {"$ref": "#/components/schemas/Tag"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"Tag": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {"id": {"type": "integer"}, "name": {"type": "string"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF = {
|
||||||
|
"components": {
|
||||||
|
"schemas": {
|
||||||
|
"User": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"id": {"type": "integer"},
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"address": {"$ref": "#/components/schemas/Address"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"Address": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"street": {"type": "string"},
|
||||||
|
"city": {"type": "string"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MOCK_SCHEMA_RESPONSE_WITH_ARRAY_CIRCULAR = {
|
||||||
|
"components": {
|
||||||
|
"schemas": {
|
||||||
|
"User": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"id": {"type": "integer"},
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"friends": {
|
||||||
|
"type": "array",
|
||||||
|
"items": {"$ref": "#/components/schemas/User"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF_CIRCULAR = {
|
||||||
|
"components": {
|
||||||
|
"schemas": {
|
||||||
|
"User": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"id": {"type": "integer"},
|
||||||
|
"name": {"type": "string"},
|
||||||
|
"profile": {"$ref": "#/components/schemas/Profile"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"Profile": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"bio": {"type": "string"},
|
||||||
|
"user": {"$ref": "#/components/schemas/User"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class RESTTest(TestCase):
|
class RESTTest(TestCase):
|
||||||
@patch("metadata.ingestion.source.api.api_service.ApiServiceSource.test_connection")
|
@patch("metadata.ingestion.source.api.api_service.ApiServiceSource.test_connection")
|
||||||
@ -244,3 +345,95 @@ class RESTTest(TestCase):
|
|||||||
)
|
)
|
||||||
collections_invalid = list(rest_source_invalid.get_api_collections())
|
collections_invalid = list(rest_source_invalid.get_api_collections())
|
||||||
assert len(collections_invalid) == 0
|
assert len(collections_invalid) == 0
|
||||||
|
|
||||||
|
def test_process_schema_fields_simple(self):
|
||||||
|
"""Test processing simple schema fields without references"""
|
||||||
|
self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_SIMPLE
|
||||||
|
|
||||||
|
result = self.rest_source.process_schema_fields("#/components/schemas/User")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result) == 3
|
||||||
|
# Check field names and types
|
||||||
|
field_names = {field.name.root for field in result}
|
||||||
|
assert field_names == {"id", "name", "email"}
|
||||||
|
|
||||||
|
# Check specific field types
|
||||||
|
id_field = next(field for field in result if field.name.root == "id")
|
||||||
|
assert id_field.dataType == DataTypeTopic.INT
|
||||||
|
assert id_field.children is None
|
||||||
|
|
||||||
|
def test_process_schema_fields_with_array(self):
|
||||||
|
"""Test processing schema fields with array references"""
|
||||||
|
self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_ARRAY
|
||||||
|
|
||||||
|
result = self.rest_source.process_schema_fields("#/components/schemas/User")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result) == 3
|
||||||
|
|
||||||
|
# Find the tags field (array type)
|
||||||
|
tags_field = next(field for field in result if field.name.root == "tags")
|
||||||
|
assert tags_field.dataType == DataTypeTopic.ARRAY
|
||||||
|
assert tags_field.children is not None
|
||||||
|
assert len(tags_field.children) == 2
|
||||||
|
|
||||||
|
# Check array children fields
|
||||||
|
child_names = {child.name.root for child in tags_field.children}
|
||||||
|
assert child_names == {"id", "name"}
|
||||||
|
|
||||||
|
def test_process_schema_fields_with_object_reference(self):
|
||||||
|
"""Test processing schema fields with object references"""
|
||||||
|
self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF
|
||||||
|
|
||||||
|
result = self.rest_source.process_schema_fields("#/components/schemas/User")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result) == 3
|
||||||
|
|
||||||
|
# Find the address field (object reference)
|
||||||
|
address_field = next(field for field in result if field.name.root == "address")
|
||||||
|
assert address_field.dataType == DataTypeTopic.UNKNOWN
|
||||||
|
assert address_field.children is not None
|
||||||
|
assert len(address_field.children) == 2
|
||||||
|
|
||||||
|
# Check object children fields
|
||||||
|
child_names = {child.name.root for child in address_field.children}
|
||||||
|
assert child_names == {"street", "city"}
|
||||||
|
|
||||||
|
def test_process_schema_fields_circular_reference_array(self):
|
||||||
|
"""Test processing schema fields with circular references in arrays"""
|
||||||
|
self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_ARRAY_CIRCULAR
|
||||||
|
|
||||||
|
result = self.rest_source.process_schema_fields("#/components/schemas/User")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result) == 3
|
||||||
|
|
||||||
|
# Find the friends field (array with circular reference)
|
||||||
|
friends_field = next(field for field in result if field.name.root == "friends")
|
||||||
|
assert friends_field.dataType == DataTypeTopic.ARRAY
|
||||||
|
# Should be None due to circular reference prevention
|
||||||
|
assert friends_field.children is None
|
||||||
|
|
||||||
|
def test_process_schema_fields_circular_reference_object(self):
|
||||||
|
"""Test processing schema fields with circular references in objects"""
|
||||||
|
self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF_CIRCULAR
|
||||||
|
|
||||||
|
result = self.rest_source.process_schema_fields("#/components/schemas/User")
|
||||||
|
|
||||||
|
assert result is not None
|
||||||
|
assert len(result) == 3
|
||||||
|
|
||||||
|
# Find the profile field
|
||||||
|
profile_field = next(field for field in result if field.name.root == "profile")
|
||||||
|
assert profile_field.dataType == DataTypeTopic.UNKNOWN
|
||||||
|
assert profile_field.children is not None
|
||||||
|
assert len(profile_field.children) == 2
|
||||||
|
|
||||||
|
# Check that the circular reference is prevented
|
||||||
|
user_field_in_profile = next(
|
||||||
|
child for child in profile_field.children if child.name.root == "user"
|
||||||
|
)
|
||||||
|
# Should be None due to circular reference prevention
|
||||||
|
assert user_field_in_profile.children is None
|
||||||
|
Loading…
x
Reference in New Issue
Block a user