From cb6e42941aa3357fe44b456f1d5bb51323210aa9 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Thu, 22 Jun 2023 12:51:56 +0530 Subject: [PATCH] Fix 12025: Clickhouse NaN issue (#12079) --- .../sqlalchemy/sqa_test_suite_interface.py | 2 +- .../metadata/profiler/interface/__init__.py | 0 .../profiler/interface/pandas/__init__.py | 0 ...ler_interface.py => profiler_interface.py} | 7 ++-- .../profiler/interface/sqlalchemy/__init__.py | 0 ...ler_interface.py => profiler_interface.py} | 38 +++++++++++-------- .../metadata/profiler/orm/functions/median.py | 2 +- .../profiler/processor/pandas/__init__.py | 0 .../sampler.py} | 0 .../profiler/processor/sqlalchemy/__init__.py | 0 .../processor/{ => sqlalchemy}/sampler.py | 0 .../profiler/source/base_profiler_source.py | 4 +- .../source/profiler_source_factory.py | 3 +- .../unit/profiler/test_datalake_metrics.py | 2 +- ingestion/tests/unit/profiler/test_metrics.py | 2 +- .../tests/unit/profiler/test_profiler.py | 2 +- ingestion/tests/unit/profiler/test_runner.py | 2 +- ingestion/tests/unit/profiler/test_sample.py | 4 +- .../profiler/test_sqa_profiler_interface.py | 2 +- .../tests/unit/profiler/test_workflow.py | 2 +- 20 files changed, 40 insertions(+), 32 deletions(-) create mode 100644 ingestion/src/metadata/profiler/interface/__init__.py create mode 100644 ingestion/src/metadata/profiler/interface/pandas/__init__.py rename ingestion/src/metadata/profiler/interface/pandas/{pandas_profiler_interface.py => profiler_interface.py} (98%) create mode 100644 ingestion/src/metadata/profiler/interface/sqlalchemy/__init__.py rename ingestion/src/metadata/profiler/interface/sqlalchemy/{sqa_profiler_interface.py => profiler_interface.py} (96%) create mode 100644 ingestion/src/metadata/profiler/processor/pandas/__init__.py rename ingestion/src/metadata/profiler/processor/{datalake_sampler.py => pandas/sampler.py} (100%) create mode 100644 ingestion/src/metadata/profiler/processor/sqlalchemy/__init__.py rename ingestion/src/metadata/profiler/processor/{ => sqlalchemy}/sampler.py (100%) diff --git a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py index cf3abafa4f4..37cca5272f1 100644 --- a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py @@ -32,7 +32,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin from metadata.profiler.processor.runner import QueryRunner -from metadata.profiler.processor.sampler import Sampler +from metadata.profiler.processor.sqlalchemy.sampler import Sampler from metadata.utils.constants import TEN_MIN from metadata.utils.importer import import_test_case_class from metadata.utils.logger import test_suite_logger diff --git a/ingestion/src/metadata/profiler/interface/__init__.py b/ingestion/src/metadata/profiler/interface/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/interface/pandas/__init__.py b/ingestion/src/metadata/profiler/interface/pandas/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/interface/pandas/pandas_profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py similarity index 98% rename from ingestion/src/metadata/profiler/interface/pandas/pandas_profiler_interface.py rename to ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py index a235579471d..1a559eb2b93 100644 --- a/ingestion/src/metadata/profiler/interface/pandas/pandas_profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py @@ -34,7 +34,7 @@ from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin from metadata.profiler.interface.profiler_protocol import ProfilerProtocol from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.registry import Metrics -from metadata.profiler.processor.datalake_sampler import DatalakeSampler +from metadata.profiler.processor.pandas.sampler import DatalakeSampler from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_like_column import SQALikeColumn, Type @@ -48,6 +48,8 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin): sqlalchemy. """ + # pylint: disable=too-many-arguments + _profiler_type: str = DatalakeConnection.__name__ def __init__( @@ -60,7 +62,7 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin): source_config, sample_query, table_partition_config=None, - **kwargs, + **_, ): """Instantiate SQA Interface object""" self._thread_count = thread_count @@ -359,4 +361,3 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin): def close(self): """Nothing to close with pandas""" - pass diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/__init__.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/sqa_profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py similarity index 96% rename from ingestion/src/metadata/profiler/interface/sqlalchemy/sqa_profiler_interface.py rename to ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 7155272fb1c..d2931bb6aeb 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/sqa_profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -41,7 +41,7 @@ from metadata.profiler.orm.functions.table_metric_construct import ( table_metric_construct_factory, ) from metadata.profiler.processor.runner import QueryRunner -from metadata.profiler.processor.sampler import Sampler +from metadata.profiler.processor.sqlalchemy.sampler import Sampler from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger @@ -68,6 +68,8 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): sqlalchemy. """ + # pylint: disable=too-many-instance-attributes,too-many-arguments + _profiler_type: str = DatabaseConnection.__name__ def __init__( @@ -82,7 +84,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): sqa_metadata=None, timeout_seconds=43200, thread_count=5, - **kwargs, + **_, ): """Instantiate SQA Interface object""" self._thread_count = thread_count @@ -172,6 +174,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" handle_query_exception(msg, exc, session) + return None @valuedispatch def _get_metrics(self, *args, **kwargs): @@ -199,6 +202,8 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): Returns: dictionnary of results """ + # pylint: disable=protected-access + try: dialect = runner._session.get_bind().dialect.name row = table_metric_construct_factory.construct( @@ -250,12 +255,9 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): **self._is_array_column(column), ) return dict(row) - except Exception as exc: - if ( - isinstance(exc, ProgrammingError) - and exc.orig - and exc.orig.errno - in OVERFLOW_ERROR_CODES.get(session.bind.dialect.name) + except ProgrammingError as exc: + if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get( + session.bind.dialect.name ): logger.info( f"Computing metrics without sum for {runner.table.__tablename__}.{column.name}" @@ -264,8 +266,10 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): metrics, runner, session, column ) + except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" handle_query_exception(msg, exc, session) + return None # pylint: disable=unused-argument @_get_metrics.register(MetricTypes.Query.value) @@ -302,6 +306,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" handle_query_exception(msg, exc, session) + return None # pylint: disable=unused-argument @_get_metrics.register(MetricTypes.Window.value) @@ -331,17 +336,17 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): *[metric(column).fn() for metric in metrics], **self._is_array_column(column), ) - except Exception as exc: - if ( - isinstance(exc, ProgrammingError) - and exc.orig - and exc.orig.errno - in OVERFLOW_ERROR_CODES.get(session.bind.dialect.name) + except ProgrammingError as exc: + if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get( + session.bind.dialect.name ): logger.info( f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow" ) return None + + except Exception as exc: + msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" handle_query_exception(msg, exc, session) if row: @@ -374,6 +379,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}" handle_query_exception(msg, exc, session) + return None def _create_thread_safe_sampler( self, @@ -548,7 +554,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): return None def get_hybrid_metrics( - self, column: Column, metric: Metrics, column_results: Dict, table, **kwargs + self, column: Column, metric: Metrics, column_results: Dict, **kwargs ): """Given a list of metrics, compute the given results and returns the values @@ -561,7 +567,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin): """ sampler = Sampler( session=self.session, - table=table, + table=kwargs.get("table"), sample_columns=self._get_sample_columns(), profile_sample_config=self.profile_sample_config, partition_details=self.partition_details, diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py index 1826e52a8a9..e0b3106d927 100644 --- a/ingestion/src/metadata/profiler/orm/functions/median.py +++ b/ingestion/src/metadata/profiler/orm/functions/median.py @@ -48,7 +48,7 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "quantile(%s)(%s)" % (percentile, col) + return "if(isNaN(quantile(%s)(%s)),null,quantile(%s)(%s))" % ((percentile, col) * 2) # pylint: disable=unused-argument diff --git a/ingestion/src/metadata/profiler/processor/pandas/__init__.py b/ingestion/src/metadata/profiler/processor/pandas/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/processor/datalake_sampler.py b/ingestion/src/metadata/profiler/processor/pandas/sampler.py similarity index 100% rename from ingestion/src/metadata/profiler/processor/datalake_sampler.py rename to ingestion/src/metadata/profiler/processor/pandas/sampler.py diff --git a/ingestion/src/metadata/profiler/processor/sqlalchemy/__init__.py b/ingestion/src/metadata/profiler/processor/sqlalchemy/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/processor/sampler.py b/ingestion/src/metadata/profiler/processor/sqlalchemy/sampler.py similarity index 100% rename from ingestion/src/metadata/profiler/processor/sampler.py rename to ingestion/src/metadata/profiler/processor/sqlalchemy/sampler.py diff --git a/ingestion/src/metadata/profiler/source/base_profiler_source.py b/ingestion/src/metadata/profiler/source/base_profiler_source.py index 379eab31e3a..7897b3c9c80 100644 --- a/ingestion/src/metadata/profiler/source/base_profiler_source.py +++ b/ingestion/src/metadata/profiler/source/base_profiler_source.py @@ -34,11 +34,11 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.api.models import ProfilerProcessorConfig, TableConfig -from metadata.profiler.interface.pandas.pandas_profiler_interface import ( +from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, ) from metadata.profiler.interface.profiler_protocol import ProfilerProtocol -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.metrics.registry import Metrics diff --git a/ingestion/src/metadata/profiler/source/profiler_source_factory.py b/ingestion/src/metadata/profiler/source/profiler_source_factory.py index 60a35f12234..4831599fb3b 100644 --- a/ingestion/src/metadata/profiler/source/profiler_source_factory.py +++ b/ingestion/src/metadata/profiler/source/profiler_source_factory.py @@ -41,5 +41,6 @@ class ProfilerSourceFactory: profiler_source_factory = ProfilerSourceFactory() profiler_source_factory.register_source( - BigqueryType.BigQuery.value.lower(), BigQueryProfilerSource + BigqueryType.BigQuery.value.lower(), + BigQueryProfilerSource, ) diff --git a/ingestion/tests/unit/profiler/test_datalake_metrics.py b/ingestion/tests/unit/profiler/test_datalake_metrics.py index 6849a9f72e4..8a3231c56d7 100644 --- a/ingestion/tests/unit/profiler/test_datalake_metrics.py +++ b/ingestion/tests/unit/profiler/test_datalake_metrics.py @@ -23,7 +23,7 @@ from sqlalchemy.orm import declarative_base from metadata.generated.schema.entity.data.table import Column as EntityColumn from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin -from metadata.profiler.interface.pandas.pandas_profiler_interface import ( +from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, ) from metadata.profiler.metrics.core import add_props diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index c692de8310c..cd8943fe3c7 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -27,7 +27,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec SQLiteConnection, SQLiteScheme, ) -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.metrics.core import add_props diff --git a/ingestion/tests/unit/profiler/test_profiler.py b/ingestion/tests/unit/profiler/test_profiler.py index 2ff548a7e26..acd8f390c38 100644 --- a/ingestion/tests/unit/profiler/test_profiler.py +++ b/ingestion/tests/unit/profiler/test_profiler.py @@ -43,7 +43,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec SQLiteScheme, ) from metadata.ingestion.source import sqa_types -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.metrics.core import MetricTypes, add_props diff --git a/ingestion/tests/unit/profiler/test_runner.py b/ingestion/tests/unit/profiler/test_runner.py index 0d7ff06e851..bb3032c187f 100644 --- a/ingestion/tests/unit/profiler/test_runner.py +++ b/ingestion/tests/unit/profiler/test_runner.py @@ -23,7 +23,7 @@ from sqlalchemy.orm import declarative_base from metadata.ingestion.connections.session import create_and_bind_session from metadata.profiler.api.models import ProfileSampleConfig from metadata.profiler.processor.runner import QueryRunner -from metadata.profiler.processor.sampler import Sampler +from metadata.profiler.processor.sqlalchemy.sampler import Sampler from metadata.utils.timeout import cls_timeout Base = declarative_base() diff --git a/ingestion/tests/unit/profiler/test_sample.py b/ingestion/tests/unit/profiler/test_sample.py index f8d7d97e637..90d37f5d14d 100644 --- a/ingestion/tests/unit/profiler/test_sample.py +++ b/ingestion/tests/unit/profiler/test_sample.py @@ -27,13 +27,13 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec SQLiteScheme, ) from metadata.profiler.api.models import ProfileSampleConfig -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.metrics.registry import Metrics from metadata.profiler.orm.registry import CustomTypes from metadata.profiler.processor.core import Profiler -from metadata.profiler.processor.sampler import Sampler +from metadata.profiler.processor.sqlalchemy.sampler import Sampler Base = declarative_base() diff --git a/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py b/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py index d629e3a964c..3c61833da04 100644 --- a/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py +++ b/ingestion/tests/unit/profiler/test_sqa_profiler_interface.py @@ -38,7 +38,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec SQLiteConnection, SQLiteScheme, ) -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.metrics.core import ( diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index 1ee83537c74..4681edb9e8f 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -42,7 +42,7 @@ from metadata.generated.schema.type.entityReference import EntityReference from metadata.profiler.api.models import ProfilerProcessorConfig from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.profiler.interface.profiler_protocol import ProfilerProtocol -from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) from metadata.profiler.processor.default import DefaultProfiler