MINOR: datalake column subfields fix (#23576)

harshsoni2024 2025-10-03 16:13:10 +05:30 committed by GitHub
parent aa5da1e330
commit ea54b6b883
2 changed files with 294 additions and 0 deletions
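
This change teaches JsonDataFrameColumnParser to recognize Iceberg/Delta Lake metadata JSON (a top-level "schema" object containing a "fields" list) and to build Column objects from it, including children for nested struct fields, before falling back to the existing JSON Schema parsing. A minimal usage sketch of the new path, assuming the openmetadata-ingestion package is installed; the field names below are illustrative only:

import json

import pandas as pd

from metadata.utils.datalake.datalake_utils import JsonDataFrameColumnParser

# Illustrative Iceberg/Delta Lake metadata payload; only "schema.fields" is relevant here
iceberg_metadata = {
    "schema": {
        "fields": [
            {"id": 1, "name": "customer_id", "required": False, "type": "string"},
            {"id": 2, "name": "amount", "required": True, "type": "double"},
        ]
    }
}

# The DataFrame is required by the constructor but is not consulted once the
# raw data is detected as Iceberg/Delta Lake metadata.
parser = JsonDataFrameColumnParser(pd.DataFrame(), raw_data=json.dumps(iceberg_metadata))
for column in parser.get_columns():
    print(column.displayName, column.dataType)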


@@ -542,8 +542,129 @@ class JsonDataFrameColumnParser(GenericDataFrameColumnParser):
"""
if self.raw_data:
try:
# First, check if this is an Iceberg/Delta Lake metadata file
data = json.loads(self.raw_data)
if self._is_iceberg_delta_metadata(data):
return self._parse_iceberg_delta_schema(data)
# Otherwise, try to parse as standard JSON Schema
return parse_json_schema(schema_text=self.raw_data, cls=Column)
except Exception as exc:
logger.warning(f"Unable to parse the json schema: {exc}")
logger.debug(traceback.format_exc())
return self._get_columns(self.data_frame)

    def _is_iceberg_delta_metadata(self, data: dict) -> bool:
        """
        Check if the JSON data is an Iceberg or Delta Lake metadata file.
        These files have a specific structure with 'schema' containing 'fields'.
        """
        return (
            isinstance(data, dict)
            and "schema" in data
            and isinstance(data["schema"], dict)
            and "fields" in data["schema"]
            and isinstance(data["schema"]["fields"], list)
        )

    def _parse_iceberg_delta_schema(self, data: dict) -> List[Column]:
        """
        Parse an Iceberg/Delta Lake metadata file schema to extract columns.
        These files have the structure:
        {"schema": {"fields": [{"id": ..., "name": ..., "type": ..., "required": ...}, ...]}}
        """
        columns = []
        schema = data.get("schema", {})
        fields = schema.get("fields", [])
        for field in fields:
            try:
                column_name = field.get("name", "")
                column_type = field.get("type", "string")
                # Get the type string from a dict if needed
                type_str = column_type
                if isinstance(column_type, dict):
                    type_str = column_type.get("type", "string")
                # Use the DataType enum directly - it will handle the conversion
                try:
                    data_type = (
                        DataType(type_str.upper())
                        if isinstance(type_str, str)
                        else DataType.STRING
                    )
                except (ValueError, AttributeError):
                    # If the type is not recognized, default to STRING
                    data_type = DataType.STRING
                column = Column(
                    name=truncate_column_name(column_name),
                    displayName=column_name,
                    dataType=data_type,
                    dataTypeDisplay=column_type
                    if isinstance(column_type, str)
                    else str(column_type),
                )
                # Handle nested struct types
                if (
                    isinstance(column_type, dict)
                    and column_type.get("type") == "struct"
                ):
                    column.children = self._parse_struct_fields(
                        column_type.get("fields", [])
                    )
                    column.dataType = DataType.STRUCT
                columns.append(column)
            except Exception as exc:
                logger.warning(f"Unable to parse field {field}: {exc}")
                logger.debug(traceback.format_exc())
        return columns

    def _parse_struct_fields(self, fields: list) -> List[dict]:
        """
        Parse nested struct fields in Iceberg/Delta Lake metadata.
        """
        children = []
        for field in fields:
            try:
                child_name = field.get("name", "")
                child_type = field.get("type", "string")
                # Get the type string from dict if needed
                type_str = child_type
                if isinstance(child_type, dict):
                    type_str = child_type.get("type", "string")
                # Use DataType enum directly
                try:
                    data_type = (
                        DataType(type_str.upper())
                        if isinstance(type_str, str)
                        else DataType.STRING
                    )
                except (ValueError, AttributeError):
                    data_type = DataType.STRING
                child = {
                    "name": truncate_column_name(child_name),
                    "displayName": child_name,
                    "dataType": data_type.value,
                    "dataTypeDisplay": child_type
                    if isinstance(child_type, str)
                    else str(child_type),
                }
                # Recursively handle nested structs
                if isinstance(child_type, dict) and child_type.get("type") == "struct":
                    child["children"] = self._parse_struct_fields(
                        child_type.get("fields", [])
                    )
                children.append(child)
            except Exception as exc:
                logger.warning(f"Unable to parse nested field {field}: {exc}")
                logger.debug(traceback.format_exc())
        return children
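
For reference, the type mapping above leans entirely on the DataType enum: a field whose type string has no matching enum member silently defaults to STRING, and struct-typed fields are converted recursively with their children kept as plain dicts rather than Column objects (the unit tests below rely on that). A small sketch of the fallback in isolation; the helper name is hypothetical and the DataType import path is assumed to match the one datalake_utils already uses:

from metadata.generated.schema.entity.data.table import DataType


def map_iceberg_type(type_str: str) -> DataType:
    """Mirror of the enum lookup used in _parse_iceberg_delta_schema."""
    try:
        return DataType(type_str.upper())
    except (ValueError, AttributeError):
        # Unrecognized type strings, e.g. "decimal(10,2)", default to STRING
        return DataType.STRING


assert map_iceberg_type("double") is DataType.DOUBLE
assert map_iceberg_type("decimal(10,2)") is DataType.STRING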


@@ -12,6 +12,7 @@
Test datalake utils
"""
import json
import os
from unittest import TestCase
@@ -22,6 +23,7 @@ from metadata.readers.dataframe.reader_factory import SupportedTypes
from metadata.utils.datalake.datalake_utils import (
    DataFrameColumnParser,
    GenericDataFrameColumnParser,
    JsonDataFrameColumnParser,
    ParquetDataFrameColumnParser,
    get_file_format_type,
)
@@ -590,3 +592,174 @@ class TestParquetDataFrameColumnParser(TestCase):
            self.assertIn(SupportedTypes.CSVGZ, dsv_types)
        except Exception as e:
            self.fail(f"CSVGZ integration test failed: {e}")


class TestIcebergDeltaLakeMetadataParsing(TestCase):
    """Test Iceberg/Delta Lake metadata JSON parsing"""

    def test_iceberg_metadata_parsing(self):
        """Test parsing of Iceberg/Delta Lake metadata files with nested schema.fields structure"""
        # Sample Iceberg/Delta Lake metadata structure
        iceberg_metadata = {
            "format-version": 1,
            "table-uuid": "e9182d72-131b-48fe-b530-79edc044fb01",
            "location": "s3://bucket/path/table",
            "schema": {
                "type": "struct",
                "schema-id": 0,
                "fields": [
                    {
                        "id": 1,
                        "name": "customer_id",
                        "required": False,
                        "type": "string",
                    },
                    {
                        "id": 2,
                        "name": "customer_type_cd",
                        "required": False,
                        "type": "string",
                    },
                    {"id": 3, "name": "amount", "required": True, "type": "double"},
                    {
                        "id": 4,
                        "name": "is_active",
                        "required": False,
                        "type": "boolean",
                    },
                    {"id": 5, "name": "order_count", "required": False, "type": "int"},
                    {
                        "id": 6,
                        "name": "created_date",
                        "required": False,
                        "type": "date",
                    },
                    {
                        "id": 7,
                        "name": "updated_timestamp",
                        "required": False,
                        "type": "timestamp",
                    },
                    {
                        "id": 8,
                        "name": "metadata",
                        "required": False,
                        "type": {
                            "type": "struct",
                            "fields": [
                                {
                                    "id": 9,
                                    "name": "source_system",
                                    "required": False,
                                    "type": "string",
                                },
                                {
                                    "id": 10,
                                    "name": "last_sync_time",
                                    "required": False,
                                    "type": "timestamp",
                                },
                            ],
                        },
                    },
                ],
            },
        }

        # Convert to JSON string as would be received from file
        raw_data = json.dumps(iceberg_metadata)

        # Create a dummy DataFrame (required by parser but not used for Iceberg metadata)
        df = pd.DataFrame()

        # Create parser and parse columns
        parser = JsonDataFrameColumnParser(df, raw_data=raw_data)
        columns = parser.get_columns()

        # Verify the correct number of columns were parsed
        self.assertEqual(len(columns), 8)

        # Verify field names were correctly parsed
        expected_names = [
            "customer_id",
            "customer_type_cd",
            "amount",
            "is_active",
            "order_count",
            "created_date",
            "updated_timestamp",
            "metadata",
        ]
        actual_names = [col.displayName for col in columns]
        self.assertEqual(expected_names, actual_names)

        # Verify data types were correctly mapped
        expected_types = [
            DataType.STRING,  # customer_id
            DataType.STRING,  # customer_type_cd
            DataType.DOUBLE,  # amount
            DataType.BOOLEAN,  # is_active
            DataType.INT,  # order_count
            DataType.DATE,  # created_date
            DataType.TIMESTAMP,  # updated_timestamp
            DataType.STRUCT,  # metadata
        ]
        actual_types = [col.dataType for col in columns]
        self.assertEqual(expected_types, actual_types)

        # Verify nested struct field (metadata)
        metadata_column = columns[7]
        self.assertEqual(metadata_column.displayName, "metadata")
        self.assertEqual(metadata_column.dataType, DataType.STRUCT)
        self.assertIsNotNone(metadata_column.children)
        self.assertEqual(len(metadata_column.children), 2)

        # Verify nested field details
        nested_fields = metadata_column.children
        self.assertEqual(nested_fields[0]["displayName"], "source_system")
        self.assertEqual(nested_fields[0]["dataType"], DataType.STRING.value)
        self.assertEqual(nested_fields[1]["displayName"], "last_sync_time")
        self.assertEqual(nested_fields[1]["dataType"], DataType.TIMESTAMP.value)

    def test_is_iceberg_delta_metadata_detection(self):
        """Test detection of Iceberg/Delta Lake metadata format"""
        df = pd.DataFrame()
        parser = JsonDataFrameColumnParser(df, raw_data=None)

        # Test valid Iceberg/Delta Lake metadata
        valid_metadata = {"schema": {"fields": [{"name": "field1", "type": "string"}]}}
        self.assertTrue(parser._is_iceberg_delta_metadata(valid_metadata))

        # Test invalid formats
        invalid_cases = [
            {},  # Empty dict
            {"schema": "not_a_dict"},  # Schema not a dict
            {"schema": {}},  # No fields
            {"schema": {"fields": "not_a_list"}},  # Fields not a list
            {"properties": {}},  # JSON Schema format (not Iceberg)
        ]
        for invalid_case in invalid_cases:
            with self.subTest(invalid_case=invalid_case):
                self.assertFalse(parser._is_iceberg_delta_metadata(invalid_case))

    def test_fallback_to_json_schema_parser(self):
        """Test that non-Iceberg JSON files fall back to standard JSON Schema parser"""
        # Standard JSON Schema format
        json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {"name": {"type": "string"}, "age": {"type": "integer"}},
        }
        raw_data = json.dumps(json_schema)
        df = pd.DataFrame()

        # This should use the standard JSON Schema parser, not Iceberg parser
        parser = JsonDataFrameColumnParser(df, raw_data=raw_data)
        columns = parser.get_columns()

        # The standard parser behavior would be different
        # This test ensures we don't break existing JSON Schema parsing
        self.assertIsNotNone(columns)
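
One detail worth keeping in mind when extending these tests: the children of a struct column are plain dicts produced by _parse_struct_fields, not Column objects, which is why the last assertions in test_iceberg_metadata_parsing index them by key. A sketch of the expected shape for the fixture's nested "metadata" field, assuming truncate_column_name leaves short names unchanged:

from metadata.generated.schema.entity.data.table import DataType

# Expected contents of metadata_column.children for the fixture above
expected_children = [
    {
        "name": "source_system",
        "displayName": "source_system",
        "dataType": DataType.STRING.value,
        "dataTypeDisplay": "string",
    },
    {
        "name": "last_sync_time",
        "displayName": "last_sync_time",
        "dataType": DataType.TIMESTAMP.value,
        "dataTypeDisplay": "timestamp",
    },
]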