From a39c4db8e70ae607361a4c110bb6e74d74ec031d Mon Sep 17 00:00:00 2001 From: Teddy Date: Tue, 30 Aug 2022 20:39:15 +0200 Subject: [PATCH] Add partial support for BQ partitioned table (#7066) * Added support for BQ time based partition (not ingestion) * Fixed minor errors in test suite workflow --- .../src/metadata/interfaces/sqa_interface.py | 5 +- .../src/metadata/orm_profiler/api/models.py | 2 +- .../src/metadata/orm_profiler/api/workflow.py | 21 ++++++ .../orm_profiler/profiler/handle_partition.py | 5 +- .../src/metadata/test_suite/api/workflow.py | 73 ++++++++++++++----- .../table/table_column_count_to_be_between.py | 4 +- .../test_suite/test_e2e_workflow.py | 4 +- .../integration/test_suite/test_workflow.py | 22 +++--- .../tests/unit/test_suite/test_validations.py | 4 +- 9 files changed, 98 insertions(+), 42 deletions(-) diff --git a/ingestion/src/metadata/interfaces/sqa_interface.py b/ingestion/src/metadata/interfaces/sqa_interface.py index e40750ed625..acfa9707f52 100644 --- a/ingestion/src/metadata/interfaces/sqa_interface.py +++ b/ingestion/src/metadata/interfaces/sqa_interface.py @@ -248,11 +248,8 @@ class SQAInterface(InterfaceProtocol): sample=sample, ) - try: + if column is not None: column = column.name - except Exception as exc: - logger.debug(traceback.format_exc()) - logger.warning(f"Unexpected exception computing metrics: {exc}") return row, column diff --git a/ingestion/src/metadata/orm_profiler/api/models.py b/ingestion/src/metadata/orm_profiler/api/models.py index c4b72168689..936df76e82f 100644 --- a/ingestion/src/metadata/orm_profiler/api/models.py +++ b/ingestion/src/metadata/orm_profiler/api/models.py @@ -34,7 +34,7 @@ class TablePartitionConfig(ConfigModel): """table partition config""" partitionField: Optional[str] = None - partitionQueryDuration: Optional[int] = 1 + partitionQueryDuration: Optional[int] = 30 partitionValues: Optional[List] = None diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index c2f0ceabfb3..d0d5ca1bcf8 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -28,6 +28,7 @@ from metadata.config.workflow import get_sink from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( ColumnProfilerConfig, + IntervalType, Table, TableProfile, ) @@ -210,6 +211,26 @@ class ProfilerWorkflow: if entity_config: return entity_config.partitionConfig + if entity.tablePartition: + if entity.tablePartition.intervalType in { + IntervalType.TIME_UNIT, + IntervalType.INGESTION_TIME, + }: + try: + partition_field = entity.tablePartition.columns[0] + except Exception: + raise TypeError( + "Unsupported ingestion based partition type. Skipping table" + ) + + return TablePartitionConfig( + partitionField=partition_field, + ) + + raise TypeError( + f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table" + ) + return None def create_profiler_interface(self, service_connection_config, table_entity: Table): diff --git a/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py b/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py index 20b0bd38fcb..21c1318e676 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py +++ b/ingestion/src/metadata/orm_profiler/profiler/handle_partition.py @@ -164,11 +164,10 @@ class partition_filter_handler: def handle_and_execute(_self, *args, **kwargs): """Handle partitioned queries""" if _self._partition_details: + partition_field = _self._partition_details["partition_field"] partition_filter = build_partition_predicate( _self._partition_details, - _self.table.__table__.c.get( - _self._partition_details["partition_field"] - ), + _self.table.__table__.c.get(partition_field), ) if self.build_sample: return ( diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index 579c7284fe1..411c846b205 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -26,7 +26,7 @@ from metadata.config.common import WorkflowExecutionError from metadata.config.workflow import get_sink from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import IntervalType, Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) @@ -40,11 +40,11 @@ from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testDefinition import TestDefinition from metadata.generated.schema.tests.testSuite import TestSuite -from metadata.generated.schema.type.basic import EntityLink from metadata.ingestion.api.parser import parse_workflow_config_gracefully from metadata.ingestion.api.processor import ProcessorStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.interfaces.sqa_interface import SQAInterface +from metadata.orm_profiler.api.models import TablePartitionConfig from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcessorConfig from metadata.test_suite.runner.core import DataTestsRunner from metadata.utils import entity_link @@ -178,6 +178,34 @@ class TestSuiteWorkflow: return None + def _get_partition_details(self, entity: Table) -> Optional[TablePartitionConfig]: + """Get partition details + + Args: + entity: table entity + """ + if entity.tablePartition: + if entity.tablePartition.intervalType in { + IntervalType.TIME_UNIT, + IntervalType.INGESTION_TIME, + }: + try: + partition_field = entity.tablePartition.columns[0] + except Exception: + raise TypeError( + "Unsupported ingestion based partition type. Skipping table" + ) + + return TablePartitionConfig( + partitionField=partition_field, + ) + + raise TypeError( + f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table" + ) + + return None + def _create_sqa_tests_runner_interface(self, table_fqn: str): """create the interface to execute test against SQA sources""" table_entity = self._get_table_entity_from_test_case(table_fqn) @@ -193,6 +221,9 @@ class TestSuiteWorkflow: profile_query=self._get_profile_query(table_entity) if not self._get_profile_sample(table_entity) else None, + partition_config=self._get_partition_details(table_entity) + if not self._get_profile_query(table_entity) + else None, ) def _create_data_tests_runner(self, sqa_interface): @@ -344,21 +375,29 @@ class TestSuiteWorkflow: unique_table_fqns = self._get_unique_table_entities(test_cases) for table_fqn in unique_table_fqns: - sqa_interface = self._create_sqa_tests_runner_interface(table_fqn) - for test_case in test_cases: - try: - data_test_runner = self._create_data_tests_runner(sqa_interface) - test_result = data_test_runner.run_and_handle(test_case) - if not test_result: - continue - if hasattr(self, "sink"): - self.sink.write_record(test_result) - logger.info(f"Successfuly ran test case {test_case.name.__root__}") - self.status.processed(test_case.fullyQualifiedName.__root__) - except Exception as exc: - logger.debug(traceback.format_exc(exc)) - logger.warning(f"Could not run test case {test_case.name}: {exc}") - self.status.failure(test_case.fullyQualifiedName.__root__) + try: + sqa_interface = self._create_sqa_tests_runner_interface(table_fqn) + for test_case in test_cases: + try: + data_test_runner = self._create_data_tests_runner(sqa_interface) + test_result = data_test_runner.run_and_handle(test_case) + if not test_result: + continue + if hasattr(self, "sink"): + self.sink.write_record(test_result) + logger.info( + f"Successfuly ran test case {test_case.name.__root__}" + ) + self.status.processed(test_case.fullyQualifiedName.__root__) + except Exception as exc: + logger.debug(traceback.format_exc(exc)) + logger.warning( + f"Could not run test case {test_case.name}: {exc}" + ) + except TypeError as exc: + logger.debug(traceback.format_exc(exc)) + logger.warning(f"Could not run test case {test_case.name}: {exc}") + self.status.failure(test_case.fullyQualifiedName.__root__) def print_status(self) -> int: """ diff --git a/ingestion/src/metadata/test_suite/validations/table/table_column_count_to_be_between.py b/ingestion/src/metadata/test_suite/validations/table/table_column_count_to_be_between.py index 81db702cc0f..62802cb13fa 100644 --- a/ingestion/src/metadata/test_suite/validations/table/table_column_count_to_be_between.py +++ b/ingestion/src/metadata/test_suite/validations/table/table_column_count_to_be_between.py @@ -67,7 +67,7 @@ def table_column_count_to_be_between( ( int(param_value.value) for param_value in test_case.parameterValues - if param_value.name == "minColvalue" + if param_value.name == "minColValue" ), None, ) @@ -75,7 +75,7 @@ def table_column_count_to_be_between( max_ = next( int(param_value.value) for param_value in test_case.parameterValues - if param_value.name == "maxColvalue" + if param_value.name == "maxColValue" ) status = ( diff --git a/ingestion/tests/integration/test_suite/test_e2e_workflow.py b/ingestion/tests/integration/test_suite/test_e2e_workflow.py index 881cad5469f..541a4109092 100644 --- a/ingestion/tests/integration/test_suite/test_e2e_workflow.py +++ b/ingestion/tests/integration/test_suite/test_e2e_workflow.py @@ -65,8 +65,8 @@ test_suite_config = { "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::test_suite_service_test.test_suite_database.test_suite_database_schema.users>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], }, { diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py index 2304571d730..70907616c0d 100644 --- a/ingestion/tests/integration/test_suite/test_workflow.py +++ b/ingestion/tests/integration/test_suite/test_workflow.py @@ -76,8 +76,8 @@ class TestSuiteWorkflowTests(unittest.TestCase): "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::my.fully.qualified.name>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], } ], @@ -150,8 +150,8 @@ class TestSuiteWorkflowTests(unittest.TestCase): "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::my.fully.qualified.name>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], }, { @@ -159,8 +159,8 @@ class TestSuiteWorkflowTests(unittest.TestCase): "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::my.fully.qualified.name>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], }, ], @@ -173,8 +173,8 @@ class TestSuiteWorkflowTests(unittest.TestCase): "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::my.fully.qualified.name>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], }, ], @@ -216,8 +216,8 @@ class TestSuiteWorkflowTests(unittest.TestCase): "testDefinitionName": "TableColumnCountToBeBetween", "entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address>", "parameterValues": [ - {"name": "minColvalue", "value": 1}, - {"name": "maxColvalue", "value": 5}, + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, ], }, { @@ -290,7 +290,7 @@ class TestSuiteWorkflowTests(unittest.TestCase): "entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address>", "parameterValues": [ {"name": "minColValue", "value": 1}, - {"name": "maxColvalue", "value": 10}, + {"name": "maxColValue", "value": 10}, ], }, ], diff --git a/ingestion/tests/unit/test_suite/test_validations.py b/ingestion/tests/unit/test_suite/test_validations.py index 0be6e9e2edc..fe094cced2e 100644 --- a/ingestion/tests/unit/test_suite/test_validations.py +++ b/ingestion/tests/unit/test_suite/test_validations.py @@ -479,8 +479,8 @@ class testSuiteValidation(unittest.TestCase): testSuite=EntityReference(id=uuid4(), type="TestSuite"), testDefinition=EntityReference(id=uuid4(), type="TestDefinition"), parameterValues=[ - TestCaseParameterValue(name="minColvalue", value="2"), - TestCaseParameterValue(name="maxColvalue", value="10"), + TestCaseParameterValue(name="minColValue", value="2"), + TestCaseParameterValue(name="maxColValue", value="10"), ], )