From 6eaec954d60b065a8a88ce20e22110eba6edf672 Mon Sep 17 00:00:00 2001 From: harshsoni2024 <64592571+harshsoni2024@users.noreply.github.com> Date: Fri, 27 Jun 2025 10:44:35 +0530 Subject: [PATCH] feat-21984: REST service process nested objects inside array dtype in schema (#21984) --- .../ingestion/source/api/rest/metadata.py | 57 +++++- .../tests/unit/topology/api/test_rest.py | 193 ++++++++++++++++++ 2 files changed, 242 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/api/rest/metadata.py b/ingestion/src/metadata/ingestion/source/api/rest/metadata.py index 4cae45442e5..96ff1631ff2 100644 --- a/ingestion/src/metadata/ingestion/source/api/rest/metadata.py +++ b/ingestion/src/metadata/ingestion/source/api/rest/metadata.py @@ -300,38 +300,79 @@ class RestSource(ApiServiceSource): logger.warning(f"Error while parsing response schema: {err}") return None - def process_schema_fields(self, schema_ref: str) -> Optional[List[FieldModel]]: + def process_schema_fields( + self, schema_ref: str, parent_refs: Optional[List[str]] = None + ) -> Optional[List[FieldModel]]: try: + if parent_refs is None: + parent_refs = [] schema_name = schema_ref.split("/")[-1] schema_fields = ( self.json_response.get("components").get("schemas").get(schema_name) ) - + parent_refs.append(schema_ref) fetched_fields = [] for key, val in schema_fields.get("properties", {}).items(): dtype = val.get("type") if dtype: + dtype = "INT" if dtype.upper() == "INTEGER" else dtype parsed_dtype = ( DataTypeTopic[dtype.upper()] if dtype.upper() in DataTypeTopic.__members__ else DataTypeTopic.UNKNOWN ) - fetched_fields.append(FieldModel(name=key, dataType=parsed_dtype)) + children = None + if parsed_dtype.value == DataTypeTopic.ARRAY.value: + # If field of array type then parse children + children_ref = val.get("items", {}).get("$ref") + if children_ref: + # check infinite recursion by checking pre-processed schemas(parent_refs) + if children_ref not in parent_refs: + logger.debug( + f"Processing array fields inside schema: {children_ref}" + ) + children = self.process_schema_fields( + children_ref, parent_refs + ) + logger.debug( + f"Completed processing array fields inside schema: {children_ref}" + ) + else: + logger.debug( + f"Skipping array fields inside schema: {children_ref} to avoid infinite recursion" + ) + fetched_fields.append( + FieldModel(name=key, dataType=parsed_dtype, children=children) + ) else: # If type of field is not defined then check for sub-schema # Check if it's `object` type field - # check infinite recrusrion by comparing with parent(schema_ref) - object_children = None - if val.get("$ref") and val.get("$ref") != schema_ref: - object_children = self.process_schema_fields(val.get("$ref")) + children = None + if val.get("$ref"): + # check infinite recursion by checking pre-processed schemas(parent_refs) + if val.get("$ref") not in parent_refs: + children = self.process_schema_fields( + val.get("$ref"), parent_refs + ) + else: + logger.debug( + f"Skipping object fields inside schema: {val.get('$ref')} to avoid infinite recursion" + ) fetched_fields.append( FieldModel( name=key, dataType=DataTypeTopic.UNKNOWN, - children=object_children, + children=children, ) ) + if parent_refs and (schema_ref in parent_refs): + parent_refs.pop() return fetched_fields except Exception as err: logger.warning(f"Error while processing schema fields: {err}") + if parent_refs and (schema_ref in parent_refs): + parent_refs.pop() + logger.debug( + f"Popping {schema_ref} from parent_refs due to processing error" + ) return None diff --git a/ingestion/tests/unit/topology/api/test_rest.py b/ingestion/tests/unit/topology/api/test_rest.py index 8540df3ee87..96070379dc7 100644 --- a/ingestion/tests/unit/topology/api/test_rest.py +++ b/ingestion/tests/unit/topology/api/test_rest.py @@ -35,6 +35,7 @@ from metadata.generated.schema.type.basic import ( FullyQualifiedEntityName, Markdown, ) +from metadata.generated.schema.type.schema import DataTypeTopic from metadata.ingestion.api.models import Either from metadata.ingestion.source.api.rest.metadata import RestSource from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint @@ -142,6 +143,106 @@ MOCK_JSON_RESPONSE = { ], } +# Mock data for testing process_schema_fields +MOCK_SCHEMA_RESPONSE_SIMPLE = { + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "email": {"type": "string"}, + }, + } + } + } +} + +MOCK_SCHEMA_RESPONSE_WITH_ARRAY = { + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "tags": { + "type": "array", + "items": {"$ref": "#/components/schemas/Tag"}, + }, + }, + }, + "Tag": { + "type": "object", + "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}, + }, + } + } +} + +MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF = { + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "address": {"$ref": "#/components/schemas/Address"}, + }, + }, + "Address": { + "type": "object", + "properties": { + "street": {"type": "string"}, + "city": {"type": "string"}, + }, + }, + } + } +} + +MOCK_SCHEMA_RESPONSE_WITH_ARRAY_CIRCULAR = { + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "friends": { + "type": "array", + "items": {"$ref": "#/components/schemas/User"}, + }, + }, + } + } + } +} + +MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF_CIRCULAR = { + "components": { + "schemas": { + "User": { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + "profile": {"$ref": "#/components/schemas/Profile"}, + }, + }, + "Profile": { + "type": "object", + "properties": { + "bio": {"type": "string"}, + "user": {"$ref": "#/components/schemas/User"}, + }, + }, + } + } +} + class RESTTest(TestCase): @patch("metadata.ingestion.source.api.api_service.ApiServiceSource.test_connection") @@ -244,3 +345,95 @@ class RESTTest(TestCase): ) collections_invalid = list(rest_source_invalid.get_api_collections()) assert len(collections_invalid) == 0 + + def test_process_schema_fields_simple(self): + """Test processing simple schema fields without references""" + self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_SIMPLE + + result = self.rest_source.process_schema_fields("#/components/schemas/User") + + assert result is not None + assert len(result) == 3 + # Check field names and types + field_names = {field.name.root for field in result} + assert field_names == {"id", "name", "email"} + + # Check specific field types + id_field = next(field for field in result if field.name.root == "id") + assert id_field.dataType == DataTypeTopic.INT + assert id_field.children is None + + def test_process_schema_fields_with_array(self): + """Test processing schema fields with array references""" + self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_ARRAY + + result = self.rest_source.process_schema_fields("#/components/schemas/User") + + assert result is not None + assert len(result) == 3 + + # Find the tags field (array type) + tags_field = next(field for field in result if field.name.root == "tags") + assert tags_field.dataType == DataTypeTopic.ARRAY + assert tags_field.children is not None + assert len(tags_field.children) == 2 + + # Check array children fields + child_names = {child.name.root for child in tags_field.children} + assert child_names == {"id", "name"} + + def test_process_schema_fields_with_object_reference(self): + """Test processing schema fields with object references""" + self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF + + result = self.rest_source.process_schema_fields("#/components/schemas/User") + + assert result is not None + assert len(result) == 3 + + # Find the address field (object reference) + address_field = next(field for field in result if field.name.root == "address") + assert address_field.dataType == DataTypeTopic.UNKNOWN + assert address_field.children is not None + assert len(address_field.children) == 2 + + # Check object children fields + child_names = {child.name.root for child in address_field.children} + assert child_names == {"street", "city"} + + def test_process_schema_fields_circular_reference_array(self): + """Test processing schema fields with circular references in arrays""" + self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_ARRAY_CIRCULAR + + result = self.rest_source.process_schema_fields("#/components/schemas/User") + + assert result is not None + assert len(result) == 3 + + # Find the friends field (array with circular reference) + friends_field = next(field for field in result if field.name.root == "friends") + assert friends_field.dataType == DataTypeTopic.ARRAY + # Should be None due to circular reference prevention + assert friends_field.children is None + + def test_process_schema_fields_circular_reference_object(self): + """Test processing schema fields with circular references in objects""" + self.rest_source.json_response = MOCK_SCHEMA_RESPONSE_WITH_OBJECT_REF_CIRCULAR + + result = self.rest_source.process_schema_fields("#/components/schemas/User") + + assert result is not None + assert len(result) == 3 + + # Find the profile field + profile_field = next(field for field in result if field.name.root == "profile") + assert profile_field.dataType == DataTypeTopic.UNKNOWN + assert profile_field.children is not None + assert len(profile_field.children) == 2 + + # Check that the circular reference is prevented + user_field_in_profile = next( + child for child in profile_field.children if child.name.root == "user" + ) + # Should be None due to circular reference prevention + assert user_field_in_profile.children is None