MINOR: ref(data-quality): modularized test case validator import (#18716)

* ref(data-quality): modularized test case validator import

- removed test_suite_factory
- implemented TestCaseImporter
- removed SQAValidatorBuilder and PandasValidatorBuilder in favor of a SourceType enum
- removed the orm table creation from test suite source

* format

* IValidatorBuilder -> ValidatorBuilder

* use the table from the sampler in the test suite interface

* linting

* fixed the profiler with similar solution

* removed unused inheritance

* removed unneeded super().__init__()

* removed all instances of orm_table

* fixed tests

* add reportExplicitAny=false

* fixed tests
This commit is contained in:
Imri Paran 2024-11-27 16:25:12 +01:00 committed by GitHub
parent fdd8c5878c
commit cd74d8f55a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 125 additions and 159 deletions

View File

@ -1,29 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Builder defining the structure of builders for validators for Pandas sources
"""
from typing import TYPE_CHECKING
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
if TYPE_CHECKING:
pass
class PandasValidatorBuilder(IValidatorBuilder):
"""Builder for Pandas validators"""
def _get_source_type(self) -> str:
"""Return the test case"""
return "pandas"

View File

@ -1,25 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Builder defining the structure of builders for validators for SQA sources
"""
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
class SQAValidatorBuilder(IValidatorBuilder):
"""Builder for SQA validators"""
def _get_source_type(self) -> str:
"""Return the test case"""
return "sqlalchemy"

View File

@ -14,8 +14,8 @@ Builder interface defining the structure of builders for validators.
Validators are test classes (e.g. columnValuesToBeBetween, etc.) Validators are test classes (e.g. columnValuesToBeBetween, etc.)
""" """
from abc import ABC, abstractmethod
from datetime import datetime, timezone from datetime import datetime, timezone
from enum import Enum
from typing import TYPE_CHECKING, Set, Type, Union from typing import TYPE_CHECKING, Set, Type, Union
from metadata.data_quality.validations.base_test_handler import BaseTestValidator from metadata.data_quality.validations.base_test_handler import BaseTestValidator
@ -25,15 +25,57 @@ from metadata.data_quality.validations.runtime_param_setter.param_setter import
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.basic import Timestamp from metadata.generated.schema.type.basic import Timestamp
from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.runner import QueryRunner
from metadata.utils import importer from metadata.utils.importer import import_test_case_class
if TYPE_CHECKING: if TYPE_CHECKING:
from pandas import DataFrame from pandas import DataFrame
class IValidatorBuilder(ABC): class TestCaseImporter:
def import_test_case_validator(
self,
test_type: str,
runner_type: str,
test_definition: str,
) -> Type[BaseTestValidator]:
return import_test_case_class(test_type, runner_type, test_definition)
class SourceType(Enum):
PANDAS = "pandas"
SQL = "sqlalchemy"
class ValidatorBuilder(TestCaseImporter):
"""Interface for validator builders""" """Interface for validator builders"""
def __init__(
self,
runner: Union[QueryRunner, "DataFrame"],
test_case: TestCase,
source_type: SourceType,
entity_type: str,
) -> None:
"""Builder object for SQA validators. This builder is used to create a validator object
Args:
runner (QueryRunner): The runner object
test_case (TestCase): The test case object
source_type (SourceType): The source type
entity_type (str): one of COLUMN or TABLE -- fetched from the test definition
"""
super().__init__()
self._test_case = test_case
self.runner = runner
self.validator_cls: Type[
BaseTestValidator
] = super().import_test_case_validator(
entity_type,
source_type.value,
self.test_case.testDefinition.fullyQualifiedName,
)
self.reset()
@property @property
def test_case(self): def test_case(self):
"""Return the test case object""" """Return the test case object"""
@ -44,29 +86,6 @@ class IValidatorBuilder(ABC):
"""Return the validator object""" """Return the validator object"""
return self._validator return self._validator
def __init__(
self,
runner: Union[QueryRunner, "DataFrame"],
test_case: TestCase,
entity_type: str,
) -> None:
"""Builder object for SQA validators. This builder is used to create a validator object
Args:
runner (QueryRunner): The runner object
test_case (TestCase): The test case object
entity_type (str): one of COLUMN or TABLE -- fetched from the test definition
"""
self._test_case = test_case
self.runner = runner
# TODO this will be removed on https://github.com/open-metadata/OpenMetadata/pull/18716
self.validator_cls: Type[BaseTestValidator] = importer.import_test_case_class(
entity_type,
self._get_source_type(),
self.test_case.testDefinition.fullyQualifiedName, # type: ignore
)
self.reset()
def set_runtime_params(self, runtime_params_setters: Set[RuntimeParameterSetter]): def set_runtime_params(self, runtime_params_setters: Set[RuntimeParameterSetter]):
"""Set the runtime parameters for the validator object """Set the runtime parameters for the validator object
@ -93,8 +112,3 @@ class IValidatorBuilder(ABC):
int(datetime.now(tz=timezone.utc).timestamp() * 1000) int(datetime.now(tz=timezone.utc).timestamp() * 1000)
), ),
) )
@abstractmethod
def _get_source_type(self):
"""Get the source type"""
raise NotImplementedError

View File

@ -13,9 +13,10 @@
Interfaces with database for all database engine Interfaces with database for all database engine
supporting sqlalchemy abstraction layer supporting sqlalchemy abstraction layer
""" """
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder
from metadata.data_quality.builders.pandas_validator_builder import ( from metadata.data_quality.builders.validator_builder import (
PandasValidatorBuilder, SourceType,
ValidatorBuilder,
) )
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
@ -44,13 +45,14 @@ class PandasTestSuiteInterface(TestSuiteInterface, PandasInterfaceMixin):
ometa_client: OpenMetadata, ometa_client: OpenMetadata,
sampler: SamplerInterface, sampler: SamplerInterface,
table_entity: Table, table_entity: Table,
**__, **kwargs,
): ):
super().__init__( super().__init__(
service_connection_config, service_connection_config,
ometa_client, ometa_client,
sampler, sampler,
table_entity, table_entity,
**kwargs,
) )
( (
@ -63,5 +65,10 @@ class PandasTestSuiteInterface(TestSuiteInterface, PandasInterfaceMixin):
def _get_validator_builder( def _get_validator_builder(
self, test_case: TestCase, entity_type: str self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder: ) -> ValidatorBuilder:
return PandasValidatorBuilder(self.dataset, test_case, entity_type) return self.validator_builder_class(
runner=self.dataset,
test_case=test_case,
entity_type=entity_type,
source_type=SourceType.PANDAS,
)

View File

@ -19,8 +19,10 @@ from typing import Union
from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm import DeclarativeMeta
from sqlalchemy.orm.util import AliasedClass from sqlalchemy.orm.util import AliasedClass
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder from metadata.data_quality.builders.validator_builder import (
from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder SourceType,
ValidatorBuilder,
)
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
@ -51,14 +53,14 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
ometa_client: OpenMetadata, ometa_client: OpenMetadata,
sampler: SamplerInterface, sampler: SamplerInterface,
table_entity: Table = None, table_entity: Table = None,
**kwargs,
): ):
super().__init__( super().__init__(
service_connection_config, service_connection_config, ometa_client, sampler, table_entity, **kwargs
ometa_client,
sampler,
table_entity,
) )
self.source_type = SourceType.SQL
self.create_session() self.create_session()
( (
self.table_sample_query, self.table_sample_query,
self.table_sample_config, self.table_sample_config,
@ -109,5 +111,10 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
def _get_validator_builder( def _get_validator_builder(
self, test_case: TestCase, entity_type: str self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder: ) -> ValidatorBuilder:
return SQAValidatorBuilder(self.runner, test_case, entity_type) return self.validator_builder_class(
runner=self.runner,
test_case=test_case,
entity_type=entity_type,
source_type=self.source_type,
)

View File

@ -17,7 +17,7 @@ supporting sqlalchemy abstraction layer
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from typing import Optional, Set, Type from typing import Optional, Set, Type
from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder from metadata.data_quality.builders.validator_builder import ValidatorBuilder
from metadata.data_quality.validations.base_test_handler import BaseTestValidator from metadata.data_quality.validations.base_test_handler import BaseTestValidator
from metadata.data_quality.validations.runtime_param_setter.param_setter import ( from metadata.data_quality.validations.runtime_param_setter.param_setter import (
RuntimeParameterSetter, RuntimeParameterSetter,
@ -48,12 +48,14 @@ class TestSuiteInterface(ABC):
ometa_client: OpenMetadata, ometa_client: OpenMetadata,
sampler: SamplerInterface, sampler: SamplerInterface,
table_entity: Table, table_entity: Table,
validator_builder: Type[ValidatorBuilder],
): ):
"""Required attribute for the interface""" """Required attribute for the interface"""
self.ometa_client = ometa_client self.ometa_client = ometa_client
self.service_connection_config = service_connection_config self.service_connection_config = service_connection_config
self.table_entity = table_entity self.table_entity = table_entity
self.sampler = sampler self.sampler = sampler
self.validator_builder_class = validator_builder
@classmethod @classmethod
def create( def create(
@ -77,7 +79,7 @@ class TestSuiteInterface(ABC):
@abstractmethod @abstractmethod
def _get_validator_builder( def _get_validator_builder(
self, test_case: TestCase, entity_type: str self, test_case: TestCase, entity_type: str
) -> IValidatorBuilder: ) -> ValidatorBuilder:
"""get the builder class for the validator. Define this in the implementation class """get the builder class for the validator. Define this in the implementation class
Args: Args:
@ -85,7 +87,7 @@ class TestSuiteInterface(ABC):
entity_type (str): type of the entity entity_type (str): type of the entity
Returns: Returns:
IValidatorBuilder: a validator builder ValidatorBuilder: a validator builder
""" """
raise NotImplementedError raise NotImplementedError

View File

@ -23,10 +23,8 @@ from metadata.data_quality.api.models import (
TestCaseResults, TestCaseResults,
TestSuiteProcessorConfig, TestSuiteProcessorConfig,
) )
from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner
from metadata.data_quality.runner.core import DataTestsRunner from metadata.data_quality.runner.core import DataTestsRunner
from metadata.data_quality.runner.test_suite_source_factory import (
test_suite_source_factory,
)
from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.ingestionPipelines.status import ( from metadata.generated.schema.entity.services.ingestionPipelines.status import (
@ -95,12 +93,7 @@ class TestCaseRunner(Processor):
record.table, openmetadata_test_cases record.table, openmetadata_test_cases
) )
test_suite_runner = test_suite_source_factory.create( test_suite_runner = self.get_test_suite_runner(record.table)
record.service_type.lower(),
self.config,
self.metadata,
record.table,
).get_data_quality_runner()
logger.debug( logger.debug(
f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}" f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}"
@ -357,3 +350,10 @@ class TestCaseRunner(Processor):
else: else:
result.append(tc) result.append(tc)
return result return result
def get_test_suite_runner(self, table: Table):
return BaseTestSuiteRunner(
self.config,
self.metadata,
table,
).get_data_quality_runner()

View File

@ -15,6 +15,7 @@ Base source for the data quality used to instantiate a data quality runner with
from copy import deepcopy from copy import deepcopy
from typing import Optional, cast from typing import Optional, cast
from metadata.data_quality.builders.validator_builder import ValidatorBuilder
from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface
from metadata.data_quality.runner.core import DataTestsRunner from metadata.data_quality.runner.core import DataTestsRunner
from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.table import Table
@ -46,6 +47,7 @@ class BaseTestSuiteRunner:
ometa_client: OpenMetadata, ometa_client: OpenMetadata,
entity: Table, entity: Table,
): ):
self.validator_builder_class = ValidatorBuilder
self._interface = None self._interface = None
self._interface_type: str = config.source.type.lower() self._interface_type: str = config.source.type.lower()
self.entity = entity self.entity = entity
@ -125,10 +127,11 @@ class BaseTestSuiteRunner:
) )
self.interface: TestSuiteInterface = test_suite_class.create( self.interface: TestSuiteInterface = test_suite_class.create(
self.service_conn_config, service_connection_config=self.service_conn_config,
self.ometa_client, ometa_client=self.ometa_client,
sampler_interface, sampler=sampler_interface,
self.entity, table_entity=self.entity,
validator_builder=self.validator_builder_class,
) )
return self.interface return self.interface

View File

@ -1,38 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Factory class for creating test suite source objects
"""
from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner
class TestSuiteRunnerFactory:
"""Creational factory for test suite source objects"""
def __init__(self):
self._source_type = {"base": BaseTestSuiteRunner}
def register_source(self, source_type: str, source_class):
"""Register a new source type"""
self._source_type[source_type] = source_class
def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner:
"""Create source object based on source type"""
source_class = self._source_type.get(source_type)
if not source_class:
source_class = self._source_type["base"]
return source_class(*args, **kwargs)
return source_class(*args, **kwargs)
test_suite_source_factory = TestSuiteRunnerFactory()

View File

@ -30,7 +30,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
UnityCatalogConnection, UnityCatalogConnection,
) )
from metadata.generated.schema.tests.basic import BaseModel from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.snowflake.queries import ( from metadata.ingestion.source.database.snowflake.queries import (

View File

@ -109,6 +109,12 @@ def ometa_to_sqa_orm(
We are building the class dynamically using We are building the class dynamically using
`type` and passing SQLAlchemy `Base` class `type` and passing SQLAlchemy `Base` class
as the bases tuple for inheritance. as the bases tuple for inheritance.
Args:
table (Table): OpenMetadata Table instance
metadata (OpenMetadata): OpenMetadata connection
sqa_metadata_obj (MetaData): For advanced use cases, you can pass a custom MetaData object. For most cases, this
can be left as None so that the global_metadata object is used.
""" """
_metadata = sqa_metadata_obj or Base.metadata _metadata = sqa_metadata_obj or Base.metadata
table.serviceType = cast( table.serviceType = cast(
@ -156,7 +162,6 @@ def ometa_to_sqa_orm(
if not isinstance(orm, DeclarativeMeta): if not isinstance(orm, DeclarativeMeta):
raise ValueError("OMeta to ORM did not create a DeclarativeMeta") raise ValueError("OMeta to ORM did not create a DeclarativeMeta")
return orm return orm

View File

@ -51,7 +51,10 @@ logger = sampler_logger()
class SamplerInterface(ABC): class SamplerInterface(ABC):
"""Sampler interface""" """Sampler interface
This should be the entrypoint for computing any metrics that are required downstream for
data quality, profiling, etc.
"""
# pylint: disable=too-many-instance-attributes, too-many-arguments # pylint: disable=too-many-instance-attributes, too-many-arguments
def __init__( def __init__(

View File

@ -60,6 +60,9 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin):
""" """
Generates a sample of the data to not Generates a sample of the data to not
run the query in the whole table. run the query in the whole table.
Args:
orm_table (Optional[DeclarativeMeta]): ORM Table
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):

View File

@ -301,13 +301,23 @@ def run_data_quality_workflow(
ids=lambda *x: x[0], ids=lambda *x: x[0],
) )
def test_data_quality( def test_data_quality(
run_data_quality_workflow, metadata: OpenMetadata, test_case_name, expected_status run_data_quality_workflow,
metadata: OpenMetadata,
test_case_name,
expected_status,
db_service,
): ):
test_cases: List[TestCase] = metadata.list_entities( test_cases: List[TestCase] = metadata.list_entities(
TestCase, fields=["*"], skip_on_failure=True TestCase, fields=["*"], skip_on_failure=True
).entities ).entities
test_case: TestCase = next( test_case: TestCase = next(
(t for t in test_cases if t.name.root == test_case_name), None (
t
for t in test_cases
if t.name.root == test_case_name
and "dvdrental.public.customer" in t.entityFQN
),
None,
) )
assert test_case is not None assert test_case is not None
assert_equal_pydantic_objects( assert_equal_pydantic_objects(

View File

@ -84,7 +84,9 @@ class RunnerTest(TestCase):
return_value=Mock(), return_value=Mock(),
), ),
): ):
sampler = SQASampler( sampler = SQASampler.__new__(SQASampler)
sampler.build_table_orm = lambda *args, **kwargs: User
sampler.__init__(
service_connection_config=Mock(), service_connection_config=Mock(),
ometa_client=None, ometa_client=None,
entity=None, entity=None,

View File

@ -22,6 +22,7 @@ import pytest
import sqlalchemy as sqa import sqlalchemy as sqa
from sqlalchemy.orm import declarative_base from sqlalchemy.orm import declarative_base
from metadata.data_quality.builders.validator_builder import ValidatorBuilder
from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import ( from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import (
SQATestSuiteInterface, SQATestSuiteInterface,
) )
@ -101,6 +102,7 @@ def create_sqlite_table():
None, None,
sampler, sampler,
TABLE, TABLE,
validator_builder=ValidatorBuilder,
) )
runner = sqa_profiler_interface.runner runner = sqa_profiler_interface.runner