ISSUE #14765 - Implement Athena Injected Partition Check (#15318)

* refactor!: change partition metadata structure for table entities

* refactor!: updated json schema for TypeScript code gen

* chore: migration of partition for table entities

* style: python & java linting

* fix: catch injected partition table in Athena

* style: ran python linting
This commit is contained in:
Teddy 2024-02-28 15:20:59 +01:00 committed by GitHub
parent ddf02f24d4
commit 3e83bdac3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 122 additions and 14 deletions

View File

@ -19,12 +19,53 @@ from metadata.generated.schema.entity.data.table import (
PartitionIntervalUnit,
PartitionProfilerConfig,
Table,
TablePartition,
TableProfilerConfig,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
def validate_athena_injected_partitioning(
table_partitions: TablePartition,
table_profiler_config: Optional[TableProfilerConfig],
profiler_partitioning_config: Optional[PartitionProfilerConfig],
) -> None:
"""Validate Athena partitioning. Injected partition need to be defined
in the table profiler c onfig for the profiler to work correctly. We'll throw an
error if the partitioning is not defined in the table profiler config.
Attr:
entity (Table): entity table
"""
error_msg = (
"Table profiler config is missing for table with injected partitioning. Please define "
"the partitioning in the table profiler config for column {column_name}. "
"For more information, visit "
"https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/profiler#profiler-options "
)
column_partitions: Optional[List[PartitionColumnDetails]] = table_partitions.columns
if not column_partitions:
raise RuntimeError("Table parition is set but no columns are defined.")
for column_partition in column_partitions:
if column_partition.intervalType == PartitionIntervalTypes.INJECTED:
if table_profiler_config is None or profiler_partitioning_config is None:
raise RuntimeError(
error_msg.format(column_name=column_partition.columnName)
)
if (
profiler_partitioning_config.partitionColumnName
!= column_partition.columnName
):
raise RuntimeError(
error_msg.format(column_name=column_partition.columnName)
)
def get_partition_details(entity: Table) -> Optional[PartitionProfilerConfig]:
"""Build PartitionProfilerConfig object from entity
@ -33,18 +74,33 @@ def get_partition_details(entity: Table) -> Optional[PartitionProfilerConfig]:
Returns:
PartitionProfilerConfig
"""
if (
hasattr(entity, "tableProfilerConfig")
and hasattr(entity.tableProfilerConfig, "partitioning")
and entity.tableProfilerConfig.partitioning
):
return entity.tableProfilerConfig.partitioning
# Gather service type information
service_type = getattr(entity, "serviceType", None)
if (
hasattr(entity, "serviceType")
and entity.serviceType == DatabaseServiceType.BigQuery
):
if hasattr(entity, "tablePartition") and entity.tablePartition:
# Gather table partitioning information
table_partition = getattr(entity, "tablePartition", None)
# Profiler config
profiler_partitioning_config: Optional[PartitionProfilerConfig] = None
profiler_config: Optional[TableProfilerConfig] = getattr(
entity, "tableProfilerConfig", None
)
if profiler_config:
profiler_partitioning_config = getattr(profiler_config, "partitioning", None)
if table_partition and service_type == DatabaseServiceType.Athena:
# if table is an Athena table and it has been partitioned we need to validate injected partitioning
validate_athena_injected_partitioning(
table_partition, profiler_config, profiler_partitioning_config
)
return profiler_partitioning_config
if profiler_partitioning_config:
# if table has partitioning defined in the profiler config, return it
return profiler_partitioning_config
if service_type == DatabaseServiceType.BigQuery:
if table_partition:
column_partitions: Optional[
List[PartitionColumnDetails]
] = entity.tablePartition.columns

View File

@ -46,8 +46,8 @@ class ErrorHandler:
def try_execute(self, func, *args, **kwargs):
try:
func(*args, **kwargs)
except Exception as e:
self.errors.append(e)
except Exception as exc:
self.errors.append(exc)
def raise_if_errors(self):
if len(self.errors) == 1:

View File

@ -13,6 +13,7 @@
from typing import Optional
import pytest
from pydantic import BaseModel
from metadata.generated.schema.entity.data.table import (
@ -47,6 +48,15 @@ class MockRedshiftTable(BaseModel):
arbitrary_types_allowed = True
class MockAthenaTable(BaseModel):
tablePartition: Optional[TablePartition]
tableProfilerConfig: Optional[TableProfilerConfig]
serviceType = DatabaseServiceType.Athena
class Config:
arbitrary_types_allowed = True
def test_get_partition_details():
"""test get_partition_details function"""
table_entity = MockRedshiftTable(
@ -60,7 +70,7 @@ def test_get_partition_details():
partitionValues=None,
)
)
)
) # type: ignore
partition = get_partition_details(table_entity)
@ -111,3 +121,45 @@ def test_get_partition_details():
assert partition.partitionIntervalType == PartitionIntervalTypes.INGESTION_TIME
assert partition.partitionInterval == 1
assert partition.partitionIntervalUnit == PartitionIntervalUnit.DAY
def test_athena_injected_partition():
"""Test injected partitioning for athena table"""
entity = MockAthenaTable(
tablePartition=TablePartition(
columns=[
PartitionColumnDetails(
columnName="e",
intervalType=PartitionIntervalTypes.INJECTED,
interval=None,
)
]
),
tableProfilerConfig=None,
)
with pytest.raises(
RuntimeError,
match="Table profiler config is missing for table with injected partitioning. Please define the partitioning in the table profiler config for column e",
):
# As athena table has injected partitioning, it should raise an error
# since we have not provided any partitioning details for the injected partition
get_partition_details(entity)
profiler_config = TableProfilerConfig(
partitioning=PartitionProfilerConfig(
enablePartitioning=True,
partitionColumnName="e",
partitionIntervalType="COLUMN-VALUE",
partitionValues=["red"],
)
)
entity.tableProfilerConfig = profiler_config
partition = get_partition_details(entity)
assert partition.enablePartitioning == True
assert partition.partitionColumnName == "e"
assert partition.partitionIntervalType == PartitionIntervalTypes.COLUMN_VALUE
assert partition.partitionValues == ["red"]