Add partial support for BQ partitioned tables (#7066)

* Added support for BQ time-based partitions (not ingestion-based)

* Fixed minor errors in test suite workflow
This commit is contained in:
Teddy 2022-08-30 20:39:15 +02:00 committed by GitHub
parent 1dfcb45e86
commit a39c4db8e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 98 additions and 42 deletions

View File

@ -248,11 +248,8 @@ class SQAInterface(InterfaceProtocol):
sample=sample,
)
try:
if column is not None:
column = column.name
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception computing metrics: {exc}")
return row, column

View File

@ -34,7 +34,7 @@ class TablePartitionConfig(ConfigModel):
"""table partition config"""
partitionField: Optional[str] = None
partitionQueryDuration: Optional[int] = 1
partitionQueryDuration: Optional[int] = 30
partitionValues: Optional[List] = None

View File

@ -28,6 +28,7 @@ from metadata.config.workflow import get_sink
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import (
ColumnProfilerConfig,
IntervalType,
Table,
TableProfile,
)
@ -210,6 +211,26 @@ class ProfilerWorkflow:
if entity_config:
return entity_config.partitionConfig
if entity.tablePartition:
if entity.tablePartition.intervalType in {
IntervalType.TIME_UNIT,
IntervalType.INGESTION_TIME,
}:
try:
partition_field = entity.tablePartition.columns[0]
except Exception:
raise TypeError(
"Unsupported ingestion based partition type. Skipping table"
)
return TablePartitionConfig(
partitionField=partition_field,
)
raise TypeError(
f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table"
)
return None
def create_profiler_interface(self, service_connection_config, table_entity: Table):

View File

@ -164,11 +164,10 @@ class partition_filter_handler:
def handle_and_execute(_self, *args, **kwargs):
"""Handle partitioned queries"""
if _self._partition_details:
partition_field = _self._partition_details["partition_field"]
partition_filter = build_partition_predicate(
_self._partition_details,
_self.table.__table__.c.get(
_self._partition_details["partition_field"]
),
_self.table.__table__.c.get(partition_field),
)
if self.build_sample:
return (

View File

@ -26,7 +26,7 @@ from metadata.config.common import WorkflowExecutionError
from metadata.config.workflow import get_sink
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.data.table import IntervalType, Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -40,11 +40,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import EntityLink
from metadata.ingestion.api.parser import parse_workflow_config_gracefully
from metadata.ingestion.api.processor import ProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.interfaces.sqa_interface import SQAInterface
from metadata.orm_profiler.api.models import TablePartitionConfig
from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcessorConfig
from metadata.test_suite.runner.core import DataTestsRunner
from metadata.utils import entity_link
@ -178,6 +178,34 @@ class TestSuiteWorkflow:
return None
def _get_partition_details(self, entity: Table) -> Optional[TablePartitionConfig]:
    """Get partition details

    Args:
        entity: table entity
    Returns:
        A ``TablePartitionConfig`` whose ``partitionField`` is the first
        partition column of the table, or ``None`` when the table entity
        has no partition information.
    Raises:
        TypeError: when the partition interval type is neither
            ``TIME_UNIT`` nor ``INGESTION_TIME``, or when the partition
            does not expose any column to filter on.
    """
    table_partition = entity.tablePartition
    if not table_partition:
        return None

    if table_partition.intervalType not in {
        IntervalType.TIME_UNIT,
        IntervalType.INGESTION_TIME,
    }:
        raise TypeError(
            f"Unsupported partition type {table_partition.intervalType}. Skipping table"
        )

    # Check for a missing/empty column list explicitly instead of wrapping
    # `columns[0]` in a blanket `except Exception`, which would also hide
    # unrelated failures. TypeError is kept because callers rely on it to
    # skip the table.
    columns = table_partition.columns
    if not columns:
        raise TypeError(
            "Unsupported ingestion based partition type. Skipping table"
        )

    return TablePartitionConfig(
        partitionField=columns[0],
    )
def _create_sqa_tests_runner_interface(self, table_fqn: str):
"""create the interface to execute test against SQA sources"""
table_entity = self._get_table_entity_from_test_case(table_fqn)
@ -193,6 +221,9 @@ class TestSuiteWorkflow:
profile_query=self._get_profile_query(table_entity)
if not self._get_profile_sample(table_entity)
else None,
partition_config=self._get_partition_details(table_entity)
if not self._get_profile_query(table_entity)
else None,
)
def _create_data_tests_runner(self, sqa_interface):
@ -344,21 +375,29 @@ class TestSuiteWorkflow:
unique_table_fqns = self._get_unique_table_entities(test_cases)
for table_fqn in unique_table_fqns:
sqa_interface = self._create_sqa_tests_runner_interface(table_fqn)
for test_case in test_cases:
try:
data_test_runner = self._create_data_tests_runner(sqa_interface)
test_result = data_test_runner.run_and_handle(test_case)
if not test_result:
continue
if hasattr(self, "sink"):
self.sink.write_record(test_result)
logger.info(f"Successfuly ran test case {test_case.name.__root__}")
self.status.processed(test_case.fullyQualifiedName.__root__)
except Exception as exc:
logger.debug(traceback.format_exc(exc))
logger.warning(f"Could not run test case {test_case.name}: {exc}")
self.status.failure(test_case.fullyQualifiedName.__root__)
try:
sqa_interface = self._create_sqa_tests_runner_interface(table_fqn)
for test_case in test_cases:
try:
data_test_runner = self._create_data_tests_runner(sqa_interface)
test_result = data_test_runner.run_and_handle(test_case)
if not test_result:
continue
if hasattr(self, "sink"):
self.sink.write_record(test_result)
logger.info(
f"Successfuly ran test case {test_case.name.__root__}"
)
self.status.processed(test_case.fullyQualifiedName.__root__)
except Exception as exc:
logger.debug(traceback.format_exc(exc))
logger.warning(
f"Could not run test case {test_case.name}: {exc}"
)
except TypeError as exc:
logger.debug(traceback.format_exc(exc))
logger.warning(f"Could not run test case {test_case.name}: {exc}")
self.status.failure(test_case.fullyQualifiedName.__root__)
def print_status(self) -> int:
"""

View File

@ -67,7 +67,7 @@ def table_column_count_to_be_between(
(
int(param_value.value)
for param_value in test_case.parameterValues
if param_value.name == "minColvalue"
if param_value.name == "minColValue"
),
None,
)
@ -75,7 +75,7 @@ def table_column_count_to_be_between(
max_ = next(
int(param_value.value)
for param_value in test_case.parameterValues
if param_value.name == "maxColvalue"
if param_value.name == "maxColValue"
)
status = (

View File

@ -65,8 +65,8 @@ test_suite_config = {
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::test_suite_service_test.test_suite_database.test_suite_database_schema.users>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
{

View File

@ -76,8 +76,8 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::my.fully.qualified.name>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
}
],
@ -150,8 +150,8 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::my.fully.qualified.name>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
{
@ -159,8 +159,8 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::my.fully.qualified.name>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
],
@ -173,8 +173,8 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::my.fully.qualified.name>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
],
@ -216,8 +216,8 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address>",
"parameterValues": [
{"name": "minColvalue", "value": 1},
{"name": "maxColvalue", "value": 5},
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
{
@ -290,7 +290,7 @@ class TestSuiteWorkflowTests(unittest.TestCase):
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address>",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColvalue", "value": 10},
{"name": "maxColValue", "value": 10},
],
},
],

View File

@ -479,8 +479,8 @@ class testSuiteValidation(unittest.TestCase):
testSuite=EntityReference(id=uuid4(), type="TestSuite"),
testDefinition=EntityReference(id=uuid4(), type="TestDefinition"),
parameterValues=[
TestCaseParameterValue(name="minColvalue", value="2"),
TestCaseParameterValue(name="maxColvalue", value="10"),
TestCaseParameterValue(name="minColValue", value="2"),
TestCaseParameterValue(name="maxColValue", value="10"),
],
)