From cd74d8f55a2e57065fa5aab5cd91365ec100829d Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Wed, 27 Nov 2024 16:25:12 +0100 Subject: [PATCH] MINOR: ref(data-quality): modularized test case validator import (#18716) * ref(data-quality): modularized test case validator import - removed test_suite_factory - implemented TestCaseImporter - removed SQAValidatorBuilder and PandasValidatorBuilder in favor of a SourceType enum - removed the orm table creation from test suite source * format * IValidatorBuilder -> ValidatorBuilder * use the table from the sampler in the test suite interface * linting * fixed the profiler with similar solution * removed unused inheritance * removed unneeded super().__init__() * removed all instances of orm_table * fixed tests * add reportExplicitAny=false * fixed tests --- .../builders/pandas_validator_builder.py | 29 ------- .../builders/sqa_validator_builder.py | 25 ------ ...idator_builder.py => validator_builder.py} | 76 +++++++++++-------- .../pandas/pandas_test_suite_interface.py | 19 +++-- .../sqlalchemy/sqa_test_suite_interface.py | 23 ++++-- .../interface/test_suite_interface.py | 8 +- .../processor/test_case_runner.py | 18 ++--- .../runner/base_test_suite_source.py | 11 ++- .../runner/test_suite_source_factory.py | 38 ---------- .../metadata/mixins/sqalchemy/sqa_mixin.py | 2 +- .../metadata/profiler/orm/converter/base.py | 7 +- .../src/metadata/sampler/sampler_interface.py | 5 +- .../metadata/sampler/sqlalchemy/sampler.py | 3 + .../integration/postgres/test_data_quality.py | 14 +++- .../snowflake/test_sampling_method.py | 0 .../unit/profiler/sqlalchemy/test_runner.py | 4 +- ingestion/tests/unit/test_suite/conftest.py | 2 + 17 files changed, 125 insertions(+), 159 deletions(-) delete mode 100644 ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py delete mode 100644 ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py rename ingestion/src/metadata/data_quality/builders/{i_validator_builder.py => validator_builder.py} (80%) delete mode 100644 ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py create mode 100644 ingestion/tests/unit/profiler/sqlalchemy/snowflake/test_sampling_method.py diff --git a/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py b/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py deleted file mode 100644 index 299ec894b36..00000000000 --- a/ingestion/src/metadata/data_quality/builders/pandas_validator_builder.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builder defining the structure of builders for validators for Pandas sources -""" - -from typing import TYPE_CHECKING - -from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder - -if TYPE_CHECKING: - pass - - -class PandasValidatorBuilder(IValidatorBuilder): - """Builder for Pandas validators""" - - def _get_source_type(self) -> str: - """Return the test case""" - return "pandas" diff --git a/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py b/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py deleted file mode 100644 index bf629517ee5..00000000000 --- a/ingestion/src/metadata/data_quality/builders/sqa_validator_builder.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Builder defining the structure of builders for validators for SQA sources -""" - - -from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder - - -class SQAValidatorBuilder(IValidatorBuilder): - """Builder for SQA validators""" - - def _get_source_type(self) -> str: - """Return the test case""" - return "sqlalchemy" diff --git a/ingestion/src/metadata/data_quality/builders/i_validator_builder.py b/ingestion/src/metadata/data_quality/builders/validator_builder.py similarity index 80% rename from ingestion/src/metadata/data_quality/builders/i_validator_builder.py rename to ingestion/src/metadata/data_quality/builders/validator_builder.py index 66cececad6f..0ac25740c83 100644 --- a/ingestion/src/metadata/data_quality/builders/i_validator_builder.py +++ b/ingestion/src/metadata/data_quality/builders/validator_builder.py @@ -14,8 +14,8 @@ Builder interface defining the structure of builders for validators. Validators are test classes (e.g. columnValuesToBeBetween, etc.) """ -from abc import ABC, abstractmethod from datetime import datetime, timezone +from enum import Enum from typing import TYPE_CHECKING, Set, Type, Union from metadata.data_quality.validations.base_test_handler import BaseTestValidator @@ -25,15 +25,57 @@ from metadata.data_quality.validations.runtime_param_setter.param_setter import from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue from metadata.generated.schema.type.basic import Timestamp from metadata.profiler.processor.runner import QueryRunner -from metadata.utils import importer +from metadata.utils.importer import import_test_case_class if TYPE_CHECKING: from pandas import DataFrame -class IValidatorBuilder(ABC): +class TestCaseImporter: + def import_test_case_validator( + self, + test_type: str, + runner_type: str, + test_definition: str, + ) -> Type[BaseTestValidator]: + return import_test_case_class(test_type, runner_type, test_definition) + + +class SourceType(Enum): + PANDAS = "pandas" + SQL = "sqlalchemy" + + +class ValidatorBuilder(TestCaseImporter): """Interface for validator builders""" + def __init__( + self, + runner: Union[QueryRunner, "DataFrame"], + test_case: TestCase, + source_type: SourceType, + entity_type: str, + ) -> None: + """Builder object for SQA validators. This builder is used to create a validator object + + Args: + runner (QueryRunner): The runner object + test_case (TestCase): The test case object + source_type (SourceType): The source type + entity_type (str): one of COLUMN or TABLE -- fetched from the test definition + """ + super().__init__() + self._test_case = test_case + self.runner = runner + self.validator_cls: Type[ + BaseTestValidator + ] = super().import_test_case_validator( + entity_type, + source_type.value, + self.test_case.testDefinition.fullyQualifiedName, + ) + self.reset() + @property def test_case(self): """Return the test case object""" @@ -44,29 +86,6 @@ class IValidatorBuilder(ABC): """Return the validator object""" return self._validator - def __init__( - self, - runner: Union[QueryRunner, "DataFrame"], - test_case: TestCase, - entity_type: str, - ) -> None: - """Builder object for SQA validators. This builder is used to create a validator object - - Args: - runner (QueryRunner): The runner object - test_case (TestCase): The test case object - entity_type (str): one of COLUMN or TABLE -- fetched from the test definition - """ - self._test_case = test_case - self.runner = runner - # TODO this will be removed on https://github.com/open-metadata/OpenMetadata/pull/18716 - self.validator_cls: Type[BaseTestValidator] = importer.import_test_case_class( - entity_type, - self._get_source_type(), - self.test_case.testDefinition.fullyQualifiedName, # type: ignore - ) - self.reset() - def set_runtime_params(self, runtime_params_setters: Set[RuntimeParameterSetter]): """Set the runtime parameters for the validator object @@ -93,8 +112,3 @@ class IValidatorBuilder(ABC): int(datetime.now(tz=timezone.utc).timestamp() * 1000) ), ) - - @abstractmethod - def _get_source_type(self): - """Get the source type""" - raise NotImplementedError diff --git a/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py index 8873d6a16ee..07f844f4274 100644 --- a/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/pandas/pandas_test_suite_interface.py @@ -13,9 +13,10 @@ Interfaces with database for all database engine supporting sqlalchemy abstraction layer """ -from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder -from metadata.data_quality.builders.pandas_validator_builder import ( - PandasValidatorBuilder, + +from metadata.data_quality.builders.validator_builder import ( + SourceType, + ValidatorBuilder, ) from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.generated.schema.entity.data.table import Table @@ -44,13 +45,14 @@ class PandasTestSuiteInterface(TestSuiteInterface, PandasInterfaceMixin): ometa_client: OpenMetadata, sampler: SamplerInterface, table_entity: Table, - **__, + **kwargs, ): super().__init__( service_connection_config, ometa_client, sampler, table_entity, + **kwargs, ) ( @@ -63,5 +65,10 @@ class PandasTestSuiteInterface(TestSuiteInterface, PandasInterfaceMixin): def _get_validator_builder( self, test_case: TestCase, entity_type: str - ) -> IValidatorBuilder: - return PandasValidatorBuilder(self.dataset, test_case, entity_type) + ) -> ValidatorBuilder: + return self.validator_builder_class( + runner=self.dataset, + test_case=test_case, + entity_type=entity_type, + source_type=SourceType.PANDAS, + ) diff --git a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py index 3cff85a6a75..67baa22335d 100644 --- a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py @@ -19,8 +19,10 @@ from typing import Union from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm.util import AliasedClass -from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder -from metadata.data_quality.builders.sqa_validator_builder import SQAValidatorBuilder +from metadata.data_quality.builders.validator_builder import ( + SourceType, + ValidatorBuilder, +) from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import DatabaseConnection @@ -51,14 +53,14 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface): ometa_client: OpenMetadata, sampler: SamplerInterface, table_entity: Table = None, + **kwargs, ): super().__init__( - service_connection_config, - ometa_client, - sampler, - table_entity, + service_connection_config, ometa_client, sampler, table_entity, **kwargs ) + self.source_type = SourceType.SQL self.create_session() + ( self.table_sample_query, self.table_sample_config, @@ -109,5 +111,10 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface): def _get_validator_builder( self, test_case: TestCase, entity_type: str - ) -> IValidatorBuilder: - return SQAValidatorBuilder(self.runner, test_case, entity_type) + ) -> ValidatorBuilder: + return self.validator_builder_class( + runner=self.runner, + test_case=test_case, + entity_type=entity_type, + source_type=self.source_type, + ) diff --git a/ingestion/src/metadata/data_quality/interface/test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/test_suite_interface.py index 0d5fa1bfef7..ac0dc4c6876 100644 --- a/ingestion/src/metadata/data_quality/interface/test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/test_suite_interface.py @@ -17,7 +17,7 @@ supporting sqlalchemy abstraction layer from abc import ABC, abstractmethod from typing import Optional, Set, Type -from metadata.data_quality.builders.i_validator_builder import IValidatorBuilder +from metadata.data_quality.builders.validator_builder import ValidatorBuilder from metadata.data_quality.validations.base_test_handler import BaseTestValidator from metadata.data_quality.validations.runtime_param_setter.param_setter import ( RuntimeParameterSetter, @@ -48,12 +48,14 @@ class TestSuiteInterface(ABC): ometa_client: OpenMetadata, sampler: SamplerInterface, table_entity: Table, + validator_builder: Type[ValidatorBuilder], ): """Required attribute for the interface""" self.ometa_client = ometa_client self.service_connection_config = service_connection_config self.table_entity = table_entity self.sampler = sampler + self.validator_builder_class = validator_builder @classmethod def create( @@ -77,7 +79,7 @@ class TestSuiteInterface(ABC): @abstractmethod def _get_validator_builder( self, test_case: TestCase, entity_type: str - ) -> IValidatorBuilder: + ) -> ValidatorBuilder: """get the builder class for the validator. Define this in the implementation class Args: @@ -85,7 +87,7 @@ class TestSuiteInterface(ABC): entity_type (str): type of the entity Returns: - IValidatorBuilder: a validator builder + ValidatorBuilder: a validator builder """ raise NotImplementedError diff --git a/ingestion/src/metadata/data_quality/processor/test_case_runner.py b/ingestion/src/metadata/data_quality/processor/test_case_runner.py index 98c7fc7086f..696caaae0ce 100644 --- a/ingestion/src/metadata/data_quality/processor/test_case_runner.py +++ b/ingestion/src/metadata/data_quality/processor/test_case_runner.py @@ -23,10 +23,8 @@ from metadata.data_quality.api.models import ( TestCaseResults, TestSuiteProcessorConfig, ) +from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner from metadata.data_quality.runner.core import DataTestsRunner -from metadata.data_quality.runner.test_suite_source_factory import ( - test_suite_source_factory, -) from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.ingestionPipelines.status import ( @@ -95,12 +93,7 @@ class TestCaseRunner(Processor): record.table, openmetadata_test_cases ) - test_suite_runner = test_suite_source_factory.create( - record.service_type.lower(), - self.config, - self.metadata, - record.table, - ).get_data_quality_runner() + test_suite_runner = self.get_test_suite_runner(record.table) logger.debug( f"Found {len(openmetadata_test_cases)} test cases for table {record.table.fullyQualifiedName.root}" @@ -357,3 +350,10 @@ class TestCaseRunner(Processor): else: result.append(tc) return result + + def get_test_suite_runner(self, table: Table): + return BaseTestSuiteRunner( + self.config, + self.metadata, + table, + ).get_data_quality_runner() diff --git a/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py index 23ccfd53b74..9651e0d045f 100644 --- a/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py +++ b/ingestion/src/metadata/data_quality/runner/base_test_suite_source.py @@ -15,6 +15,7 @@ Base source for the data quality used to instantiate a data quality runner with from copy import deepcopy from typing import Optional, cast +from metadata.data_quality.builders.validator_builder import ValidatorBuilder from metadata.data_quality.interface.test_suite_interface import TestSuiteInterface from metadata.data_quality.runner.core import DataTestsRunner from metadata.generated.schema.entity.data.table import Table @@ -46,6 +47,7 @@ class BaseTestSuiteRunner: ometa_client: OpenMetadata, entity: Table, ): + self.validator_builder_class = ValidatorBuilder self._interface = None self._interface_type: str = config.source.type.lower() self.entity = entity @@ -125,10 +127,11 @@ class BaseTestSuiteRunner: ) self.interface: TestSuiteInterface = test_suite_class.create( - self.service_conn_config, - self.ometa_client, - sampler_interface, - self.entity, + service_connection_config=self.service_conn_config, + ometa_client=self.ometa_client, + sampler=sampler_interface, + table_entity=self.entity, + validator_builder=self.validator_builder_class, ) return self.interface diff --git a/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py b/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py deleted file mode 100644 index 2b1a245d8cc..00000000000 --- a/ingestion/src/metadata/data_quality/runner/test_suite_source_factory.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Factory class for creating test suite source objects -""" - -from metadata.data_quality.runner.base_test_suite_source import BaseTestSuiteRunner - - -class TestSuiteRunnerFactory: - """Creational factory for test suite source objects""" - - def __init__(self): - self._source_type = {"base": BaseTestSuiteRunner} - - def register_source(self, source_type: str, source_class): - """Register a new source type""" - self._source_type[source_type] = source_class - - def create(self, source_type: str, *args, **kwargs) -> BaseTestSuiteRunner: - """Create source object based on source type""" - source_class = self._source_type.get(source_type) - if not source_class: - source_class = self._source_type["base"] - return source_class(*args, **kwargs) - return source_class(*args, **kwargs) - - -test_suite_source_factory = TestSuiteRunnerFactory() diff --git a/ingestion/src/metadata/mixins/sqalchemy/sqa_mixin.py b/ingestion/src/metadata/mixins/sqalchemy/sqa_mixin.py index 3bc7dc7c047..32cda7c1008 100644 --- a/ingestion/src/metadata/mixins/sqalchemy/sqa_mixin.py +++ b/ingestion/src/metadata/mixins/sqalchemy/sqa_mixin.py @@ -30,7 +30,7 @@ from metadata.generated.schema.entity.services.connections.database.snowflakeCon from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, ) -from metadata.generated.schema.tests.basic import BaseModel +from metadata.ingestion.models.custom_pydantic import BaseModel from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.database.snowflake.queries import ( diff --git a/ingestion/src/metadata/profiler/orm/converter/base.py b/ingestion/src/metadata/profiler/orm/converter/base.py index dea640aefc2..b7225c358aa 100644 --- a/ingestion/src/metadata/profiler/orm/converter/base.py +++ b/ingestion/src/metadata/profiler/orm/converter/base.py @@ -109,6 +109,12 @@ def ometa_to_sqa_orm( We are building the class dynamically using `type` and passing SQLAlchemy `Base` class as the bases tuple for inheritance. + + Args: + table (Table): OpenMetadata Table instance + metadata (OpenMetadata): OpenMetadata connection + sqa_metadata_obj (MetaData): For advanced use cases, you can pass a custom MetaData object. For most cases, this + can be left as None so that the global_metadata object is used. """ _metadata = sqa_metadata_obj or Base.metadata table.serviceType = cast( @@ -156,7 +162,6 @@ def ometa_to_sqa_orm( if not isinstance(orm, DeclarativeMeta): raise ValueError("OMeta to ORM did not create a DeclarativeMeta") - return orm diff --git a/ingestion/src/metadata/sampler/sampler_interface.py b/ingestion/src/metadata/sampler/sampler_interface.py index 20df252f4cb..03a3d1d6dc9 100644 --- a/ingestion/src/metadata/sampler/sampler_interface.py +++ b/ingestion/src/metadata/sampler/sampler_interface.py @@ -51,7 +51,10 @@ logger = sampler_logger() class SamplerInterface(ABC): - """Sampler interface""" + """Sampler interface + This should be the entrypoint for computing any metrics that are required downstream for + data quality, profiling, etc. + """ # pylint: disable=too-many-instance-attributes, too-many-arguments def __init__( diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index e8edd310a24..3cb7fd62944 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -60,6 +60,9 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin): """ Generates a sample of the data to not run the query in the whole table. + + Args: + orm_table (Optional[DeclarativeMeta]): ORM Table """ def __init__(self, *args, **kwargs): diff --git a/ingestion/tests/integration/postgres/test_data_quality.py b/ingestion/tests/integration/postgres/test_data_quality.py index 9b3b325af91..f37c9560ae7 100644 --- a/ingestion/tests/integration/postgres/test_data_quality.py +++ b/ingestion/tests/integration/postgres/test_data_quality.py @@ -301,13 +301,23 @@ def run_data_quality_workflow( ids=lambda *x: x[0], ) def test_data_quality( - run_data_quality_workflow, metadata: OpenMetadata, test_case_name, expected_status + run_data_quality_workflow, + metadata: OpenMetadata, + test_case_name, + expected_status, + db_service, ): test_cases: List[TestCase] = metadata.list_entities( TestCase, fields=["*"], skip_on_failure=True ).entities test_case: TestCase = next( - (t for t in test_cases if t.name.root == test_case_name), None + ( + t + for t in test_cases + if t.name.root == test_case_name + and "dvdrental.public.customer" in t.entityFQN + ), + None, ) assert test_case is not None assert_equal_pydantic_objects( diff --git a/ingestion/tests/unit/profiler/sqlalchemy/snowflake/test_sampling_method.py b/ingestion/tests/unit/profiler/sqlalchemy/snowflake/test_sampling_method.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py b/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py index 354eee70347..738d4985fe4 100644 --- a/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py +++ b/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py @@ -84,7 +84,9 @@ class RunnerTest(TestCase): return_value=Mock(), ), ): - sampler = SQASampler( + sampler = SQASampler.__new__(SQASampler) + sampler.build_table_orm = lambda *args, **kwargs: User + sampler.__init__( service_connection_config=Mock(), ometa_client=None, entity=None, diff --git a/ingestion/tests/unit/test_suite/conftest.py b/ingestion/tests/unit/test_suite/conftest.py index 03381ceca02..06e21c791ed 100644 --- a/ingestion/tests/unit/test_suite/conftest.py +++ b/ingestion/tests/unit/test_suite/conftest.py @@ -22,6 +22,7 @@ import pytest import sqlalchemy as sqa from sqlalchemy.orm import declarative_base +from metadata.data_quality.builders.validator_builder import ValidatorBuilder from metadata.data_quality.interface.sqlalchemy.sqa_test_suite_interface import ( SQATestSuiteInterface, ) @@ -101,6 +102,7 @@ def create_sqlite_table(): None, sampler, TABLE, + validator_builder=ValidatorBuilder, ) runner = sqa_profiler_interface.runner