From 3e83bdac3de97dd8fc22ec12a8d793f67ff31625 Mon Sep 17 00:00:00 2001 From: Teddy Date: Wed, 28 Feb 2024 15:20:59 +0100 Subject: [PATCH] ISSUE #14765 - Implement Athena Injected Partition Check (#15318) * refactor!: change partition metadata structure for table entities * refactor!: updated json schema for TypeScript code gen * chore: migration of partition for table entities * style: python & java linting * fix: catch injected partition table in Athena * style: ran python linting --- ingestion/src/metadata/utils/partition.py | 78 +++++++++++++++++++--- ingestion/src/metadata/utils/test_utils.py | 4 +- ingestion/tests/unit/test_partition.py | 54 ++++++++++++++- 3 files changed, 122 insertions(+), 14 deletions(-) diff --git a/ingestion/src/metadata/utils/partition.py b/ingestion/src/metadata/utils/partition.py index da5daa53559..020e8a0f920 100644 --- a/ingestion/src/metadata/utils/partition.py +++ b/ingestion/src/metadata/utils/partition.py @@ -19,12 +19,53 @@ from metadata.generated.schema.entity.data.table import ( PartitionIntervalUnit, PartitionProfilerConfig, Table, + TablePartition, + TableProfilerConfig, ) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) +def validate_athena_injected_partitioning( + table_partitions: TablePartition, + table_profiler_config: Optional[TableProfilerConfig], + profiler_partitioning_config: Optional[PartitionProfilerConfig], +) -> None: + """Validate Athena partitioning. Injected partition need to be defined + in the table profiler c onfig for the profiler to work correctly. We'll throw an + error if the partitioning is not defined in the table profiler config. + + Attr: + entity (Table): entity table + """ + error_msg = ( + "Table profiler config is missing for table with injected partitioning. Please define " + "the partitioning in the table profiler config for column {column_name}. " + "For more information, visit " + "https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/profiler#profiler-options " + ) + + column_partitions: Optional[List[PartitionColumnDetails]] = table_partitions.columns + if not column_partitions: + raise RuntimeError("Table parition is set but no columns are defined.") + + for column_partition in column_partitions: + if column_partition.intervalType == PartitionIntervalTypes.INJECTED: + if table_profiler_config is None or profiler_partitioning_config is None: + raise RuntimeError( + error_msg.format(column_name=column_partition.columnName) + ) + + if ( + profiler_partitioning_config.partitionColumnName + != column_partition.columnName + ): + raise RuntimeError( + error_msg.format(column_name=column_partition.columnName) + ) + + def get_partition_details(entity: Table) -> Optional[PartitionProfilerConfig]: """Build PartitionProfilerConfig object from entity @@ -33,18 +74,33 @@ def get_partition_details(entity: Table) -> Optional[PartitionProfilerConfig]: Returns: PartitionProfilerConfig """ - if ( - hasattr(entity, "tableProfilerConfig") - and hasattr(entity.tableProfilerConfig, "partitioning") - and entity.tableProfilerConfig.partitioning - ): - return entity.tableProfilerConfig.partitioning + # Gather service type information + service_type = getattr(entity, "serviceType", None) - if ( - hasattr(entity, "serviceType") - and entity.serviceType == DatabaseServiceType.BigQuery - ): - if hasattr(entity, "tablePartition") and entity.tablePartition: + # Gather table partitioning information + table_partition = getattr(entity, "tablePartition", None) + + # Profiler config + profiler_partitioning_config: Optional[PartitionProfilerConfig] = None + profiler_config: Optional[TableProfilerConfig] = getattr( + entity, "tableProfilerConfig", None + ) + if profiler_config: + profiler_partitioning_config = getattr(profiler_config, "partitioning", None) + + if table_partition and service_type == DatabaseServiceType.Athena: + # if table is an Athena table and it has been partitioned we need to validate injected partitioning + validate_athena_injected_partitioning( + table_partition, profiler_config, profiler_partitioning_config + ) + return profiler_partitioning_config + + if profiler_partitioning_config: + # if table has partitioning defined in the profiler config, return it + return profiler_partitioning_config + + if service_type == DatabaseServiceType.BigQuery: + if table_partition: column_partitions: Optional[ List[PartitionColumnDetails] ] = entity.tablePartition.columns diff --git a/ingestion/src/metadata/utils/test_utils.py b/ingestion/src/metadata/utils/test_utils.py index 999f0f30e48..266e8e72dd6 100644 --- a/ingestion/src/metadata/utils/test_utils.py +++ b/ingestion/src/metadata/utils/test_utils.py @@ -46,8 +46,8 @@ class ErrorHandler: def try_execute(self, func, *args, **kwargs): try: func(*args, **kwargs) - except Exception as e: - self.errors.append(e) + except Exception as exc: + self.errors.append(exc) def raise_if_errors(self): if len(self.errors) == 1: diff --git a/ingestion/tests/unit/test_partition.py b/ingestion/tests/unit/test_partition.py index 014f81a66aa..705f37efb1d 100644 --- a/ingestion/tests/unit/test_partition.py +++ b/ingestion/tests/unit/test_partition.py @@ -13,6 +13,7 @@ from typing import Optional +import pytest from pydantic import BaseModel from metadata.generated.schema.entity.data.table import ( @@ -47,6 +48,15 @@ class MockRedshiftTable(BaseModel): arbitrary_types_allowed = True +class MockAthenaTable(BaseModel): + tablePartition: Optional[TablePartition] + tableProfilerConfig: Optional[TableProfilerConfig] + serviceType = DatabaseServiceType.Athena + + class Config: + arbitrary_types_allowed = True + + def test_get_partition_details(): """test get_partition_details function""" table_entity = MockRedshiftTable( @@ -60,7 +70,7 @@ def test_get_partition_details(): partitionValues=None, ) ) - ) + ) # type: ignore partition = get_partition_details(table_entity) @@ -111,3 +121,45 @@ def test_get_partition_details(): assert partition.partitionIntervalType == PartitionIntervalTypes.INGESTION_TIME assert partition.partitionInterval == 1 assert partition.partitionIntervalUnit == PartitionIntervalUnit.DAY + + +def test_athena_injected_partition(): + """Test injected partitioning for athena table""" + entity = MockAthenaTable( + tablePartition=TablePartition( + columns=[ + PartitionColumnDetails( + columnName="e", + intervalType=PartitionIntervalTypes.INJECTED, + interval=None, + ) + ] + ), + tableProfilerConfig=None, + ) + + with pytest.raises( + RuntimeError, + match="Table profiler config is missing for table with injected partitioning. Please define the partitioning in the table profiler config for column e", + ): + # As athena table has injected partitioning, it should raise an error + # since we have not provided any partitioning details for the injected partition + get_partition_details(entity) + + profiler_config = TableProfilerConfig( + partitioning=PartitionProfilerConfig( + enablePartitioning=True, + partitionColumnName="e", + partitionIntervalType="COLUMN-VALUE", + partitionValues=["red"], + ) + ) + + entity.tableProfilerConfig = profiler_config + + partition = get_partition_details(entity) + + assert partition.enablePartitioning == True + assert partition.partitionColumnName == "e" + assert partition.partitionIntervalType == PartitionIntervalTypes.COLUMN_VALUE + assert partition.partitionValues == ["red"]