From da67de51cc37f1699eb6ecd1b0a9c5f2657fe9aa Mon Sep 17 00:00:00 2001 From: Keshav Mohta <68001229+keshavmohta09@users.noreply.github.com> Date: Fri, 22 Aug 2025 17:25:59 +0530 Subject: [PATCH] Fix #18491: ingestion fails for Iceberg tables with nested partition column (#23031) * fix: ingestion fails for Iceberg tables with nested partition column * test: added test to cover nested partition column for iceberg * refactor: used if-else in tablePartition check * fix: partition_column_name & column_partition_type typo (cherry picked from commit 2f655daedc1bb82fbb97bec0e42fca15a8bb7863) --- .../source/database/iceberg/models.py | 35 +++--- .../unit/topology/database/test_iceberg.py | 100 +++++++++++++++--- 2 files changed, 109 insertions(+), 26 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/iceberg/models.py b/ingestion/src/metadata/ingestion/source/database/iceberg/models.py index 0429757f4ea..71af3ed30d8 100644 --- a/ingestion/src/metadata/ingestion/source/database/iceberg/models.py +++ b/ingestion/src/metadata/ingestion/source/database/iceberg/models.py @@ -51,24 +51,33 @@ class IcebergTable(BaseModel): """Responsible for parsing the needed information from a PyIceberg Table.""" iceberg_columns = table.schema().fields + partition_columns = [] + for partition in table.spec().fields: + partition_column_name = get_column_from_partition( + iceberg_columns, partition + ) + column_partition_type = get_column_partition_type( + iceberg_columns, partition + ) + + if not (partition_column_name and column_partition_type): + continue + + partition_columns.append( + PartitionColumnDetails( + columnName=partition_column_name, + intervalType=column_partition_type, + interval=None, + ) + ) + return IcebergTable( name=name, tableType=table_type, description=table.properties.get("comment"), owners=owners, columns=[IcebergColumnParser.parse(column) for column in iceberg_columns], - tablePartition=TablePartition( - columns=[ - PartitionColumnDetails( - columnName=get_column_from_partition( - iceberg_columns, partition - ), - intervalType=get_column_partition_type( - iceberg_columns, partition - ), - interval=None, - ) - for partition in table.spec().fields - ] + tablePartition=( + TablePartition(columns=partition_columns) if partition_columns else None ), ) diff --git a/ingestion/tests/unit/topology/database/test_iceberg.py b/ingestion/tests/unit/topology/database/test_iceberg.py index f505680b209..46bf98c8f70 100644 --- a/ingestion/tests/unit/topology/database/test_iceberg.py +++ b/ingestion/tests/unit/topology/database/test_iceberg.py @@ -647,9 +647,10 @@ class IcebergUnitTest(TestCase): def name(self): return next(self.data) - with patch.object( - HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST - ), patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()): + with ( + patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST), + patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()), + ): for i, table in enumerate(self.iceberg.get_tables_name_and_type()): self.assertEqual(table, EXPECTED_TABLE_LIST[i]) @@ -658,10 +659,11 @@ class IcebergUnitTest(TestCase): def raise_no_such_iceberg_table(): raise pyiceberg.exceptions.NoSuchIcebergTableError() - with patch.object( - HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST - ), patch.object( - HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table + with ( + patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST), + patch.object( + HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table + ), ): self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0) @@ -670,9 +672,10 @@ class IcebergUnitTest(TestCase): def raise_no_such_table(): raise pyiceberg.exceptions.NoSuchTableError() - with patch.object( - HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST - ), patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table): + with ( + patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST), + patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table), + ): self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0) def test_get_owner_ref(self): @@ -856,9 +859,80 @@ class IcebergUnitTest(TestCase): databaseSchema=FullyQualifiedEntityName(fq_database_schema), ) - with patch.object( - OpenMetadata, "get_reference_by_email", return_value=ref - ), patch.object(fqn, "build", return_value=fq_database_schema): + with ( + patch.object(OpenMetadata, "get_reference_by_email", return_value=ref), + patch.object(fqn, "build", return_value=fq_database_schema), + ): + result = next(self.iceberg.yield_table((table_name, table_type))).right + + self.assertEqual(result, expected) + + def test_yield_table_with_nested_partition_column(self): + table_name = "table_name" + table_type = TableType.Regular + + iceberg_table = { + "identifier": ( + self.iceberg.context.get().database, + self.iceberg.context.get().database_schema, + table_name, + ), + "metadata": TableMetadataV2.model_validate( + { + "location": "foo", + "current-schema-id": 0, + "last_column_id": 1, + "format_version": 2, + "schemas": [ + Schema( + fields=tuple( + MOCK_COLUMN_MAP[field]["iceberg"] + for field in MOCK_COLUMN_MAP.keys() + ) + ) + ], + "partition_spec": [], + "partition_specs": [ + { + "fields": ( + PartitionField( + source_id=100, + field_id=1000, + transform=IdentityTransform(), + name="nested1", + ), + ) + } + ], + "properties": {"owner": "myself", "comment": "Table Description"}, + } + ), + "metadata_location": "bar", + "io": "pyiceberg.io.pyarrow.PyArrowFileIO", + "catalog": self.iceberg.connection_obj, + } + + fq_database_schema = "FullyQualifiedDatabaseSchema" + + ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")]) + self.iceberg.context.get().iceberg_table = PyIcebergTable(**iceberg_table) + + expected = CreateTableRequest( + name=EntityName(table_name), + tableType=table_type, + description=Markdown("Table Description"), + owners=ref, + columns=[ + MOCK_COLUMN_MAP[field]["ometa"] for field in MOCK_COLUMN_MAP.keys() + ], + tablePartition=None, + databaseSchema=FullyQualifiedEntityName(fq_database_schema), + ) + + with ( + patch.object(OpenMetadata, "get_reference_by_email", return_value=ref), + patch.object(fqn, "build", return_value=fq_database_schema), + ): result = next(self.iceberg.yield_table((table_name, table_type))).right self.assertEqual(result, expected)