MINOR - enable dynamic assertion dl (#17008)

* fix: refactor runtime param setter + add dynamic assertion support for datalake

* chore: add missing test dependencies

* fix: centralize object constructor in interface

* fix: remove abstract decorator in interface
This commit is contained in:
Teddy 2024-07-16 11:01:43 +02:00 committed by GitHub
parent 87fea77009
commit 3bcfdfe014
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 322 additions and 108 deletions

View File

@ -364,6 +364,7 @@ test = {
VERSIONS["giturlparse"],
VERSIONS["avro"], # Sample Data
VERSIONS["grpc-tools"],
VERSIONS["neo4j"],
"testcontainers==3.7.1;python_version<'3.9'",
"testcontainers==4.4.0;python_version>='3.9'",
"minio==7.2.5",
@ -375,6 +376,12 @@ test = {
"requests==2.31.0",
f"{DATA_DIFF['mysql']}==0.11.2",
*plugins["deltalake"],
*plugins["datalake-gcs"],
*plugins["pgspider"],
*plugins["clickhouse"],
*plugins["mssql"],
*plugins["dagster"],
*plugins["oracle"],
}
e2e_test = {

View File

@ -0,0 +1,100 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Builder interface defining the structure of builders for validators.
Validators are test classes (e.g. columnValuesToBeBetween, etc.)
"""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import TYPE_CHECKING, Optional, Type, Union
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.importer import import_test_case_class
if TYPE_CHECKING:
from pandas import DataFrame
class IValidatorBuilder(ABC):
    """Interface for validator builders.

    A builder resolves the validator class for a test case (based on the
    entity type, the source type and the test definition name), instantiates
    it with the runner, and can inject runtime parameters into the test case.
    Concrete builders only need to provide the source type.
    """

    @property
    def test_case(self):
        """Return the test case object"""
        return self._test_case

    @property
    def validator(self):
        """Return the validator object"""
        return self._validator

    def __init__(
        self,
        runner: Union[QueryRunner, "DataFrame"],
        test_case: TestCase,
        entity_type: str,
    ) -> None:
        """Builder object for validators. This builder is used to create a validator object

        Args:
            runner (Union[QueryRunner, DataFrame]): The runner object handed to the validator
            test_case (TestCase): The test case object
            entity_type (str): one of COLUMN or TABLE -- fetched from the test definition
        """
        self._test_case = test_case
        self.runner = runner
        self.validator_cls: Type[BaseTestValidator] = import_test_case_class(
            entity_type,
            self._get_source_type(),
            self.test_case.testDefinition.fullyQualifiedName,  # type: ignore
        )
        # Instantiate the validator right away so `validator` is always available
        self.reset()

    def set_runtime_params(
        self, runtime_params_setter: Optional[RuntimeParameterSetter]
    ) -> None:
        """Set the runtime parameters for the validator object

        The parameters computed by the setter are serialized to JSON and stored
        on the test case as a `runtimeParams` parameter value. Calling this
        method again replaces the previous `runtimeParams` entry instead of
        accumulating duplicates.

        # TODO: We should support setting n runtime parameters

        Args:
            runtime_params_setter (Optional[RuntimeParameterSetter]): The runtime parameter setter.
                Nothing is done when it is None.
        """
        if not runtime_params_setter:
            return

        params = runtime_params_setter.get_parameters(self.test_case)
        if not self.test_case.parameterValues:
            # If there are no parameters, create a new list
            self.test_case.parameterValues = []

        param_values = self.test_case.parameterValues
        # Drop any runtimeParams left over from a previous call so a stale
        # entry can never shadow the freshly computed one.
        param_values[:] = [
            value for value in param_values if value.name != "runtimeParams"
        ]
        param_values.append(
            TestCaseParameterValue(
                name="runtimeParams", value=params.model_dump_json()
            )
        )

    def reset(self) -> None:
        """Reset the builder by creating a fresh validator instance.

        The execution date passed to the validator is the current time as an
        epoch timestamp in milliseconds.
        """
        self._validator = self.validator_cls(
            self.runner,
            test_case=self.test_case,
            execution_date=int(datetime.now().timestamp() * 1000),
        )

    @abstractmethod
    def _get_source_type(self) -> str:
        """Get the source type used to import the validator class"""
        raise NotImplementedError

View File

@ -0,0 +1,29 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Builder defining the structure of builders for validators for Pandas sources
"""
from typing import TYPE_CHECKING
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
if TYPE_CHECKING:
pass
class PandasValidatorBuilder(IValidatorBuilder):
    """Validator builder for Pandas sources"""

    def _get_source_type(self) -> str:
        """Return the source type identifier for Pandas validators"""
        source_type = "pandas"
        return source_type

View File

@ -0,0 +1,25 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Builder defining the structure of builders for validators for SQA sources
"""
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
class SQAValidatorBuilder(IValidatorBuilder):
    """Validator builder for SQA (SQLAlchemy) sources"""

    def _get_source_type(self) -> str:
        """Return the source type identifier for SQLAlchemy validators"""
        source_type = "sqlalchemy"
        return source_type

View File

@ -13,22 +13,20 @@
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from datetime import datetime
from typing import Optional, Type
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.pandas_validator_builder import (
PandasValidatorBuilder,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.utils.importer import import_test_case_class
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@ -69,40 +67,7 @@ class PandasTestSuiteInterface(TestSuiteInterface, PandasInterfaceMixin):
if self.dfs and self.table_partition_config:
self.dfs = self.get_partitioned_df(self.dfs)
def run_test_case(
    self,
    test_case: TestCase,
) -> Optional[TestCaseResult]:
    """Run table tests where platformsTest=OpenMetadata

    Looks up the entity type of the test definition, imports the matching
    "pandas" validator class, instantiates it with `self.dfs` and runs the
    validation.

    Args:
        test_case: test case object to execute

    Returns:
        TestCaseResult object

    Raises:
        RuntimeError: if importing, building or running the validator fails.
            The original exception is attached as the cause.
    """
    try:
        TestHandler: Type[  # pylint: disable=invalid-name
            BaseTestValidator
        ] = import_test_case_class(
            self.ometa_client.get_by_id(
                TestDefinition, test_case.testDefinition.id
            ).entityType.value,
            "pandas",
            test_case.testDefinition.fullyQualifiedName,
        )

        test_handler = TestHandler(
            self.dfs,
            test_case=test_case,
            # current time as epoch milliseconds
            execution_date=int(datetime.now().timestamp() * 1000),
        )

        return test_handler.run_validation()
    except Exception as err:
        logger.error(
            f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
        )
        # Chain explicitly so the root cause stays visible in the traceback
        raise RuntimeError(err) from err
def _get_validator_builder(
    self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
    """Create the Pandas validator builder for a test case

    Args:
        test_case (TestCase): test case object
        entity_type (str): type of the entity
    Returns:
        IValidatorBuilder: a PandasValidatorBuilder built with `self.dfs`
    """
    builder = PandasValidatorBuilder(self.dfs, test_case, entity_type)
    return builder

View File

@ -14,25 +14,23 @@ Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from datetime import datetime
from typing import Optional, Union
from typing import Union
from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.util import AliasedClass
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.generated.schema.tests.testCase import TestCase
from metadata.ingestion.connections.session import create_and_bind_session
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
from metadata.utils.constants import TEN_MIN
from metadata.utils.importer import import_test_case_class
from metadata.utils.logger import test_suite_logger
from metadata.utils.ssl_manager import get_ssl_connection
from metadata.utils.timeout import cls_timeout
@ -143,50 +141,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
)
)
def run_test_case(
    self,
    test_case: TestCase,
) -> Optional[TestCaseResult]:
    """Run table tests where platformsTest=OpenMetadata

    Looks up the entity type of the test definition, imports the matching
    "sqlalchemy" validator class and, when that class declares a
    `runtime_parameter_setter`, stores the computed runtime parameters on the
    test case as a `runtimeParams` parameter value before running the
    validation with `self.runner`.

    Args:
        test_case: test case object to execute

    Returns:
        TestCaseResult object

    Raises:
        RuntimeError: if importing, building or running the validator fails.
            The original exception is attached as the cause.
    """
    try:
        TestHandler = import_test_case_class(  # pylint: disable=invalid-name
            self.ometa_client.get_by_id(
                TestDefinition, test_case.testDefinition.id
            ).entityType.value,
            "sqlalchemy",
            test_case.testDefinition.fullyQualifiedName,
        )

        if TestHandler.runtime_parameter_setter:
            setter = TestHandler.runtime_parameter_setter(
                self.ometa_client,
                self.service_connection_config,
                self.table_entity,
                self.sampler,
            )
            runtime_params = setter.get_parameters(test_case)
            if not test_case.parameterValues:
                # A test case without parameters has no list to append to
                test_case.parameterValues = []
            test_case.parameterValues.append(
                TestCaseParameterValue(
                    name="runtimeParams", value=runtime_params.model_dump_json()
                )
            )

        test_handler = TestHandler(
            self.runner,
            test_case=test_case,
            # current time as epoch milliseconds
            execution_date=int(datetime.now().timestamp() * 1000),
        )

        return test_handler.run_validation()
    except Exception as err:
        logger.error(
            f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
        )
        # Chain explicitly so the root cause stays visible in the traceback
        raise RuntimeError(err) from err
def _get_validator_builder(
    self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
    """Create the SQA validator builder for a test case

    Args:
        test_case (TestCase): test case object
        entity_type (str): type of the entity
    Returns:
        IValidatorBuilder: a SQAValidatorBuilder built with `self.runner`
    """
    builder = SQAValidatorBuilder(self.runner, test_case, entity_type)
    return builder

View File

@ -15,20 +15,34 @@ supporting sqlalchemy abstraction layer
"""
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Type
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter,
)
from metadata.data_quality.validations.runtime_param_setter.param_setter_factory import (
RuntimeParameterSetterFactory,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.tests.basic import TestCaseResult
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testDefinition import TestDefinition
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.utils.logger import test_suite_logger
from metadata.utils.partition import get_partition_details
logger = test_suite_logger()
class TestSuiteInterface(ABC):
"""Abstract interface for the processor"""
runtime_params_setter_fact = RuntimeParameterSetterFactory
@abstractmethod
def __init__(
self,
@ -41,10 +55,78 @@ class TestSuiteInterface(ABC):
self.service_connection_config = service_connection_config
self.table_entity = table_entity
@property
def sampler(self):
    """Sampler handed to the runtime parameter setter.

    The interface itself has no sampler and always returns None.

    Note: Overridden in the implementation class. This should be removed from the interface.
    It only exists here because the RuntimeParameterSetter takes the sampler as an argument,
    a dependency we may want to remove.
    """
    return None
@abstractmethod
def _get_validator_builder(
    self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder:
    """Return the validator builder to use for a test case.

    Abstract: each implementation class must define it.

    Args:
        test_case (TestCase): test case object
        entity_type (str): type of the entity

    Returns:
        IValidatorBuilder: a validator builder

    Raises:
        NotImplementedError: always, when the interface version is called
    """
    raise NotImplementedError
@classmethod
def _get_runtime_params_setter_fact(cls) -> RuntimeParameterSetterFactory:
    """Instantiate the runtime parameter setter factory configured on the class."""
    factory_cls = cls.runtime_params_setter_fact
    return factory_cls()
@classmethod
def _set_runtime_params_setter_fact(
    cls, class_fact: Type[RuntimeParameterSetterFactory]
):
    """Override the default runtime parameter setter factory.

    The factory class is stored on the class itself, not on an instance.

    Args:
        class_fact (Type[RuntimeParameterSetterFactory]): the runtime parameter setter factory class
    """
    setattr(cls, "runtime_params_setter_fact", class_fact)
def run_test_case(self, test_case: TestCase) -> Optional[TestCaseResult]:
    """Run a data quality test case.

    Resolves the runtime parameter setter (if any) for the test definition,
    builds the validator through the implementation's validator builder,
    injects the runtime parameters and runs the validation.

    Args:
        test_case (TestCase): test case object to execute

    Returns:
        Optional[TestCaseResult]: the result returned by the validator

    Raises:
        RuntimeError: if the validation itself fails. The original exception
            is attached as the cause.
    """
    runtime_params_setter_fact: RuntimeParameterSetterFactory = (
        self._get_runtime_params_setter_fact()
    )  # type: ignore
    # None when the test definition does not need runtime parameters
    runtime_params_setter: Optional[
        RuntimeParameterSetter
    ] = runtime_params_setter_fact.get_runtime_param_setter(
        test_case.testDefinition.fullyQualifiedName,  # type: ignore
        self.ometa_client,
        self.service_connection_config,
        self.table_entity,
        self.sampler,
    )

    # get `column` or `table` type for validator import
    entity_type: str = self.ometa_client.get_by_id(
        TestDefinition, test_case.testDefinition.id
    ).entityType.value

    validator_builder = self._get_validator_builder(test_case, entity_type)
    validator_builder.set_runtime_params(runtime_params_setter)
    validator: BaseTestValidator = validator_builder.validator
    try:
        return validator.run_validation()
    except Exception as err:
        logger.error(
            f"Error executing {test_case.testDefinition.fullyQualifiedName} - {err}"
        )
        # Chain explicitly so the root cause stays visible in the traceback
        raise RuntimeError(err) from err
def _get_sample_query(self) -> Optional[str]:
"""Get the sampling query for the data quality tests

View File

@ -18,7 +18,7 @@ from __future__ import annotations
import reprlib
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Callable, List, Optional, Type, TypeVar, Union
from typing import TYPE_CHECKING, Callable, List, Optional, Type, TypeVar, Union
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter,
@ -31,6 +31,9 @@ from metadata.generated.schema.tests.basic import (
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.profiler.processor.runner import QueryRunner
if TYPE_CHECKING:
from pandas import DataFrame
T = TypeVar("T", bound=Callable)
R = TypeVar("R")
@ -45,7 +48,7 @@ class BaseTestValidator(ABC):
def __init__(
self,
runner: QueryRunner,
runner: Union[QueryRunner, List["DataFrame"]],
test_case: TestCase,
execution_date: Union[datetime, float],
) -> None:

View File

@ -0,0 +1,54 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module that defines the RuntimeParameterSetterFactory class.
This class is responsible for creating instances of the RuntimeParameterSetter
based on the test case.
"""
from typing import Optional
from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter,
)
from metadata.data_quality.validations.runtime_param_setter.table_diff_params_setter import (
TableDiffParamsSetter,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
class RuntimeParameterSetterFactory:
    """Factory returning the runtime parameter setter matching a validator name"""

    def __init__(self) -> None:
        """Register which validator names are served by each setter class"""
        self._setter_map = {
            TableDiffParamsSetter: {"tableDiff"},
        }

    def get_runtime_param_setter(
        self,
        name: str,
        ometa: OpenMetadata,
        service_connection_config,
        table_entity,
        sampler,
    ) -> Optional[RuntimeParameterSetter]:
        """Build the runtime parameter setter registered for `name`.

        Args:
            name (str): validator name to look up in the setter map
            ometa (OpenMetadata): forwarded to the setter constructor
            service_connection_config: forwarded to the setter constructor
            table_entity: forwarded to the setter constructor
            sampler: forwarded to the setter constructor

        Returns:
            Optional[RuntimeParameterSetter]: a new setter instance, or None when
                no setter is registered for `name`
        """
        # First registered setter class whose validator names contain `name`
        matching_cls = next(
            (
                candidate
                for candidate, names in self._setter_map.items()
                if name in names
            ),
            None,
        )
        if matching_cls is None:
            return None
        return matching_cls(
            ometa,
            service_connection_config,
            table_entity,
            sampler,
        )

View File

@ -26,9 +26,6 @@ from metadata.data_quality.validations.mixins.sqa_validator_mixin import (
SQAValidatorMixin,
)
from metadata.data_quality.validations.models import TableDiffRuntimeParameters
from metadata.data_quality.validations.runtime_param_setter.table_diff_params_setter import (
TableDiffParamsSetter,
)
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.entity.services.connections.database.sapHanaConnection import (
SapHanaScheme,
@ -67,13 +64,10 @@ class TableDiffValidator(BaseTestValidator, SQAValidatorMixin):
Compare two tables and fail if the number of differences exceeds a threshold
"""
runtime_parameter_setter = TableDiffParamsSetter
def __init__(self, *args, **kwargs):
    """Initialise the base validator, then resolve the runtime parameters.

    All positional and keyword arguments are forwarded unchanged to the
    parent constructor.
    """
    super().__init__(*args, **kwargs)
    resolved_params: TableDiffRuntimeParameters = self.get_runtime_params()
    self.runtime_params = resolved_params
runtime_params: TableDiffRuntimeParameters
def run_validation(self) -> TestCaseResult:
self.runtime_params = self.get_runtime_params()
try:
self._validate_dialects()
return self._run()