diff --git a/ingestion/src/metadata/utils/datalake/datalake_utils.py b/ingestion/src/metadata/utils/datalake/datalake_utils.py
index 0702510871b..7a76fd950a6 100644
--- a/ingestion/src/metadata/utils/datalake/datalake_utils.py
+++ b/ingestion/src/metadata/utils/datalake/datalake_utils.py
@@ -542,8 +542,129 @@ class JsonDataFrameColumnParser(GenericDataFrameColumnParser):
         """
         if self.raw_data:
             try:
+                # First, check if this is an Iceberg/Delta Lake metadata file
+                data = json.loads(self.raw_data)
+                if self._is_iceberg_delta_metadata(data):
+                    return self._parse_iceberg_delta_schema(data)
+                # Otherwise, try to parse as standard JSON Schema
                 return parse_json_schema(schema_text=self.raw_data, cls=Column)
             except Exception as exc:
                 logger.warning(f"Unable to parse the json schema: {exc}")
                 logger.debug(traceback.format_exc())
         return self._get_columns(self.data_frame)
+
+    def _is_iceberg_delta_metadata(self, data: dict) -> bool:
+        """
+        Check if the JSON data is an Iceberg or Delta Lake metadata file.
+        These files have a specific structure with 'schema' containing 'fields'.
+        """
+        return (
+            isinstance(data, dict)
+            and "schema" in data
+            and isinstance(data["schema"], dict)
+            and "fields" in data["schema"]
+            and isinstance(data["schema"]["fields"], list)
+        )
+
+    def _parse_iceberg_delta_schema(self, data: dict) -> List[Column]:
+        """
+        Parse Iceberg/Delta Lake metadata file schema to extract columns.
+        These files have structure: {"schema": {"fields": [{"id": ..., "name": ..., "type": ..., "required": ...}, ...]}}
+        """
+        columns = []
+        schema = data.get("schema", {})
+        fields = schema.get("fields", [])
+
+        for field in fields:
+            try:
+                column_name = field.get("name", "")
+                column_type = field.get("type", "string")
+
+                # Get the type string from dict if needed
+                type_str = column_type
+                if isinstance(column_type, dict):
+                    type_str = column_type.get("type", "string")
+
+                # Use DataType enum directly - it will handle the conversion
+                try:
+                    data_type = (
+                        DataType(type_str.upper())
+                        if isinstance(type_str, str)
+                        else DataType.STRING
+                    )
+                except (ValueError, AttributeError):
+                    # If the type is not recognized, default to STRING
+                    data_type = DataType.STRING
+
+                column = Column(
+                    name=truncate_column_name(column_name),
+                    displayName=column_name,
+                    dataType=data_type,
+                    dataTypeDisplay=column_type
+                    if isinstance(column_type, str)
+                    else str(column_type),
+                )
+
+                # Handle nested struct types
+                if (
+                    isinstance(column_type, dict)
+                    and column_type.get("type") == "struct"
+                ):
+                    column.children = self._parse_struct_fields(
+                        column_type.get("fields", [])
+                    )
+                    column.dataType = DataType.STRUCT
+
+                columns.append(column)
+            except Exception as exc:
+                logger.warning(f"Unable to parse field {field}: {exc}")
+                logger.debug(traceback.format_exc())
+
+        return columns
+
+    def _parse_struct_fields(self, fields: list) -> List[dict]:
+        """
+        Parse nested struct fields in Iceberg/Delta Lake metadata.
+        """
+        children = []
+        for field in fields:
+            try:
+                child_name = field.get("name", "")
+                child_type = field.get("type", "string")
+
+                # Get the type string from dict if needed
+                type_str = child_type
+                if isinstance(child_type, dict):
+                    type_str = child_type.get("type", "string")
+
+                # Use DataType enum directly
+                try:
+                    data_type = (
+                        DataType(type_str.upper())
+                        if isinstance(type_str, str)
+                        else DataType.STRING
+                    )
+                except (ValueError, AttributeError):
+                    data_type = DataType.STRING
+
+                child = {
+                    "name": truncate_column_name(child_name),
+                    "displayName": child_name,
+                    "dataType": data_type.value,
+                    "dataTypeDisplay": child_type
+                    if isinstance(child_type, str)
+                    else str(child_type),
+                }
+
+                # Recursively handle nested structs
+                if isinstance(child_type, dict) and child_type.get("type") == "struct":
+                    child["children"] = self._parse_struct_fields(
+                        child_type.get("fields", [])
+                    )
+
+                children.append(child)
+            except Exception as exc:
+                logger.warning(f"Unable to parse nested field {field}: {exc}")
+                logger.debug(traceback.format_exc())
+
+        return children
diff --git a/ingestion/tests/unit/utils/test_datalake.py b/ingestion/tests/unit/utils/test_datalake.py
index 79cbbfee473..1cc75aef5b9 100644
--- a/ingestion/tests/unit/utils/test_datalake.py
+++ b/ingestion/tests/unit/utils/test_datalake.py
@@ -12,6 +12,7 @@
 Test datalake utils
 """
 
+import json
 import os
 from unittest import TestCase
 
@@ -22,6 +23,7 @@ from metadata.readers.dataframe.reader_factory import SupportedTypes
 from metadata.utils.datalake.datalake_utils import (
     DataFrameColumnParser,
     GenericDataFrameColumnParser,
+    JsonDataFrameColumnParser,
     ParquetDataFrameColumnParser,
     get_file_format_type,
 )
@@ -590,3 +592,174 @@ class TestParquetDataFrameColumnParser(TestCase):
             self.assertIn(SupportedTypes.CSVGZ, dsv_types)
         except Exception as e:
             self.fail(f"CSVGZ integration test failed: {e}")
+
+
+class TestIcebergDeltaLakeMetadataParsing(TestCase):
+    """Test Iceberg/Delta Lake metadata JSON parsing"""
+
+    def test_iceberg_metadata_parsing(self):
+        """Test parsing of Iceberg/Delta Lake metadata files with nested schema.fields structure"""
+
+        # Sample Iceberg/Delta Lake metadata structure
+        iceberg_metadata = {
+            "format-version": 1,
+            "table-uuid": "e9182d72-131b-48fe-b530-79edc044fb01",
+            "location": "s3://bucket/path/table",
+            "schema": {
+                "type": "struct",
+                "schema-id": 0,
+                "fields": [
+                    {
+                        "id": 1,
+                        "name": "customer_id",
+                        "required": False,
+                        "type": "string",
+                    },
+                    {
+                        "id": 2,
+                        "name": "customer_type_cd",
+                        "required": False,
+                        "type": "string",
+                    },
+                    {"id": 3, "name": "amount", "required": True, "type": "double"},
+                    {
+                        "id": 4,
+                        "name": "is_active",
+                        "required": False,
+                        "type": "boolean",
+                    },
+                    {"id": 5, "name": "order_count", "required": False, "type": "int"},
+                    {
+                        "id": 6,
+                        "name": "created_date",
+                        "required": False,
+                        "type": "date",
+                    },
+                    {
+                        "id": 7,
+                        "name": "updated_timestamp",
+                        "required": False,
+                        "type": "timestamp",
+                    },
+                    {
+                        "id": 8,
+                        "name": "metadata",
+                        "required": False,
+                        "type": {
+                            "type": "struct",
+                            "fields": [
+                                {
+                                    "id": 9,
+                                    "name": "source_system",
+                                    "required": False,
+                                    "type": "string",
+                                },
+                                {
+                                    "id": 10,
+                                    "name": "last_sync_time",
+                                    "required": False,
+                                    "type": "timestamp",
+                                },
+                            ],
+                        },
+                    },
+                ],
+            },
+        }
+
+        # Convert to a JSON string, as it would be read from a file
+        raw_data = json.dumps(iceberg_metadata)
+
+        # Create a dummy DataFrame (required by the parser but not used for Iceberg metadata)
+        df = pd.DataFrame()
+
+        # Create the parser and parse columns
+        parser = JsonDataFrameColumnParser(df, raw_data=raw_data)
+        columns = parser.get_columns()
+
+        # Verify the correct number of columns were parsed
+        self.assertEqual(len(columns), 8)
+
+        # Verify field names were correctly parsed
+        expected_names = [
+            "customer_id",
+            "customer_type_cd",
+            "amount",
+            "is_active",
+            "order_count",
+            "created_date",
+            "updated_timestamp",
+            "metadata",
+        ]
+        actual_names = [col.displayName for col in columns]
+        self.assertEqual(expected_names, actual_names)
+
+        # Verify data types were correctly mapped
+        expected_types = [
+            DataType.STRING,  # customer_id
+            DataType.STRING,  # customer_type_cd
+            DataType.DOUBLE,  # amount
+            DataType.BOOLEAN,  # is_active
+            DataType.INT,  # order_count
+            DataType.DATE,  # created_date
+            DataType.TIMESTAMP,  # updated_timestamp
+            DataType.STRUCT,  # metadata
+        ]
+        actual_types = [col.dataType for col in columns]
+        self.assertEqual(expected_types, actual_types)
+
+        # Verify the nested struct field (metadata)
+        metadata_column = columns[7]
+        self.assertEqual(metadata_column.displayName, "metadata")
+        self.assertEqual(metadata_column.dataType, DataType.STRUCT)
+        self.assertIsNotNone(metadata_column.children)
+        self.assertEqual(len(metadata_column.children), 2)
+
+        # Verify nested field details
+        nested_fields = metadata_column.children
+        self.assertEqual(nested_fields[0]["displayName"], "source_system")
+        self.assertEqual(nested_fields[0]["dataType"], DataType.STRING.value)
+        self.assertEqual(nested_fields[1]["displayName"], "last_sync_time")
+        self.assertEqual(nested_fields[1]["dataType"], DataType.TIMESTAMP.value)
+
+    def test_is_iceberg_delta_metadata_detection(self):
+        """Test detection of the Iceberg/Delta Lake metadata format"""
+        df = pd.DataFrame()
+        parser = JsonDataFrameColumnParser(df, raw_data=None)
+
+        # Test valid Iceberg/Delta Lake metadata
+        valid_metadata = {"schema": {"fields": [{"name": "field1", "type": "string"}]}}
+        self.assertTrue(parser._is_iceberg_delta_metadata(valid_metadata))
+
+        # Test invalid formats
+        invalid_cases = [
+            {},  # Empty dict
+            {"schema": "not_a_dict"},  # Schema not a dict
+            {"schema": {}},  # No fields
+            {"schema": {"fields": "not_a_list"}},  # Fields not a list
+            {"properties": {}},  # JSON Schema format (not Iceberg)
+        ]
+
+        for invalid_case in invalid_cases:
+            with self.subTest(invalid_case=invalid_case):
+                self.assertFalse(parser._is_iceberg_delta_metadata(invalid_case))
+
+    def test_fallback_to_json_schema_parser(self):
+        """Test that non-Iceberg JSON files fall back to the standard JSON Schema parser"""
+        # Standard JSON Schema format
+        json_schema = {
+            "$schema": "http://json-schema.org/draft-07/schema#",
+            "type": "object",
+            "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
+        }
+
+        raw_data = json.dumps(json_schema)
+        df = pd.DataFrame()
+
+        # This should use the standard JSON Schema parser, not the Iceberg parser
+        parser = JsonDataFrameColumnParser(df, raw_data=raw_data)
+        columns = parser.get_columns()
+
+        # The standard parser's output differs from the Iceberg path; this test
+        # only ensures existing JSON Schema parsing keeps working
+        self.assertIsNotNone(columns)
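Reviewer note: a hypothetical end-to-end run of the new code path, mirroring the unit tests above. The empty DataFrame is a placeholder, since the Iceberg/Delta branch only reads raw_data; the order_id and meta field names are invented for illustration, and the snippet assumes the ingestion package is importable.

import json

import pandas as pd

from metadata.utils.datalake.datalake_utils import JsonDataFrameColumnParser

raw = json.dumps(
    {
        "schema": {
            "fields": [
                {"id": 1, "name": "order_id", "required": True, "type": "long"},
                {
                    "id": 2,
                    "name": "meta",
                    "required": False,
                    "type": {
                        "type": "struct",
                        "fields": [
                            {"id": 3, "name": "source", "required": False, "type": "string"}
                        ],
                    },
                },
            ]
        }
    }
)

parser = JsonDataFrameColumnParser(pd.DataFrame(), raw_data=raw)
for column in parser.get_columns():
    # Expected output: "order_id LONG" and "meta STRUCT";
    # the struct column's children carry the nested fields as dicts
    print(column.displayName, column.dataType.value)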