mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-29 17:49:14 +00:00
* fix: ingestion fails for Iceberg tables with nested partition column
* test: added test to cover nested partition column for iceberg
* refactor: used if-else in tablePartition check
* fix: partition_column_name & column_partition_type typo

(cherry picked from commit 2f655daedc1bb82fbb97bec0e42fca15a8bb7863)
This commit is contained in:
parent
ddc9e307be
commit
da67de51cc
@ -51,24 +51,33 @@ class IcebergTable(BaseModel):
|
|||||||
"""Responsible for parsing the needed information from a PyIceberg Table."""
|
"""Responsible for parsing the needed information from a PyIceberg Table."""
|
||||||
iceberg_columns = table.schema().fields
|
iceberg_columns = table.schema().fields
|
||||||
|
|
||||||
|
partition_columns = []
|
||||||
|
for partition in table.spec().fields:
|
||||||
|
partition_column_name = get_column_from_partition(
|
||||||
|
iceberg_columns, partition
|
||||||
|
)
|
||||||
|
column_partition_type = get_column_partition_type(
|
||||||
|
iceberg_columns, partition
|
||||||
|
)
|
||||||
|
|
||||||
|
if not (partition_column_name and column_partition_type):
|
||||||
|
continue
|
||||||
|
|
||||||
|
partition_columns.append(
|
||||||
|
PartitionColumnDetails(
|
||||||
|
columnName=partition_column_name,
|
||||||
|
intervalType=column_partition_type,
|
||||||
|
interval=None,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
return IcebergTable(
|
return IcebergTable(
|
||||||
name=name,
|
name=name,
|
||||||
tableType=table_type,
|
tableType=table_type,
|
||||||
description=table.properties.get("comment"),
|
description=table.properties.get("comment"),
|
||||||
owners=owners,
|
owners=owners,
|
||||||
columns=[IcebergColumnParser.parse(column) for column in iceberg_columns],
|
columns=[IcebergColumnParser.parse(column) for column in iceberg_columns],
|
||||||
tablePartition=TablePartition(
|
tablePartition=(
|
||||||
columns=[
|
TablePartition(columns=partition_columns) if partition_columns else None
|
||||||
PartitionColumnDetails(
|
|
||||||
columnName=get_column_from_partition(
|
|
||||||
iceberg_columns, partition
|
|
||||||
),
|
|
||||||
intervalType=get_column_partition_type(
|
|
||||||
iceberg_columns, partition
|
|
||||||
),
|
|
||||||
interval=None,
|
|
||||||
)
|
|
||||||
for partition in table.spec().fields
|
|
||||||
]
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -647,9 +647,10 @@ class IcebergUnitTest(TestCase):
|
|||||||
def name(self):
|
def name(self):
|
||||||
return next(self.data)
|
return next(self.data)
|
||||||
|
|
||||||
with patch.object(
|
with (
|
||||||
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
|
patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST),
|
||||||
), patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()):
|
patch.object(HiveCatalog, "load_table", return_value=LoadTableMock()),
|
||||||
|
):
|
||||||
for i, table in enumerate(self.iceberg.get_tables_name_and_type()):
|
for i, table in enumerate(self.iceberg.get_tables_name_and_type()):
|
||||||
self.assertEqual(table, EXPECTED_TABLE_LIST[i])
|
self.assertEqual(table, EXPECTED_TABLE_LIST[i])
|
||||||
|
|
||||||
@ -658,10 +659,11 @@ class IcebergUnitTest(TestCase):
|
|||||||
def raise_no_such_iceberg_table():
|
def raise_no_such_iceberg_table():
|
||||||
raise pyiceberg.exceptions.NoSuchIcebergTableError()
|
raise pyiceberg.exceptions.NoSuchIcebergTableError()
|
||||||
|
|
||||||
with patch.object(
|
with (
|
||||||
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
|
patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST),
|
||||||
), patch.object(
|
patch.object(
|
||||||
HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
|
HiveCatalog, "load_table", side_effect=raise_no_such_iceberg_table
|
||||||
|
),
|
||||||
):
|
):
|
||||||
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
|
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
|
||||||
|
|
||||||
@ -670,9 +672,10 @@ class IcebergUnitTest(TestCase):
|
|||||||
def raise_no_such_table():
|
def raise_no_such_table():
|
||||||
raise pyiceberg.exceptions.NoSuchTableError()
|
raise pyiceberg.exceptions.NoSuchTableError()
|
||||||
|
|
||||||
with patch.object(
|
with (
|
||||||
HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST
|
patch.object(HiveCatalog, "list_tables", return_value=MOCK_TABLE_LIST),
|
||||||
), patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table):
|
patch.object(HiveCatalog, "load_table", side_effect=raise_no_such_table),
|
||||||
|
):
|
||||||
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
|
self.assertEqual(len(list(self.iceberg.get_tables_name_and_type())), 0)
|
||||||
|
|
||||||
def test_get_owner_ref(self):
|
def test_get_owner_ref(self):
|
||||||
@ -856,9 +859,80 @@ class IcebergUnitTest(TestCase):
|
|||||||
databaseSchema=FullyQualifiedEntityName(fq_database_schema),
|
databaseSchema=FullyQualifiedEntityName(fq_database_schema),
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch.object(
|
with (
|
||||||
OpenMetadata, "get_reference_by_email", return_value=ref
|
patch.object(OpenMetadata, "get_reference_by_email", return_value=ref),
|
||||||
), patch.object(fqn, "build", return_value=fq_database_schema):
|
patch.object(fqn, "build", return_value=fq_database_schema),
|
||||||
|
):
|
||||||
|
result = next(self.iceberg.yield_table((table_name, table_type))).right
|
||||||
|
|
||||||
|
self.assertEqual(result, expected)
|
||||||
|
|
||||||
|
def test_yield_table_with_nested_partition_column(self):
|
||||||
|
table_name = "table_name"
|
||||||
|
table_type = TableType.Regular
|
||||||
|
|
||||||
|
iceberg_table = {
|
||||||
|
"identifier": (
|
||||||
|
self.iceberg.context.get().database,
|
||||||
|
self.iceberg.context.get().database_schema,
|
||||||
|
table_name,
|
||||||
|
),
|
||||||
|
"metadata": TableMetadataV2.model_validate(
|
||||||
|
{
|
||||||
|
"location": "foo",
|
||||||
|
"current-schema-id": 0,
|
||||||
|
"last_column_id": 1,
|
||||||
|
"format_version": 2,
|
||||||
|
"schemas": [
|
||||||
|
Schema(
|
||||||
|
fields=tuple(
|
||||||
|
MOCK_COLUMN_MAP[field]["iceberg"]
|
||||||
|
for field in MOCK_COLUMN_MAP.keys()
|
||||||
|
)
|
||||||
|
)
|
||||||
|
],
|
||||||
|
"partition_spec": [],
|
||||||
|
"partition_specs": [
|
||||||
|
{
|
||||||
|
"fields": (
|
||||||
|
PartitionField(
|
||||||
|
source_id=100,
|
||||||
|
field_id=1000,
|
||||||
|
transform=IdentityTransform(),
|
||||||
|
name="nested1",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"properties": {"owner": "myself", "comment": "Table Description"},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
"metadata_location": "bar",
|
||||||
|
"io": "pyiceberg.io.pyarrow.PyArrowFileIO",
|
||||||
|
"catalog": self.iceberg.connection_obj,
|
||||||
|
}
|
||||||
|
|
||||||
|
fq_database_schema = "FullyQualifiedDatabaseSchema"
|
||||||
|
|
||||||
|
ref = EntityReferenceList(root=[EntityReference(id=uuid.uuid4(), type="user")])
|
||||||
|
self.iceberg.context.get().iceberg_table = PyIcebergTable(**iceberg_table)
|
||||||
|
|
||||||
|
expected = CreateTableRequest(
|
||||||
|
name=EntityName(table_name),
|
||||||
|
tableType=table_type,
|
||||||
|
description=Markdown("Table Description"),
|
||||||
|
owners=ref,
|
||||||
|
columns=[
|
||||||
|
MOCK_COLUMN_MAP[field]["ometa"] for field in MOCK_COLUMN_MAP.keys()
|
||||||
|
],
|
||||||
|
tablePartition=None,
|
||||||
|
databaseSchema=FullyQualifiedEntityName(fq_database_schema),
|
||||||
|
)
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(OpenMetadata, "get_reference_by_email", return_value=ref),
|
||||||
|
patch.object(fqn, "build", return_value=fq_database_schema),
|
||||||
|
):
|
||||||
result = next(self.iceberg.yield_table((table_name, table_type))).right
|
result = next(self.iceberg.yield_table((table_name, table_type))).right
|
||||||
|
|
||||||
self.assertEqual(result, expected)
|
self.assertEqual(result, expected)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user