From abaf8a84e9b2135aa1107323ebe1107ad15ba458 Mon Sep 17 00:00:00 2001 From: Teddy Date: Wed, 17 Aug 2022 12:53:16 +0200 Subject: [PATCH] Fixes #5661 by removing association between profiler and data quality (#6715) * Added database filter in workflow * Removed association between profiler and data quality * fixed tests with removed association * Fixed sonar code smells and bugs --- .../src/metadata/orm_profiler/api/workflow.py | 111 ++++-- .../interfaces/interface_protocol.py | 80 ++-- .../interfaces/sqa_profiler_interface.py | 297 +++++---------- .../orm_profiler/processor/orm_profiler.py | 357 +++++++----------- .../metadata/orm_profiler/profiler/core.py | 90 ++--- .../metadata/orm_profiler/profiler/default.py | 23 +- .../orm_profiler/validations/models.py | 36 +- .../metadata/utils/processor_config_helper.py | 36 ++ ingestion/tests/unit/profiler/conftest.py | 22 +- ingestion/tests/unit/profiler/test_metrics.py | 132 +------ .../tests/unit/profiler/test_profiler.py | 26 +- ingestion/tests/unit/profiler/test_sample.py | 86 ++--- .../unit/profiler/test_session_validations.py | 6 +- .../profiler/test_sqa_profiler_interface.py | 44 +-- .../tests/unit/profiler/test_workflow.py | 111 +++--- 15 files changed, 572 insertions(+), 885 deletions(-) create mode 100644 ingestion/src/metadata/utils/processor_config_helper.py diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 69506adec35..20a7b3a2526 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -45,13 +45,16 @@ from metadata.ingestion.source.database.common_db_source import SQLSourceStatus from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface +from metadata.orm_profiler.metrics.registry import Metrics +from metadata.orm_profiler.profiler.core import Profiler +from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics from metadata.utils import fqn from metadata.utils.class_helper import ( get_service_class_from_service_type, get_service_type_from_source_type, ) from metadata.utils.connections import get_connection, test_connection -from metadata.utils.filters import filter_by_fqn, filter_by_schema, filter_by_table +from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -87,7 +90,7 @@ class ProfilerWorkflow: ) self.source_status = SQLSourceStatus() - self.processor = None + self.processor_obj = None if self.config.sink: self.sink = get_sink( @@ -116,6 +119,51 @@ class ProfilerWorkflow: logger.error("Error trying to parse the Profiler Workflow configuration") raise err + def create_profiler_interface(self, service_connection_config, table_entity: Table): + """Creates a profiler interface object""" + self.processor_interface: InterfaceProtocol = SQAProfilerInterface( + service_connection_config, + metadata_config=self.metadata_config, + profiler_config=ProfilerProcessorConfig.parse_obj( + self.config.processor.dict().get("config") + ) + or ProfilerProcessorConfig(), + workflow_profile_sample=self.source_config.profileSample, + thread_count=self.source_config.threadCount, + table_entity=table_entity, + ) + + def create_profiler_obj(self): + """Profile a single entity""" + if not self.processor_interface.profiler_config.profiler: + self.profiler_obj = DefaultProfiler( + profiler_interface=self.processor_interface, + ) + else: + metrics = ( + [Metrics.get(name) for name in self.config.profiler.metrics] + if self.processor_interface.profiler_config.profiler.metrics + else get_default_metrics(self.processor_interface.table) + ) + + self.profiler_obj = Profiler( + *metrics, + profiler_interface=self.processor_interface, + ) + + def filter_databases(self, database: Database) -> Database: + """Returns filtered database entities""" + if filter_by_database( + self.source_config.databaseFilterPattern, + database.name.__root__, + ): + self.source_status.filter( + database.name.__root__, "Database pattern not allowed" + ) + return None + else: + return database + def filter_entities(self, tables: List[Table]) -> Iterable[Table]: """ From a list of tables, apply the SQLSourceConfig @@ -150,12 +198,8 @@ class ProfilerWorkflow: logger.error(err) logger.debug(traceback.format_exc()) - def create_processor(self, service_connection_config): - self.processor_interface: InterfaceProtocol = SQAProfilerInterface( - service_connection_config, - thread_count=self.source_config.threadCount, - ) - self.processor = get_processor( + def create_processor_obj(self): + self.processor_obj = get_processor( processor_type=self.config.processor.type, # orm-profiler processor_config=self.config.processor or ProfilerProcessorConfig(), metadata_config=self.metadata_config, @@ -174,11 +218,14 @@ class ProfilerWorkflow: def get_database_entities(self): """List all databases in service""" - for database in self.metadata.list_all_entities( - entity=Database, - params={"service": self.config.source.serviceName}, - ): - yield database + return [ + self.filter_databases(database) + for database in self.metadata.list_all_entities( + entity=Database, + params={"service": self.config.source.serviceName}, + ) + if self.filter_databases(database) + ] def get_table_entities(self, database): """ @@ -230,22 +277,34 @@ class ProfilerWorkflow: if hasattr(self.config.source.serviceConnection.__root__.config, "catalog"): copy_service_connection_config.catalog = database.name.__root__ - self.create_processor(copy_service_connection_config) + return copy_service_connection_config def execute(self): """ Run the profiling and tests """ - for database in self.get_database_entities(): - try: - self.copy_service_config(database) + databases = self.get_database_entities() + if not databases: + raise ValueError( + "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern." + f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t- excludes: {self.source_config.databaseFilterPattern.excludes}" + ) + + for database in databases: + copied_service_config = self.copy_service_config(database) + try: for entity in self.get_table_entities(database=database): try: - profile_and_tests: ProfilerResponse = self.processor.process( + self.create_profiler_interface(copied_service_config, entity) + self.create_profiler_obj() + profile = self.profiler_obj.process() + self.create_processor_obj() + profile_and_tests: ProfilerResponse = self.processor_obj.process( record=entity, generate_sample_data=self.source_config.generateSampleData, + entity_profile=profile, ) if hasattr(self, "sink"): @@ -267,7 +326,7 @@ class ProfilerWorkflow: click.secho("Source Status:", bold=True) click.echo(self.source_status.as_string()) click.secho("Processor Status:", bold=True) - click.echo(self.processor.get_status().as_string()) + click.echo(self.processor_obj.get_status().as_string()) if hasattr(self, "sink"): click.secho("Sink Status:", bold=True) click.echo(self.sink.get_status().as_string()) @@ -275,14 +334,14 @@ class ProfilerWorkflow: if ( self.source_status.failures - or self.processor.get_status().failures + or self.processor_obj.get_status().failures or (hasattr(self, "sink") and self.sink.get_status().failures) ): click.secho("Workflow finished with failures", fg="bright_red", bold=True) return 1 if ( self.source_status.warnings - or self.processor.get_status().failures + or self.processor_obj.get_status().failures or (hasattr(self, "sink") and self.sink.get_status().warnings) ): click.secho("Workflow finished with warnings", fg="yellow", bold=True) @@ -299,9 +358,9 @@ class ProfilerWorkflow: as we are just picking up data from OM. """ - if self.processor.get_status().failures: + if self.processor_obj.get_status().failures: raise WorkflowExecutionError( - "Processor reported errors", self.processor.get_status() + "Processor reported errors", self.processor_obj.get_status() ) if hasattr(self, "sink") and self.sink.get_status().failures: raise WorkflowExecutionError("Sink reported errors", self.sink.get_status()) @@ -311,9 +370,9 @@ class ProfilerWorkflow: raise WorkflowExecutionError( "Source reported warnings", self.source_status ) - if self.processor.get_status().warnings: + if self.processor_obj.get_status().warnings: raise WorkflowExecutionError( - "Processor reported warnings", self.processor.get_status() + "Processor reported warnings", self.processor_obj.get_status() ) if hasattr(self, "sink") and self.sink.get_status().warnings: raise WorkflowExecutionError( @@ -331,7 +390,7 @@ class ProfilerWorkflow: Close all connections """ self.metadata.close() - self.processor.close() + self.processor_obj.close() def _retrieve_service_connection_if_needed(self) -> None: """ diff --git a/ingestion/src/metadata/orm_profiler/interfaces/interface_protocol.py b/ingestion/src/metadata/orm_profiler/interfaces/interface_protocol.py index d36b1a09de6..efe2ea201a6 100644 --- a/ingestion/src/metadata/orm_profiler/interfaces/interface_protocol.py +++ b/ingestion/src/metadata/orm_profiler/interfaces/interface_protocol.py @@ -15,14 +15,32 @@ supporting sqlalchemy abstraction layer """ from abc import ABC, abstractmethod -from typing import Dict, Optional +from typing import Optional +from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.orm_profiler.api.models import ProfilerProcessorConfig +from metadata.utils.processor_config_helper import get_record_test_def class InterfaceProtocol(ABC): """Protocol interface for the processor""" + @abstractmethod + def __init__( + self, + metadata_config: OpenMetadataConnection = None, + profiler_config: ProfilerProcessorConfig = None, + workflow_profile_sample: float = None, + thread_count: int = 5, + table: Table = None, + ): + """Required attribute for the interface""" + raise NotImplementedError + @abstractmethod def create_sampler(*args, **kwargs) -> None: """Method to instantiate a Sampler object""" @@ -34,36 +52,34 @@ class InterfaceProtocol(ABC): raise NotImplementedError @abstractmethod - def get_table_metrics(*args, **kwargs) -> Dict: - """Method to retrieve table metrics""" - raise NotImplementedError - - @abstractmethod - def get_window_metrics(*args, **kwargs) -> Dict: - """Method to retrieve window metrics""" - raise NotImplementedError - - @abstractmethod - def get_static_metrics(*args, **kwargs) -> Dict: - """Method to retrieve static metrics""" - raise NotImplementedError - - @abstractmethod - def get_query_metrics(*args, **kwargs) -> Dict: - """Method to retrieve query metrics""" - raise NotImplementedError - - @abstractmethod - def get_composed_metrics(*args, **kwargs) -> Dict: - """Method to retrieve composed metrics""" - raise NotImplementedError - - @abstractmethod - def run_table_test(*args, **kwargs) -> Optional[TestCaseResult]: - """run table data quality tests""" - raise NotImplementedError - - @abstractmethod - def run_column_test(*args, **kwargs) -> Optional[TestCaseResult]: + def run_test_case(*args, **kwargs) -> Optional[TestCaseResult]: """run column data quality tests""" raise NotImplementedError + + def get_table_profile_sample(self) -> Optional[float]: + """ + Pick the Table profileSample value, either from the test + definition or the value from the instance. + + :param table: Table instance + :return: profileSample value to use + """ + if self.profiler_config.testSuites: + # If the processed table has information about the profile_sample, + # use that instead of the Entity stored profileSample + my_record_tests = get_record_test_def(self.table) + if my_record_tests and my_record_tests.profile_sample: + return my_record_tests.profile_sample + + if self.workflow_profile_sample: + if ( + self.table.tableProfilerConfig.profileSample is not None + and self.workflow_profile_sample != self.table.profileSample + ): + return self.table.tableProfilerConfig.profileSample + return self.workflow_profile_sample + + if not self.table.tableProfilerConfig: + return None + + return self.table.tableProfilerConfig.profileSample diff --git a/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py b/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py index c520adff21f..e0c0f8d056c 100644 --- a/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py +++ b/ingestion/src/metadata/orm_profiler/interfaces/sqa_profiler_interface.py @@ -20,18 +20,22 @@ from collections import defaultdict from datetime import datetime from typing import Dict, List, Optional, Union -from sqlalchemy import Column +from sqlalchemy import Column, inspect from sqlalchemy.engine.row import Row from sqlalchemy.orm import DeclarativeMeta, Session -from metadata.generated.schema.entity.data.table import TableProfile +from metadata.generated.schema.entity.data.table import Table, TableProfile +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) from metadata.generated.schema.tests.basic import TestCaseResult -from metadata.generated.schema.tests.columnTest import ColumnTestCase -from metadata.generated.schema.tests.tableTest import TableTestCase from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testDefinition import TestDefinition +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.orm_profiler.api.models import ProfilerProcessorConfig from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol from metadata.orm_profiler.metrics.registry import Metrics +from metadata.orm_profiler.orm.converter import ometa_to_orm from metadata.orm_profiler.profiler.runner import QueryRunner from metadata.orm_profiler.profiler.sampler import Sampler from metadata.orm_profiler.validations.core import validation_enum_registry @@ -56,15 +60,36 @@ class SQAProfilerInterface(InterfaceProtocol): sqlalchemy. """ - def __init__(self, service_connection_config, thread_count: int = 5): + def __init__( + self, + service_connection_config, + metadata_config: Optional[OpenMetadataConnection] = None, + profiler_config: Optional[ProfilerProcessorConfig] = None, + workflow_profile_sample: Optional[float] = None, + thread_count: Optional[int] = 5, + table_entity: Optional[Table] = None, + table: Optional[DeclarativeMeta] = None, + ): """Instantiate SQA Interface object""" - self._sampler = None - self._runner = None self._thread_count = thread_count - self.service_connection_config = service_connection_config - self.session_factory = self._session_factory() + self.profiler_config = profiler_config + self.table_entity = table_entity + self._create_ometa_obj(metadata_config) + self.table = ( + table or self._convert_table_to_orm_object() + ) # Allows SQA Interface to be used without OM server config + self.workflow_profile_sample = workflow_profile_sample + self.session_factory = self._session_factory(service_connection_config) self.session: Session = self.session_factory() + self._sampler = self.create_sampler() + self._runner = self.create_runner() + + # Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831 + self._profile_sample = None + self._profile_sample_query = None + self.partition_details = None + @property def sample(self): """Getter method for sample attribute""" @@ -85,18 +110,34 @@ class SQAProfilerInterface(InterfaceProtocol): """Getter methid for sampler attribute""" return self._sampler - def _session_factory(self): + @staticmethod + def _session_factory(service_connection_config): """Create thread safe session that will be automatically garbage collected once the application thread ends """ - engine = get_connection(self.service_connection_config) + engine = get_connection(service_connection_config) return create_and_bind_thread_safe_session(engine) + def _create_ometa_obj(self, metadata_config): + try: + self._metadata = OpenMetadata(metadata_config) + self._metadata.health_check() + except Exception: + logger.warning( + "No OpenMetadata server configuration found. Running profiler interface without OM server connection" + ) + self._metadata = None + def _create_thread_safe_sampler( - self, session, table, profile_sample, partition_details, profile_sample_query + self, + session, + table, + profile_sample=None, + partition_details=None, + profile_sample_query=None, ): """Create thread safe runner""" - if not hasattr(thread_local, "runner"): + if not hasattr(thread_local, "sampler"): thread_local.sampler = Sampler( session=session, table=table, @@ -107,7 +148,12 @@ class SQAProfilerInterface(InterfaceProtocol): return thread_local.sampler def _create_thread_safe_runner( - self, session, table, sample, partition_details, profile_sample_query + self, + session, + table, + sample=None, + partition_details=None, + profile_sample_query=None, ): """Create thread safe runner""" if not hasattr(thread_local, "runner"): @@ -120,6 +166,14 @@ class SQAProfilerInterface(InterfaceProtocol): ) return thread_local.runner + def _convert_table_to_orm_object(self) -> DeclarativeMeta: + """Given a table entity return a SQA ORM object""" + return ometa_to_orm(self.table_entity, self._metadata) + + def get_columns(self) -> Column: + """get columns from an orm object""" + return inspect(self.table).c + def compute_metrics_in_thread( self, metric_funcs, @@ -130,9 +184,6 @@ class SQAProfilerInterface(InterfaceProtocol): metric_type, column, table, - profile_sample, - partition_details, - profile_sample_query, ) = metric_funcs logger.debug( f"Running profiler for {table.__tablename__} on thread {threading.current_thread()}" @@ -140,11 +191,14 @@ class SQAProfilerInterface(InterfaceProtocol): Session = self.session_factory session = Session() sampler = self._create_thread_safe_sampler( - session, table, profile_sample, partition_details, profile_sample_query + session, + table, ) sample = sampler.random_sample() runner = self._create_thread_safe_runner( - session, table, sample, partition_details, profile_sample_query + session, + table, + sample, ) row = compute_metrics_registry.registry[metric_type.value]( @@ -210,136 +264,29 @@ class SQAProfilerInterface(InterfaceProtocol): ) return self.sampler.fetch_sample_data() - def create_sampler( - self, - table: DeclarativeMeta, - profile_sample: Optional[float] = None, - partition_details: Optional[dict] = None, - profile_sample_query: Optional[str] = None, - ) -> None: - """Create sampler instance - - Args: - table: sqlalchemy declarative table of the database table, - profile_sample: percentage to use for the table sample (between 0-100) - partition_details: details about the table partition - profile_sample_query: custom query used for table sampling - """ - self._sampler = Sampler( + def create_sampler(self) -> Sampler: + """Create sampler instance""" + return Sampler( session=self.session, - table=table, - profile_sample=profile_sample, - partition_details=partition_details, - profile_sample_query=profile_sample_query, + table=self.table, + profile_sample=self.workflow_profile_sample, + partition_details=None, + profile_sample_query=None, ) - def create_runner( - self, - table: DeclarativeMeta, - partition_details: Optional[dict] = None, - profile_sample_query: Optional[str] = None, - ) -> None: - """Create a QueryRunner Instance + def create_runner(self) -> None: + """Create a QueryRunner Instance""" - Args: - table: sqlalchemy declarative table of the database table, - profile_sample: percentage to use for the table sample (between 0-100) - partition_details: details about the table partition - profile_sample_query: custom query used for table sampling - """ - - self._runner = cls_timeout(TEN_MIN)( + return cls_timeout(TEN_MIN)( QueryRunner( session=self.session, - table=table, + table=self.table(), sample=self.sample, - partition_details=partition_details, - profile_sample_query=profile_sample_query, + partition_details=None, + profile_sample_query=None, ) ) - def get_table_metrics( - self, - metrics: List[Metrics], - ) -> Dict[str, Union[str, int]]: - """Given a list of metrics, compute the given results - and returns the values - - Args: - metrics: list of metrics to compute - Returns: - dictionnary of results - """ - try: - row = self.runner.select_first_from_table( - *[metric().fn() for metric in metrics] - ) - - if row: - return dict(row) - - except Exception as err: - logger.error(err) - self.session.rollback() - - def get_static_metrics( - self, - column: Column, - metrics: List[Metrics], - ) -> Dict[str, Union[str, int]]: - """Given a list of metrics, compute the given results - and returns the values - - Args: - column: the column to compute the metrics against - metrics: list of metrics to compute - Returns: - dictionnary of results - """ - try: - row = self.runner.select_first_from_sample( - *[ - metric(column).fn() - for metric in metrics - if not metric.is_window_metric() - ] - ) - return dict(row) - except Exception as err: - logger.error(err) - self.session.rollback() - - def get_query_metrics( - self, - column: Column, - metric: Metrics, - ) -> Optional[Dict[str, Union[str, int]]]: - """Given a list of metrics, compute the given results - and returns the values - - Args: - column: the column to compute the metrics against - metrics: list of metrics to compute - Returns: - dictionnary of results - """ - try: - col_metric = metric(column) - metric_query = col_metric.query(sample=self.sample, session=self.session) - if not metric_query: - return None - if col_metric.metric_type == dict: - results = self.runner.select_all_from_query(metric_query) - data = {k: [result[k] for result in results] for k in dict(results[0])} - return {metric.name(): data} - - else: - row = self.runner.select_first_from_query(metric_query) - return dict(row) - except Exception as err: - logger.error(err) - self.session.rollback() - def get_composed_metrics( self, column: Column, metric: Metrics, column_results: Dict ): @@ -358,35 +305,11 @@ class SQAProfilerInterface(InterfaceProtocol): logger.error(err) self.session.rollback() - def get_window_metrics( - self, - column: Column, - metric: Metrics, - ) -> Dict[str, Union[str, int]]: - """Given a list of metrics, compute the given results - and returns the values - - Args: - column: the column to compute the metrics against - metrics: list of metrics to compute - Returns: - dictionnary of results - """ - try: - row = self.runner.select_first_from_sample(metric(column).fn()) - if not isinstance(row, Row): - return {metric.name(): row} - return dict(row) - except Exception as err: - logger.error(err) - self.session.rollback() - def run_test_case( self, test_case: TestCase, test_definition: TestDefinition, table_profile: TableProfile, - orm_table: DeclarativeMeta, profile_sample: float, ) -> Optional[TestCaseResult]: """Run table tests where platformsTest=OpenMetadata @@ -404,56 +327,10 @@ class SQAProfilerInterface(InterfaceProtocol): table_profile=table_profile, execution_date=datetime.now(), session=self.session, - table=orm_table, + table=self.table, profile_sample=profile_sample, ) - def run_table_test( - self, - test_case: TableTestCase, - table_profile: TableProfile, - orm_table: DeclarativeMeta, - profile_sample: float, - ) -> Optional[TestCaseResult]: - """Run table tests - - Args: - table_test_type: test type to be ran - table_profile: table profile - table: SQA table, - profile_sample: sample for the profile - """ - return validation_enum_registry.registry[test_case.tableTestType.value]( - test_case.config, - table_profile=table_profile, - execution_date=datetime.now(), - session=self.session, - table=orm_table, - profile_sample=profile_sample, - ) - - def run_column_test( - self, - test_case: ColumnTestCase, - col_profile: TableProfile, - orm_table: DeclarativeMeta, - ) -> Optional[TestCaseResult]: - """Run table tests - - Args: - table_test_type: test type to be ran - table_profile: table profile - table: SQA table, - profile_sample: sample for the profile - """ - return validation_enum_registry.registry[test_case.columnTestType.value]( - test_case.config, - col_profile=col_profile, - execution_date=datetime.now(), - table=orm_table, - runner=self.runner, - ) - def get_table_metrics( metrics: List[Metrics], diff --git a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py index e9d069698d9..58bde85eee9 100644 --- a/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py +++ b/ingestion/src/metadata/orm_profiler/processor/orm_profiler.py @@ -21,10 +21,8 @@ from datetime import datetime from typing import Dict, List, Optional from croniter import croniter -from sqlalchemy.orm import DeclarativeMeta, Session +from sqlalchemy.orm import DeclarativeMeta -from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest -from metadata.generated.schema.api.tests.createTableTest import CreateTableTestRequest from metadata.generated.schema.api.tests.createTestCase import CreateTestCaseRequest from metadata.generated.schema.api.tests.createTestSuite import CreateTestSuiteRequest from metadata.generated.schema.entity.data.table import ( @@ -50,16 +48,12 @@ from metadata.ingestion.api.processor import Processor, ProcessorStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.orm_profiler.api.models import ProfilerProcessorConfig, ProfilerResponse from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol -from metadata.orm_profiler.metrics.registry import Metrics -from metadata.orm_profiler.orm.converter import ometa_to_orm -from metadata.orm_profiler.profiler.core import Profiler -from metadata.orm_profiler.profiler.default import DefaultProfiler, get_default_metrics from metadata.orm_profiler.profiler.handle_partition import ( get_partition_cols, is_partitioned, ) from metadata.orm_profiler.validations.models import TestCase as CLIConfigTestCase -from metadata.orm_profiler.validations.models import TestDef +from metadata.orm_profiler.validations.models import TestCase as TestCaseConfig from metadata.utils.helpers import get_start_and_end logger = logging.getLogger("ORM Profiler Workflow") @@ -183,24 +177,25 @@ class OrmProfilerProcessor(Processor[Table]): my_record_profile = ( self.get_record_test_def(table) if self.config.testSuites and self.get_record_test_def(table) - else TestDef(table=table.fullyQualifiedName.__root__) + else None ) - if my_record_profile.profile_sample_query: - return None + if my_record_profile: + if my_record_profile.profile_sample_query: + return None - start, end = get_start_and_end( - my_record_profile.partition_query_duration - ) - partition_details = { - "partition_field": my_record_profile.partition_field - or get_partition_cols(self.processor_interface.session, orm), - "partition_start": start, - "partition_end": end, - "partition_values": my_record_profile.partition_values, - } + start, end = get_start_and_end( + my_record_profile.partition_query_duration + ) + partition_details = { + "partition_field": my_record_profile.partition_field + or get_partition_cols(self.processor_interface.session, orm), + "partition_start": start, + "partition_end": end, + "partition_values": my_record_profile.partition_values, + } - return partition_details + return partition_details return None @@ -234,74 +229,6 @@ class OrmProfilerProcessor(Processor[Table]): return None return table.tableProfilerConfig.profileQuery - def build_profiler( - self, - orm: DeclarativeMeta, - table: Table, - ) -> Profiler: - """ - Given a column from the entity, build the profiler. - - :param orm: Declarative Meta - :param table: Table record being processed - :return: Initialised Profiler - """ - - profile_sample = None - - if not self.config.profiler: - return DefaultProfiler( - profiler_interface=self.processor_interface, - table=orm, - profile_sample=profile_sample, - partition_details=self.get_partition_details(orm, table), - profile_sample_query=None, - table_entity=table, - ) - - # Here we will need to add the logic to pass kwargs to the metrics - metrics = ( - [Metrics.get(name) for name in self.config.profiler.metrics] - if self.config.profiler.metrics - else get_default_metrics(orm) - ) - - return Profiler( - *metrics, - profiler_interface=self.processor_interface, - table=orm, - profile_date=self.execution_date, - profile_sample=profile_sample, - timeout_seconds=self.config.profiler.timeout_seconds, - partition_details=self.get_partition_details(orm, table), - profile_sample_query=self.get_profile_sample_query(table), - table_entity=table, - ) - - def profile_entity( - self, - orm: DeclarativeMeta, - table: Table, - ) -> TableProfile: - """ - Given a table, we will prepare the profiler for - all its columns and return all the run profilers - in a Dict in the shape {col_name: Profiler} - - Type of entity is DeclarativeMeta - """ - if not isinstance(orm, DeclarativeMeta): - raise ValueError(f"Entity {orm} should be a DeclarativeMeta.") - - # Prepare the profilers for all table columns - profiler = self.build_profiler(orm, table=table) - - logger.info(f"Executing profilers for {table.fullyQualifiedName.__root__}...") - profiler.execute() - - self.status.processed(table.fullyQualifiedName.__root__) - return profiler.get_profile() - def log_test_result(self, name: str, result: TestCaseResult) -> None: """ Log test case results @@ -475,7 +402,6 @@ class OrmProfilerProcessor(Processor[Table]): def run_test_case( self, table: Table, - orm_table: DeclarativeMeta, test_case: CLIConfigTestCase, test_suite: TestSuite, profiler_results: TableProfile, @@ -513,7 +439,6 @@ class OrmProfilerProcessor(Processor[Table]): test_case=case, test_definition=definition, table_profile=profiler_results, - orm_table=orm_table, profile_sample=None, ) @@ -578,16 +503,15 @@ class OrmProfilerProcessor(Processor[Table]): return test_case_result def validate_config_tests( - self, orm: DeclarativeMeta, table: Table, profiler_results: TableProfile - ) -> Optional[TestDef]: + self, table: Table, profiler_results: TableProfile + ) -> Optional[List[TestCaseResult]]: """ Here we take care of new incoming tests in the workflow - definition. Run them and prepare the new TestDef + definition. Run them and prepare the new TestCaseResult of the record, that will be sent to the sink to update the Table Entity. :param table: OpenMetadata Table Entity being processed - :param orm: Declarative Meta :param profiler_results: TableProfile with computed metrics """ @@ -606,7 +530,6 @@ class OrmProfilerProcessor(Processor[Table]): for test_case in test_cases: result = self.run_test_case( table=table, - orm_table=orm, test_case=test_case, test_suite=test_suite, profiler_results=profiler_results, @@ -616,7 +539,7 @@ class OrmProfilerProcessor(Processor[Table]): return test_results - def get_record_test_def(self, table: Table) -> Optional[TestDef]: + def get_record_test_def(self, table: Table) -> List[Optional[TestCaseConfig]]: """ Fetch a record from the Workflow JSON config if the processed table is informed there. @@ -624,122 +547,118 @@ class OrmProfilerProcessor(Processor[Table]): :param table: Processed table :return: Test definition """ - my_record_tests = next( - ( - test - for test in self.config.testSuites.tests - if test.table == table.fullyQualifiedName.__root__ - ), - None, - ) + my_record_tests = [ + test_case + for test_suite in self.config.testSuites + for test_case in test_suite.testCases + if test_case.fullyQualifiedName == table.fullyQualifiedName.__root__ + ] return my_record_tests - def validate_entity_tests( - self, - table: Table, - orm_table: DeclarativeMeta, - profiler_results: TableProfile, - config_tests: Optional[TestDef], - ) -> Optional[TestDef]: - """ - This method checks the tests that are - configured at entity level, i.e., have been - stored via the API at some other point in time. + # Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831 + # as we won't be processing entity tests but testSuite testCases + # def validate_entity_tests( + # self, + # table: Table, + # orm_table: DeclarativeMeta, + # profiler_results: TableProfile, + # config_tests: Optional[SQATestDef], + # ) -> Optional[TestCaseResult]: + # """ + # This method checks the tests that are + # configured at entity level, i.e., have been + # stored via the API at some other point in time. - If we find a test that has already been run - from the workflow config, we will skip it - and trust the workflow input. + # If we find a test that has already been run + # from the workflow config, we will skip it + # and trust the workflow input. - :param table: OpenMetadata Table Entity being processed - :param orm_table: Declarative Meta - :param profiler_results: TableProfile with computed metrics - :param config_tests: Results of running the configuration tests - """ + # :param table: OpenMetadata Table Entity being processed + # :param orm_table: Declarative Meta + # :param profiler_results: TableProfile with computed metrics + # :param config_tests: Results of running the configuration tests + # """ - # We need to keep track of all ran tests, so let's initialize - # a TestDef class with either what we have from the incoming - # config, or leaving it empty. - # During the Entity processing, we will add here - # any tests we discover from the Entity side. - record_tests = ( - TestDef( - table=config_tests.table, - profile_sample=config_tests.profile_sample, - table_tests=config_tests.table_tests - if config_tests.table_tests - else [], - column_tests=config_tests.column_tests - if config_tests.column_tests - else [], - ) - if config_tests - else TestDef( - table=table.fullyQualifiedName.__root__, table_tests=[], column_tests=[] - ) - ) + # # We need to keep track of all ran tests, so let's initialize + # # a SQATestDef class with either what we have from the incoming + # # config, or leaving it empty. + # # During the Entity processing, we will add here + # # any tests we discover from the Entity side. + # record_tests = ( + # SQATestDef( + # table=config_tests.table, + # profile_sample=config_tests.profile_sample, + # table_tests=config_tests.table_tests + # if config_tests.table_tests + # else [], + # column_tests=config_tests.column_tests + # if config_tests.column_tests + # else [], + # ) + # if config_tests + # else SQATestDef( + # table=table.fullyQualifiedName.__root__, table_tests=[], column_tests=[] + # ) + # ) - # Note that the tests configured in the Entity as `TableTest` and - # `ColumnTest`. However, to PUT the results we need the API form: - # `CreateTableTestRequest` and `CreateColumnTestRequest`. - # We will convert the found tests before running them. + # # Note that the tests configured in the Entity as `TableTest` and + # # `ColumnTest`. However, to PUT the results we need the API form: + # # `CreateTableTestRequest` and `CreateColumnTestRequest`. + # # We will convert the found tests before running them. - # Fetch all table tests, if any - table_tests = ( - table_test for table_test in (table.tableTests or []) - ) # tableTests are optional, so it might be a list or None - for table_test in table_tests: - test_case_result = self.run_table_test( - table=table, - orm_table=orm_table, - test_case=table_test.testCase, - profiler_results=profiler_results, - ) - if test_case_result: - create_table_test = CreateTableTestRequest( - description=table_test.description, - testCase=table_test.testCase, - executionFrequency=table_test.executionFrequency, - owner=table_test.owner, - result=test_case_result, - ) - record_tests.table_tests.append(create_table_test) + # # Fetch all table tests, if any + # table_tests = ( + # table_test for table_test in (table.tableTests or []) + # ) # tableTests are optional, so it might be a list or None + # for table_test in table_tests: + # test_case_result = self.run_table_test( + # table=table, + # orm_table=orm_table, + # test_case=table_test.testCase, + # profiler_results=profiler_results, + # ) + # if test_case_result: + # create_table_test = CreateTableTestRequest( + # description=table_test.description, + # testCase=table_test.testCase, + # executionFrequency=table_test.executionFrequency, + # owner=table_test.owner, + # result=test_case_result, + # ) + # record_tests.table_tests.append(create_table_test) - # For all columns, check if any of them has tests and fetch them - col_tests = ( - col_test - for col in table.columns - for col_test in ( - col.columnTests or [] - ) # columnTests is optional, so it might be a list or None - if col.columnTests - ) - for column_test in col_tests: - if column_test: - test_case_result = self.run_column_test( - table=table, - orm_table=orm_table, - column=column_test.columnName, - test_case=column_test.testCase, - profiler_results=profiler_results, - ) - if test_case_result: - create_column_test = CreateColumnTestRequest( - columnName=column_test.columnName, - description=column_test.description, - testCase=column_test.testCase, - executionFrequency=column_test.executionFrequency, - owner=column_test.owner, - result=test_case_result, - ) - record_tests.column_tests.append(create_column_test) + # # For all columns, check if any of them has tests and fetch them + # col_tests = ( + # col_test + # for col in table.columns + # for col_test in ( + # col.columnTests or [] + # ) # columnTests is optional, so it might be a list or None + # if col.columnTests + # ) + # for column_test in col_tests: + # if column_test: + # test_case_result = self.run_column_test( + # table=table, + # orm_table=orm_table, + # column=column_test.columnName, + # test_case=column_test.testCase, + # profiler_results=profiler_results, + # ) + # if test_case_result: + # create_column_test = CreateColumnTestRequest( + # columnName=column_test.columnName, + # description=column_test.description, + # testCase=column_test.testCase, + # executionFrequency=column_test.executionFrequency, + # owner=column_test.owner, + # result=test_case_result, + # ) + # record_tests.column_tests.append(create_column_test) - return record_tests + # return record_tests - def fetch_sample_data( - self, - orm: DeclarativeMeta, - table: Table, - ) -> TableData: + def fetch_sample_data(self) -> TableData: """ Fetch the table data from a real sample :param orm: SQA ORM table @@ -749,50 +668,32 @@ class OrmProfilerProcessor(Processor[Table]): return self.processor_interface.fetch_sample_data() except Exception as err: logger.error( - f"Could not obtain sample data from {orm.__tablename__} - {err}" + f"Could not obtain sample data from {self.processor_interface.table.__tablename__} - {err}" ) def process( self, record: Table, generate_sample_data: bool = True, + entity_profile: Optional[dict] = None, ) -> ProfilerResponse: """ Run the profiling and tests """ - # Convert entity to ORM. Fetch the db by ID to make sure we use the proper db name - - orm_table = ometa_to_orm(table=record, metadata=self.metadata) - self.processor_interface.create_sampler( - table=orm_table, - profile_sample=None, - partition_details=self.get_partition_details(orm_table, record), - profile_sample_query=None, - ) - self.processor_interface.create_runner( - table=orm_table, - partition_details=self.get_partition_details(orm_table, record), - profile_sample_query=None, - ) - - entity_profile = self.profile_entity(orm=orm_table, table=record) - # First, check if we have any tests directly configured in the workflow test_results = None if self.config.testSuites: test_results = self.validate_config_tests( - orm=orm_table, table=record, profiler_results=entity_profile + table=record, profiler_results=entity_profile ) + + # Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831 # # Then, Check if the entity has any tests # record_tests = self.validate_entity_tests( # record, orm_table, entity_profile, config_tests # ) - sample_data = ( - self.fetch_sample_data(orm=orm_table, table=record) - if generate_sample_data - else None - ) + sample_data = self.fetch_sample_data() if generate_sample_data else None res = ProfilerResponse( table=record, diff --git a/ingestion/src/metadata/orm_profiler/profiler/core.py b/ingestion/src/metadata/orm_profiler/profiler/core.py index 4cb569e9971..d146ddcb84c 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/core.py +++ b/ingestion/src/metadata/orm_profiler/profiler/core.py @@ -22,11 +22,8 @@ from sqlalchemy.orm import DeclarativeMeta from sqlalchemy.orm.session import Session from typing_extensions import Self -from metadata.generated.schema.entity.data.table import ( - ColumnProfile, - Table, - TableProfile, -) +from metadata.generated.schema.entity.data.table import ColumnProfile, TableProfile +from metadata.ingestion.api.processor import ProcessorStatus from metadata.orm_profiler.interfaces.interface_protocol import InterfaceProtocol from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface from metadata.orm_profiler.metrics.core import ( @@ -38,10 +35,8 @@ from metadata.orm_profiler.metrics.core import ( StaticMetric, TMetric, ) -from metadata.orm_profiler.metrics.static.column_names import ColumnNames from metadata.orm_profiler.metrics.static.row_count import RowCount from metadata.orm_profiler.orm.registry import NOT_COMPUTE -from metadata.utils.constants import TEN_MIN from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -70,15 +65,7 @@ class Profiler(Generic[TMetric]): self, *metrics: Type[TMetric], profiler_interface: InterfaceProtocol, - table: DeclarativeMeta, - table_entity: Table, profile_date: datetime = datetime.now(), - ignore_cols: Optional[List[str]] = None, - use_cols: Optional[List[Column]] = None, - profile_sample: Optional[float] = None, - timeout_seconds: Optional[int] = TEN_MIN, - partition_details: Optional[Dict] = None, - profile_sample_query: Optional[str] = None, ): """ :param metrics: Metrics to run. We are receiving the uninitialized classes @@ -88,19 +75,10 @@ class Profiler(Generic[TMetric]): :param profile_sample: % of rows to use for sampling column metrics """ - if not isinstance(table, DeclarativeMeta): - raise ValueError(f"Table {table} should be a DeclarativeMeta.") - self.profiler_interface = profiler_interface - self._table = table - self.table_entity = table_entity self._metrics = metrics - self._ignore_cols = ignore_cols - self._use_cols = use_cols - self._profile_sample = profile_sample self._profile_date = profile_date - self._partition_details = partition_details - self._profile_sample_query = profile_sample_query + self.status = ProcessorStatus() self.validate_composed_metric() @@ -128,7 +106,7 @@ class Profiler(Generic[TMetric]): @property def table(self) -> DeclarativeMeta: - return self._table + return self.profiler_interface.table @property def metrics(self) -> Tuple[Type[TMetric], ...]: @@ -136,7 +114,7 @@ class Profiler(Generic[TMetric]): @property def ignore_cols(self) -> List[str]: - return self._ignore_cols + return self._get_excluded_columns() @property def use_cols(self) -> List[Column]: @@ -147,7 +125,7 @@ class Profiler(Generic[TMetric]): instead of picking up all table's columns and ignoring the specified ones. """ - return self._use_cols + return self._get_included_columns() @property def profile_date(self) -> datetime: @@ -166,14 +144,14 @@ class Profiler(Generic[TMetric]): if self._get_included_columns(): self._columns = [ column - for column in inspect(self.table).c + for column in inspect(self.profiler_interface.table).c if column.name in self._get_included_columns() ] if not self._get_included_columns(): self._columns = [ column - for column in inspect(self.table).c + for column in self.profiler_interface.get_columns() if column.name not in self._get_excluded_columns() ] @@ -182,24 +160,24 @@ class Profiler(Generic[TMetric]): def _get_excluded_columns(self) -> Set[str]: """Get excluded columns from table""" if ( - self.table_entity.tableProfilerConfig - and self.table_entity.tableProfilerConfig.excludeColumns + self.profiler_interface.table_entity.tableProfilerConfig + and self.profiler_interface.table_entity.tableProfilerConfig.excludeColumns ): return { excl_cln - for excl_cln in self.table_entity.tableProfilerConfig.excludeColumns + for excl_cln in self.profiler_interface.table_entity.tableProfilerConfig.excludeColumns } return {} def _get_included_columns(self) -> Optional[Set[str]]: if ( - self.table_entity.tableProfilerConfig - and self.table_entity.tableProfilerConfig.includeColumns + self.profiler_interface.table_entity.tableProfilerConfig + and self.profiler_interface.table_entity.tableProfilerConfig.includeColumns ): return { incl_cln.columnName - for incl_cln in self.table_entity.tableProfilerConfig.includeColumns + for incl_cln in self.profiler_interface.table_entity.tableProfilerConfig.includeColumns if incl_cln.columnName not in self._get_excluded_columns() } @@ -238,12 +216,12 @@ class Profiler(Generic[TMetric]): return [metric for metric in metrics if metric.is_col_metric()] if ( - self.table_entity.tableProfilerConfig - and self.table_entity.tableProfilerConfig.includeColumns + self.profiler_interface.table_entity.tableProfilerConfig + and self.profiler_interface.table_entity.tableProfilerConfig.includeColumns ): metric_names = ( metric_array - for col_name, metric_array in self.table_entity.tableProfilerConfig.includeColumns + for col_name, metric_array in self.profiler_interface.table_entity.tableProfilerConfig.includeColumns if col_name == column ) if metric_names: @@ -308,10 +286,7 @@ class Profiler(Generic[TMetric]): metrics, # metric functions MetricTypes.Table, # metric type for function mapping None, # column name - self.table, # ORM table object - self._profile_sample, # profile sample - self._partition_details, # partition details if any - self._profile_sample_query, # profile sample query + self.table, # table name ) ] return [] @@ -336,9 +311,6 @@ class Profiler(Generic[TMetric]): MetricTypes.Static, column, self.table, - self._profile_sample, - self._partition_details, - self._profile_sample_query, ) for column in columns ], @@ -348,9 +320,6 @@ class Profiler(Generic[TMetric]): MetricTypes.Query, column, self.table, - self._profile_sample, - self._partition_details, - self._profile_sample_query, ) for column in self.columns for metric in self.get_col_metrics(self.query_metrics, column) @@ -361,9 +330,6 @@ class Profiler(Generic[TMetric]): MetricTypes.Window, column, self.table, - self._profile_sample, - self._partition_details, - self._profile_sample_query, ) for column in self.columns for metric in [ @@ -400,6 +366,22 @@ class Profiler(Generic[TMetric]): return self + def process(self) -> TableProfile: + """ + Given a table, we will prepare the profiler for + all its columns and return all the run profilers + in a Dict in the shape {col_name: Profiler} + """ + logger.info( + f"Executing profilers for {self.profiler_interface.table_entity.fullyQualifiedName.__root__}..." + ) + self.execute() + + self.status.processed( + self.profiler_interface.table_entity.fullyQualifiedName.__root__ + ) + return self.get_profile() + def get_profile(self) -> TableProfile: """ After executing the profiler, get all results @@ -439,8 +421,8 @@ class Profiler(Generic[TMetric]): columnCount=self._table_results.get("columnCount"), rowCount=self._table_results.get(RowCount.name()), columnProfile=computed_profiles, - profileQuery=self._profile_sample_query, - profileSample=self._profile_sample, + profileQuery=self.profiler_interface._profile_sample_query, + profileSample=self.profiler_interface._profile_sample, ) return profile diff --git a/ingestion/src/metadata/orm_profiler/profiler/default.py b/ingestion/src/metadata/orm_profiler/profiler/default.py index a19bfa721a5..9d162dc03e3 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/default.py +++ b/ingestion/src/metadata/orm_profiler/profiler/default.py @@ -12,17 +12,14 @@ """ Default simple profiler to use """ -from datetime import datetime -from typing import Dict, List, Optional +from typing import List from sqlalchemy.orm import DeclarativeMeta -from metadata.generated.schema.entity.data.table import Table from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface from metadata.orm_profiler.metrics.core import Metric, add_props from metadata.orm_profiler.metrics.registry import Metrics from metadata.orm_profiler.profiler.core import Profiler -from metadata.utils.constants import TEN_MIN def get_default_metrics(table: DeclarativeMeta) -> List[Metric]: @@ -61,27 +58,11 @@ class DefaultProfiler(Profiler): def __init__( self, profiler_interface: SQAProfilerInterface, - table: DeclarativeMeta, - table_entity: Table, - ignore_cols: Optional[List[str]] = None, - profile_date: datetime = datetime.now(), - profile_sample: Optional[float] = None, - timeout_seconds: Optional[int] = TEN_MIN, - partition_details: Optional[Dict] = None, - profile_sample_query: Optional[str] = None, ): - _metrics = get_default_metrics(table) + _metrics = get_default_metrics(profiler_interface.table) super().__init__( *_metrics, profiler_interface=profiler_interface, - table=table, - table_entity=table_entity, - ignore_cols=ignore_cols, - profile_date=profile_date, - profile_sample=profile_sample, - timeout_seconds=timeout_seconds, - partition_details=partition_details, - profile_sample_query=profile_sample_query, ) diff --git a/ingestion/src/metadata/orm_profiler/validations/models.py b/ingestion/src/metadata/orm_profiler/validations/models.py index be181eee8b7..697b5dfb445 100644 --- a/ingestion/src/metadata/orm_profiler/validations/models.py +++ b/ingestion/src/metadata/orm_profiler/validations/models.py @@ -20,6 +20,17 @@ from metadata.generated.schema.tests.testCase import TestCaseParameterValue from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Markdown +class TableConfig(ConfigModel): + """table profile config""" + + sample: Optional[float] = None + partition_field: Optional[str] = None + partition_query_duration: Optional[int] = 1 + partition_values: Optional[List] = None + sample_query: Optional[str] = None + clear_sample_query_from_entity: bool = False + + class TestCase(ConfigModel): """ cli testcases @@ -30,30 +41,7 @@ class TestCase(ConfigModel): testDefinitionName: str fullyQualifiedName: FullyQualifiedEntityName parameterValues: List[TestCaseParameterValue] - - -class TestDef(ConfigModel): - """ - Table test definition - - We expect: - - table name - - Profile sample - - List of table tests - - List of column tests - - We will run a PUT using the info given in the JSON - workflow to update the Table definition based - on the incoming properties. - """ - - testCase: Optional[List[TestCase]] = None - profile_sample: Optional[float] = None - partition_field: Optional[str] = None - partition_query_duration: Optional[int] = 1 - partition_values: Optional[List] = None - profile_sample_query: Optional[str] = None - clear_sample_query_from_entity: bool = False + tableProfileConfig: Optional[TableConfig] = None class TestSuite(ConfigModel): diff --git a/ingestion/src/metadata/utils/processor_config_helper.py b/ingestion/src/metadata/utils/processor_config_helper.py new file mode 100644 index 00000000000..bce2efb82b5 --- /dev/null +++ b/ingestion/src/metadata/utils/processor_config_helper.py @@ -0,0 +1,36 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Helper for the procoessor.config yaml/json config file +""" + +from typing import List, Optional + +from metadata.generated.schema.entity.data.table import Table +from metadata.orm_profiler.validations.models import TestCase + + +def get_record_test_def(table: Table) -> List[Optional[TestCase]]: + """ + Fetch a record from the Workflow JSON config + if the processed table is informed there. + + :param table: Processed table + :return: Test definition + """ + my_record_tests = [ + test_case + for test_suite in self.config.testSuites + for test_case in test_suite.testCases + if test_case.fullyQualifiedName == table.fullyQualifiedName.__root__ + ] + return my_record_tests diff --git a/ingestion/tests/unit/profiler/conftest.py b/ingestion/tests/unit/profiler/conftest.py index 11d56e18040..6bd2dbd423c 100644 --- a/ingestion/tests/unit/profiler/conftest.py +++ b/ingestion/tests/unit/profiler/conftest.py @@ -23,9 +23,10 @@ from metadata.generated.schema.entity.data.table import Column, DataType, Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.tests.testCase import TestCaseParameterValue from metadata.orm_profiler.api.models import ProfilerProcessorConfig from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor -from metadata.orm_profiler.validations.models import TestDef, TestSuite +from metadata.orm_profiler.validations.models import TestCase, TestSuite from metadata.utils.connections import create_and_bind_session @@ -46,8 +47,23 @@ def base_profiler_processor_config(): return ProfilerProcessorConfig( test_suite=TestSuite( name="test suite", - tests=[ - TestDef(table="my.awesome.table"), + testCases=[ + TestCase( + name="my_test_case", + testDefinitionName="TableColumnCountToBeBetween", + fullyQualifiedName="fqn.table", + paramterValues=[ + TestCaseParameterValue( + name="minColValue", + value=5, + ), + TestCaseParameterValue( + name="maxColValue", + value=10, + ), + ], + table="my.awesome.table", + ), ], ) ) diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index d3eb22ec45a..e81016c646d 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -60,8 +60,7 @@ class MetricsTest(TestCase): scheme=SQLiteScheme.sqlite_pysqlite, databaseMode=db_path + "?check_same_thread=False", ) - sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) - engine = sqa_profiler_interface.session.get_bind() + table_entity = Table( id=uuid4(), name="user", @@ -72,6 +71,10 @@ class MetricsTest(TestCase): ) ], ) + sqa_profiler_interface = SQAProfilerInterface( + sqlite_conn, table=User, table_entity=table_entity + ) + engine = sqa_profiler_interface.session.get_bind() @classmethod def setUpClass(cls) -> None: @@ -115,10 +118,6 @@ class MetricsTest(TestCase): cls.sqa_profiler_interface.session.add_all(data) cls.sqa_profiler_interface.session.commit() - def setUp(self) -> None: - self.sqa_profiler_interface.create_sampler(User) - self.sqa_profiler_interface.create_runner(User) - def test_count(self): """ Check the Count metric @@ -127,9 +126,6 @@ class MetricsTest(TestCase): profiler = Profiler( count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) res = profiler.execute()._column_results @@ -144,9 +140,6 @@ class MetricsTest(TestCase): profiler = Profiler( min_age, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) res = profiler.execute()._column_results @@ -161,9 +154,6 @@ class MetricsTest(TestCase): profiler = Profiler( std_age, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) res = profiler.execute()._column_results # SQLITE STD custom implementation returns the squared STD. @@ -178,9 +168,6 @@ class MetricsTest(TestCase): profiler = Profiler( earliest_time, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.dob, User.tob, User.doe], - table_entity=self.table_entity, ) res = profiler.execute()._column_results assert ( @@ -197,9 +184,6 @@ class MetricsTest(TestCase): profiler = Profiler( latest_time, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.dob, User.tob, User.doe], - table_entity=self.table_entity, ) res = profiler.execute()._column_results assert ( @@ -216,9 +200,6 @@ class MetricsTest(TestCase): profiler = Profiler( null_count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.nickname], - table_entity=self.table_entity, ) res = profiler.execute()._column_results @@ -239,9 +220,6 @@ class MetricsTest(TestCase): null_count, null_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.nickname], - table_entity=self.table_entity, ) res = profiler.execute()._column_results assert ( @@ -257,8 +235,6 @@ class MetricsTest(TestCase): profiler = Profiler( table_count, profiler_interface=self.sqa_profiler_interface, - table=User, - table_entity=self.table_entity, ) res = profiler.execute()._table_results assert res.get(Metrics.ROW_COUNT.name) == 3 @@ -271,8 +247,6 @@ class MetricsTest(TestCase): profiler = Profiler( col_count, profiler_interface=self.sqa_profiler_interface, - table=User, - table_entity=self.table_entity, ) res = profiler.execute()._table_results assert res.get(Metrics.COLUMN_COUNT.name) == 9 @@ -288,9 +262,6 @@ class MetricsTest(TestCase): Profiler( avg, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -304,9 +275,6 @@ class MetricsTest(TestCase): Profiler( avg, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -320,9 +288,6 @@ class MetricsTest(TestCase): Profiler( avg, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.comments], - table_entity=self.table_entity, ) .execute() ._column_results @@ -343,9 +308,6 @@ class MetricsTest(TestCase): unique, dup_count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -363,9 +325,6 @@ class MetricsTest(TestCase): Profiler( hist, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -388,9 +347,6 @@ class MetricsTest(TestCase): Profiler( like, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -403,9 +359,6 @@ class MetricsTest(TestCase): Profiler( like, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -431,9 +384,6 @@ class MetricsTest(TestCase): Profiler( ilike, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -446,9 +396,6 @@ class MetricsTest(TestCase): Profiler( ilike, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -469,9 +416,6 @@ class MetricsTest(TestCase): count, like_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -494,9 +438,6 @@ class MetricsTest(TestCase): count, ilike_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -514,9 +455,6 @@ class MetricsTest(TestCase): Profiler( _max, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -536,9 +474,6 @@ class MetricsTest(TestCase): Profiler( min_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -551,9 +486,6 @@ class MetricsTest(TestCase): Profiler( min_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -566,9 +498,6 @@ class MetricsTest(TestCase): Profiler( min_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.comments], - table_entity=self.table_entity, ) .execute() ._column_results @@ -587,9 +516,6 @@ class MetricsTest(TestCase): Profiler( max_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -602,9 +528,6 @@ class MetricsTest(TestCase): Profiler( max_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -617,9 +540,6 @@ class MetricsTest(TestCase): Profiler( max_length, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.comments], - table_entity=self.table_entity, ) .execute() ._column_results @@ -637,9 +557,6 @@ class MetricsTest(TestCase): Profiler( _sum, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results @@ -651,9 +568,6 @@ class MetricsTest(TestCase): Profiler( _sum, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -670,9 +584,6 @@ class MetricsTest(TestCase): Profiler( unique_count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -693,9 +604,6 @@ class MetricsTest(TestCase): unique_count, unique_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -714,9 +622,6 @@ class MetricsTest(TestCase): Profiler( count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -737,9 +642,6 @@ class MetricsTest(TestCase): distinct_count, distinct_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -760,9 +662,6 @@ class MetricsTest(TestCase): Profiler( set_count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -775,9 +674,6 @@ class MetricsTest(TestCase): Profiler( set_count, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -799,19 +695,17 @@ class MetricsTest(TestCase): comments = Column(TEXT) age = Column(Integer) - self.sqa_profiler_interface.create_sampler(EmptyUser) - self.sqa_profiler_interface.create_runner(EmptyUser) - EmptyUser.__table__.create(bind=self.engine) + sqa_profiler_interface = SQAProfilerInterface( + self.sqlite_conn, table=EmptyUser, table_entity=self.table_entity + ) + hist = add_props(bins=5)(Metrics.HISTOGRAM.value) res = ( Profiler( hist, - profiler_interface=self.sqa_profiler_interface, - table=EmptyUser, - use_cols=[EmptyUser.age], - table_entity=self.table_entity, + profiler_interface=sqa_profiler_interface, ) .execute() ._column_results @@ -842,9 +736,6 @@ class MetricsTest(TestCase): Profiler( not_like, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) .execute() ._column_results @@ -862,9 +753,6 @@ class MetricsTest(TestCase): Profiler( median, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) .execute() ._column_results diff --git a/ingestion/tests/unit/profiler/test_profiler.py b/ingestion/tests/unit/profiler/test_profiler.py index 73d1250210b..a158138cd37 100644 --- a/ingestion/tests/unit/profiler/test_profiler.py +++ b/ingestion/tests/unit/profiler/test_profiler.py @@ -63,7 +63,7 @@ class ProfilerTest(TestCase): scheme=SQLiteScheme.sqlite_pysqlite, databaseMode=db_path + "?check_same_thread=False", ) - sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) + table_entity = Table( id=uuid4(), name="user", @@ -74,6 +74,9 @@ class ProfilerTest(TestCase): ) ], ) + sqa_profiler_interface = SQAProfilerInterface( + sqlite_conn, table=User, table_entity=table_entity + ) @classmethod def setUpClass(cls) -> None: @@ -89,18 +92,12 @@ class ProfilerTest(TestCase): cls.sqa_profiler_interface.session.add_all(data) cls.sqa_profiler_interface.session.commit() - def setUp(self) -> None: - self.sqa_profiler_interface.create_sampler(User) - self.sqa_profiler_interface.create_runner(User) - def test_default_profiler(self): """ Check our pre-cooked profiler """ simple = DefaultProfiler( profiler_interface=self.sqa_profiler_interface, - table=User, - table_entity=self.table_entity, ) simple.execute() @@ -160,9 +157,6 @@ class ProfilerTest(TestCase): count, like_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) with pytest.raises(MissingMetricException): @@ -171,9 +165,6 @@ class ProfilerTest(TestCase): like, like_ratio, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.age], - table_entity=self.table_entity, ) def test_skipped_types(self): @@ -194,15 +185,6 @@ class ProfilerTest(TestCase): profiler = Profiler( Metrics.COUNT.value, profiler_interface=self.sqa_profiler_interface, - table=NotCompute, - use_cols=[ - NotCompute.null_col, - NotCompute.array_col, - NotCompute.json_col, - NotCompute.map_col, - NotCompute.struct_col, - ], - table_entity=self.table_entity, ) assert not profiler.column_results diff --git a/ingestion/tests/unit/profiler/test_sample.py b/ingestion/tests/unit/profiler/test_sample.py index 4de94b07a23..e3ebd36b513 100644 --- a/ingestion/tests/unit/profiler/test_sample.py +++ b/ingestion/tests/unit/profiler/test_sample.py @@ -56,9 +56,6 @@ class SampleTest(TestCase): scheme=SQLiteScheme.sqlite_pysqlite, databaseMode=db_path + "?check_same_thread=False", ) - sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) - engine = sqa_profiler_interface.session.get_bind() - session = sqa_profiler_interface.session table_entity = Table( id=uuid4(), @@ -71,6 +68,12 @@ class SampleTest(TestCase): ], ) + sqa_profiler_interface = SQAProfilerInterface( + sqlite_conn, table=User, table_entity=table_entity + ) + engine = sqa_profiler_interface.session.get_bind() + session = sqa_profiler_interface.session + @classmethod def setUpClass(cls) -> None: """ @@ -121,15 +124,17 @@ class SampleTest(TestCase): Sample property should be properly generated """ - self.sqa_profiler_interface.create_sampler(User, profile_sample=50.0) - # Randomly pick table_count to init the Profiler, we don't care for this test table_count = Metrics.ROW_COUNT.value - profiler = Profiler( - table_count, - profiler_interface=self.sqa_profiler_interface, + sqa_profiler_interface = SQAProfilerInterface( + self.sqlite_conn, table=User, table_entity=self.table_entity, + workflow_profile_sample=50, + ) + profiler = Profiler( + table_count, + profiler_interface=sqa_profiler_interface, ) res = self.session.query(func.count()).select_from(profiler.sample).first() @@ -144,9 +149,6 @@ class SampleTest(TestCase): profiler = Profiler( table_count, profiler_interface=self.sqa_profiler_interface, - table=User, - profile_sample=50.0, - table_entity=self.table_entity, ) res = profiler.execute()._table_results assert res.get(Metrics.ROW_COUNT.name) == 30 @@ -160,24 +162,17 @@ class SampleTest(TestCase): """ count = Metrics.COUNT.value - profiler = Profiler( - count, - profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - profile_sample=50, - table_entity=self.table_entity, - ) - res = profiler.execute()._column_results - assert res.get(User.name.name)[Metrics.COUNT.name] < 30 + # Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831 + # profiler = Profiler( + # count, + # profiler_interface=self.sqa_profiler_interface, + # ) + # res = profiler.execute()._column_results + # assert res.get(User.name.name)[Metrics.COUNT.name] < 30 profiler = Profiler( count, profiler_interface=self.sqa_profiler_interface, - table=User, - profile_sample=100.0, - use_cols=[User.name], - table_entity=self.table_entity, ) res = profiler.execute()._column_results assert res.get(User.name.name)[Metrics.COUNT.name] == 30 @@ -187,26 +182,19 @@ class SampleTest(TestCase): Histogram should run correctly """ hist = Metrics.HISTOGRAM.value - profiler = Profiler( - hist, - profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.id], - profile_sample=50.0, - table_entity=self.table_entity, - ) - res = profiler.execute()._column_results + # Need to re-implement the logic with https://github.com/open-metadata/OpenMetadata/issues/5831 + # profiler = Profiler( + # hist, + # profiler_interface=self.sqa_profiler_interface, + # ) + # res = profiler.execute()._column_results - # The sum of all frequencies should be sampled - assert sum(res.get(User.id.name)[Metrics.HISTOGRAM.name]["frequencies"]) < 30 + # # The sum of all frequencies should be sampled + # assert sum(res.get(User.id.name)[Metrics.HISTOGRAM.name]["frequencies"]) < 30 profiler = Profiler( hist, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.id], - profile_sample=100.0, - table_entity=self.table_entity, ) res = profiler.execute()._column_results @@ -218,19 +206,10 @@ class SampleTest(TestCase): Unique count should run correctly """ - self.sqa_profiler_interface.create_sampler( - User, - profile_sample=50.0, - ) - self.sqa_profiler_interface.create_runner(User) - hist = Metrics.UNIQUE_COUNT.value profiler = Profiler( hist, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) res = profiler.execute()._column_results @@ -238,18 +217,9 @@ class SampleTest(TestCase): # This tests might very rarely, fail, depending on the sampled random data. assert res.get(User.name.name)[Metrics.UNIQUE_COUNT.name] <= 1 - self.sqa_profiler_interface.create_sampler( - User, - profile_sample=100.0, - ) - self.sqa_profiler_interface.create_runner(User) - profiler = Profiler( hist, profiler_interface=self.sqa_profiler_interface, - table=User, - use_cols=[User.name], - table_entity=self.table_entity, ) res = profiler.execute()._column_results diff --git a/ingestion/tests/unit/profiler/test_session_validations.py b/ingestion/tests/unit/profiler/test_session_validations.py index be89b414f0e..85c22be783b 100644 --- a/ingestion/tests/unit/profiler/test_session_validations.py +++ b/ingestion/tests/unit/profiler/test_session_validations.py @@ -63,7 +63,7 @@ class SessionValidation(UnitestTestCase): """ sqlite_conn = SQLiteConnection(scheme=SQLiteScheme.sqlite_pysqlite) - sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) + sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User) engine = sqa_profiler_interface.session.get_bind() session = sqa_profiler_interface.session @@ -93,10 +93,6 @@ class SessionValidation(UnitestTestCase): cls.session.add_all(data) cls.session.commit() - def setUp(self) -> None: - self.sqa_profiler_interface.create_sampler(User) - self.sqa_profiler_interface.create_runner(User) - def test_column_values_not_in_set(self): """ Check that the metric runs and the results are correctly validated diff --git a/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py b/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py index 3cbec285062..b4450a182cf 100644 --- a/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py +++ b/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py @@ -35,8 +35,6 @@ from metadata.orm_profiler.metrics.core import ( ) from metadata.orm_profiler.metrics.static.row_count import RowCount from metadata.orm_profiler.profiler.default import get_default_metrics -from metadata.orm_profiler.profiler.runner import QueryRunner -from metadata.orm_profiler.profiler.sampler import Sampler class User(declarative_base()): @@ -54,34 +52,16 @@ class SQAProfilerInterfaceTest(TestCase): sqlite_conn = SQLiteConnection( scheme=SQLiteScheme.sqlite_pysqlite, ) - self.sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) + self.sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User) self.table = User def test_init_interface(self): """Test we can instantiate our interface object correctly""" - assert self.sqa_profiler_interface._sampler == None - assert self.sqa_profiler_interface._runner == None + assert self.sqa_profiler_interface._sampler != None + assert self.sqa_profiler_interface._runner != None assert isinstance(self.sqa_profiler_interface.session, Session) - def test_create_sampler(self): - """Test we can create our sampler correctly""" - self.sqa_profiler_interface.create_sampler( - self.table, - ) - - assert isinstance(self.sqa_profiler_interface.sampler, Sampler) - - def test_create_runner(self): - """Test we can create our sampler correctly""" - - with raises(RuntimeError): - self.sqa_profiler_interface.create_runner(self.table) - - self.sqa_profiler_interface.create_sampler(self.table) - self.sqa_profiler_interface.create_runner(self.table) - assert isinstance(self.sqa_profiler_interface.runner, QueryRunner) - def test_private_attributes(self): with raises(AttributeError): self.sqa_profiler_interface.runner = None @@ -99,7 +79,7 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): scheme=SQLiteScheme.sqlite_pysqlite, databaseMode=db_path + "?check_same_thread=False", ) - sqa_profiler_interface = SQAProfilerInterface(sqlite_conn) + sqa_profiler_interface = SQAProfilerInterface(sqlite_conn, table=User) @classmethod def setUpClass(cls) -> None: @@ -136,8 +116,8 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): def test_init_interface(self): """Test we can instantiate our interface object correctly""" - assert self.sqa_profiler_interface._sampler == None - assert self.sqa_profiler_interface._runner == None + assert self.sqa_profiler_interface._sampler != None + assert self.sqa_profiler_interface._runner != None assert isinstance(self.sqa_profiler_interface.session, Session) def test_get_all_metrics(self): @@ -147,9 +127,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): MetricTypes.Table, None, self.table, - None, - None, - None, ) ] column_metrics = [] @@ -166,9 +143,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): MetricTypes.Static, col, self.table, - None, - None, - None, ) ) for query_metric in self.query_metrics: @@ -178,9 +152,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): MetricTypes.Query, col, self.table, - None, - None, - None, ) ) for window_metric in self.window_metrics: @@ -190,9 +161,6 @@ class SQAProfilerInterfaceTestMultiThread(TestCase): MetricTypes.Window, col, self.table, - None, - None, - None, ) ) diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index 4961a4279e8..60f6abfc506 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -34,10 +34,28 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline ) from metadata.generated.schema.type.entityReference import EntityReference from metadata.orm_profiler.api.workflow import ProfilerWorkflow +from metadata.orm_profiler.interfaces.sqa_profiler_interface import SQAProfilerInterface from metadata.orm_profiler.processor.orm_profiler import OrmProfilerProcessor from metadata.orm_profiler.profiler.default import DefaultProfiler from metadata.orm_profiler.profiler.models import ProfilerDef +TABLE = Table( + id=uuid.uuid4(), + name="users", + fullyQualifiedName="service.db.users", + columns=[ + Column(name="id", dataType=DataType.INT), + Column(name="name", dataType=DataType.STRING), + Column(name="fullname", dataType=DataType.STRING), + Column(name="nickname", dataType=DataType.STRING), + Column(name="age", dataType=DataType.INT), + ], + database=EntityReference(id=uuid.uuid4(), name="db", type="database"), + tableProfilerConfig=TableProfilerConfig( + profilerCo=80.0, + ), +) + config = { "source": { "type": "sqlite", @@ -55,13 +73,29 @@ config = { }, } +Base = declarative_base() + +class User(Base): + __tablename__ = "users" + id = sqa.Column(sqa.Integer, primary_key=True) + name = sqa.Column(sqa.String(256)) + fullname = sqa.Column(sqa.String(256)) + nickname = sqa.Column(sqa.String(256)) + age = sqa.Column(sqa.Integer) + + +@patch.object( + SQAProfilerInterface, + "_convert_table_to_orm_object", + return_value=User, +) @patch.object( ProfilerWorkflow, "_validate_service_name", return_value=True, ) -def test_init_workflow(mocked_method): +def test_init_workflow(mocked_method, mocked_orm): """ We can initialise the workflow from a config """ @@ -71,11 +105,15 @@ def test_init_workflow(mocked_method): assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline) assert isinstance(workflow.metadata_config, OpenMetadataConnection) - workflow.create_processor(workflow.config.source.serviceConnection.__root__.config) + workflow.create_profiler_interface( + workflow.config.source.serviceConnection.__root__.config, + TABLE, + ) + workflow.create_processor_obj() - assert isinstance(workflow.processor, OrmProfilerProcessor) - assert workflow.processor.config.profiler is None - assert workflow.processor.config.testSuites is None + assert isinstance(workflow.processor_obj, OrmProfilerProcessor) + assert workflow.processor_obj.config.profiler is None + assert workflow.processor_obj.config.testSuites is None @patch.object( @@ -155,12 +193,17 @@ def test_filter_entities(mocked_method): assert len(list(include_workflow.filter_entities(all_tables))) == 3 +@patch.object( + SQAProfilerInterface, + "_convert_table_to_orm_object", + return_value=User, +) @patch.object( ProfilerWorkflow, "_validate_service_name", return_value=True, ) -def test_profile_def(mocked_method): +def test_profile_def(mocked_method, mocked_orm): """ Validate the definitions of the profile in the JSON """ @@ -172,9 +215,11 @@ def test_profile_def(mocked_method): profile_workflow = ProfilerWorkflow.create(profile_config) mocked_method.assert_called() - profile_workflow.create_processor( - profile_workflow.config.source.serviceConnection.__root__.config + profile_workflow.create_profiler_interface( + profile_workflow.config.source.serviceConnection.__root__.config, + TABLE, ) + profile_workflow.create_processor_obj() profile_definition = ProfilerDef( name="my_profiler", @@ -183,16 +228,21 @@ def test_profile_def(mocked_method): custom_metrics=None, ) - assert isinstance(profile_workflow.processor, OrmProfilerProcessor) - assert profile_workflow.processor.config.profiler == profile_definition + assert isinstance(profile_workflow.processor_obj, OrmProfilerProcessor) + assert profile_workflow.processor_obj.config.profiler == profile_definition +@patch.object( + SQAProfilerInterface, + "_convert_table_to_orm_object", + return_value=User, +) @patch.object( ProfilerWorkflow, "_validate_service_name", return_value=True, ) -def test_default_profile_def(mocked_method): +def test_default_profile_def(mocked_method, mocked_orm): """ If no information is specified for the profiler, let's use the SimpleTableProfiler and SimpleProfiler @@ -200,42 +250,19 @@ def test_default_profile_def(mocked_method): profile_workflow = ProfilerWorkflow.create(config) mocked_method.assert_called() - profile_workflow.create_processor( - profile_workflow.config.source.serviceConnection.__root__.config + + profile_workflow.create_profiler_interface( + profile_workflow.config.source.serviceConnection.__root__.config, TABLE ) + profile_workflow.create_processor_obj() - assert isinstance(profile_workflow.processor, OrmProfilerProcessor) - assert profile_workflow.processor.config.profiler is None + assert isinstance(profile_workflow.processor_obj, OrmProfilerProcessor) + assert profile_workflow.processor_obj.config.profiler is None - Base = declarative_base() - - class User(Base): - __tablename__ = "users" - id = sqa.Column(sqa.Integer, primary_key=True) - name = sqa.Column(sqa.String(256)) - fullname = sqa.Column(sqa.String(256)) - nickname = sqa.Column(sqa.String(256)) - age = sqa.Column(sqa.Integer) - - table = Table( - id=uuid.uuid4(), - name="users", - fullyQualifiedName="service.db.users", - columns=[ - Column(name="id", dataType=DataType.INT), - Column(name="name", dataType=DataType.STRING), - Column(name="fullname", dataType=DataType.STRING), - Column(name="nickname", dataType=DataType.STRING), - Column(name="age", dataType=DataType.INT), - ], - database=EntityReference(id=uuid.uuid4(), name="db", type="database"), - tableProfilerConfig=TableProfilerConfig( - profilerCo=80.0, - ), - ) + profile_workflow.create_profiler_obj() assert isinstance( - profile_workflow.processor.build_profiler(User, table=table), + profile_workflow.profiler_obj, DefaultProfiler, )