Fix 12025: Clickhouse NaN issue (#12079)

This commit is contained in:
Ayush Shah 2023-06-22 12:51:56 +05:30 committed by GitHub
parent 159b5f131f
commit cb6e42941a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 40 additions and 32 deletions

View File

@ -32,7 +32,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection from metadata.ingestion.source.connections import get_connection
from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler import Sampler from metadata.profiler.processor.sqlalchemy.sampler import Sampler
from metadata.utils.constants import TEN_MIN from metadata.utils.constants import TEN_MIN
from metadata.utils.importer import import_test_case_class from metadata.utils.importer import import_test_case_class
from metadata.utils.logger import test_suite_logger from metadata.utils.logger import test_suite_logger

View File

@ -34,7 +34,7 @@ from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.datalake_sampler import DatalakeSampler from metadata.profiler.processor.pandas.sampler import DatalakeSampler
from metadata.utils.dispatch import valuedispatch from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn, Type from metadata.utils.sqa_like_column import SQALikeColumn, Type
@ -48,6 +48,8 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
sqlalchemy. sqlalchemy.
""" """
# pylint: disable=too-many-arguments
_profiler_type: str = DatalakeConnection.__name__ _profiler_type: str = DatalakeConnection.__name__
def __init__( def __init__(
@ -60,7 +62,7 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
source_config, source_config,
sample_query, sample_query,
table_partition_config=None, table_partition_config=None,
**kwargs, **_,
): ):
"""Instantiate SQA Interface object""" """Instantiate SQA Interface object"""
self._thread_count = thread_count self._thread_count = thread_count
@ -359,4 +361,3 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
def close(self): def close(self):
"""Nothing to close with pandas""" """Nothing to close with pandas"""
pass

View File

@ -41,7 +41,7 @@ from metadata.profiler.orm.functions.table_metric_construct import (
table_metric_construct_factory, table_metric_construct_factory,
) )
from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler import Sampler from metadata.profiler.processor.sqlalchemy.sampler import Sampler
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
from metadata.utils.dispatch import valuedispatch from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.logger import profiler_interface_registry_logger
@ -68,6 +68,8 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
sqlalchemy. sqlalchemy.
""" """
# pylint: disable=too-many-instance-attributes,too-many-arguments
_profiler_type: str = DatabaseConnection.__name__ _profiler_type: str = DatabaseConnection.__name__
def __init__( def __init__(
@ -82,7 +84,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
sqa_metadata=None, sqa_metadata=None,
timeout_seconds=43200, timeout_seconds=43200,
thread_count=5, thread_count=5,
**kwargs, **_,
): ):
"""Instantiate SQA Interface object""" """Instantiate SQA Interface object"""
self._thread_count = thread_count self._thread_count = thread_count
@ -172,6 +174,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
except Exception as exc: except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session) handle_query_exception(msg, exc, session)
return None
@valuedispatch @valuedispatch
def _get_metrics(self, *args, **kwargs): def _get_metrics(self, *args, **kwargs):
@ -199,6 +202,8 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
Returns: Returns:
dictionnary of results dictionnary of results
""" """
# pylint: disable=protected-access
try: try:
dialect = runner._session.get_bind().dialect.name dialect = runner._session.get_bind().dialect.name
row = table_metric_construct_factory.construct( row = table_metric_construct_factory.construct(
@ -250,12 +255,9 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
**self._is_array_column(column), **self._is_array_column(column),
) )
return dict(row) return dict(row)
except Exception as exc: except ProgrammingError as exc:
if ( if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get(
isinstance(exc, ProgrammingError) session.bind.dialect.name
and exc.orig
and exc.orig.errno
in OVERFLOW_ERROR_CODES.get(session.bind.dialect.name)
): ):
logger.info( logger.info(
f"Computing metrics without sum for {runner.table.__tablename__}.{column.name}" f"Computing metrics without sum for {runner.table.__tablename__}.{column.name}"
@ -264,8 +266,10 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
metrics, runner, session, column metrics, runner, session, column
) )
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session) handle_query_exception(msg, exc, session)
return None
# pylint: disable=unused-argument # pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Query.value) @_get_metrics.register(MetricTypes.Query.value)
@ -302,6 +306,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
except Exception as exc: except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session) handle_query_exception(msg, exc, session)
return None
# pylint: disable=unused-argument # pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Window.value) @_get_metrics.register(MetricTypes.Window.value)
@ -331,17 +336,17 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
*[metric(column).fn() for metric in metrics], *[metric(column).fn() for metric in metrics],
**self._is_array_column(column), **self._is_array_column(column),
) )
except Exception as exc: except ProgrammingError as exc:
if ( if exc.orig and exc.orig.errno in OVERFLOW_ERROR_CODES.get(
isinstance(exc, ProgrammingError) session.bind.dialect.name
and exc.orig
and exc.orig.errno
in OVERFLOW_ERROR_CODES.get(session.bind.dialect.name)
): ):
logger.info( logger.info(
f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow" f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow"
) )
return None return None
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session) handle_query_exception(msg, exc, session)
if row: if row:
@ -374,6 +379,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
except Exception as exc: except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}" msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}"
handle_query_exception(msg, exc, session) handle_query_exception(msg, exc, session)
return None
def _create_thread_safe_sampler( def _create_thread_safe_sampler(
self, self,
@ -548,7 +554,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
return None return None
def get_hybrid_metrics( def get_hybrid_metrics(
self, column: Column, metric: Metrics, column_results: Dict, table, **kwargs self, column: Column, metric: Metrics, column_results: Dict, **kwargs
): ):
"""Given a list of metrics, compute the given results """Given a list of metrics, compute the given results
and returns the values and returns the values
@ -561,7 +567,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
""" """
sampler = Sampler( sampler = Sampler(
session=self.session, session=self.session,
table=table, table=kwargs.get("table"),
sample_columns=self._get_sample_columns(), sample_columns=self._get_sample_columns(),
profile_sample_config=self.profile_sample_config, profile_sample_config=self.profile_sample_config,
partition_details=self.partition_details, partition_details=self.partition_details,

View File

@ -48,7 +48,7 @@ def _(elements, compiler, **kwargs):
col, _, percentile = [ col, _, percentile = [
compiler.process(element, **kwargs) for element in elements.clauses compiler.process(element, **kwargs) for element in elements.clauses
] ]
return "quantile(%s)(%s)" % (percentile, col) return "if(isNaN(quantile(%s)(%s)),null,quantile(%s)(%s))" % ((percentile, col) * 2)
# pylint: disable=unused-argument # pylint: disable=unused-argument

View File

@ -34,11 +34,11 @@ from metadata.generated.schema.metadataIngestion.workflow import (
) )
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import ProfilerProcessorConfig, TableConfig from metadata.profiler.api.models import ProfilerProcessorConfig, TableConfig
from metadata.profiler.interface.pandas.pandas_profiler_interface import ( from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface, PandasProfilerInterface,
) )
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.metrics.registry import Metrics from metadata.profiler.metrics.registry import Metrics

View File

@ -41,5 +41,6 @@ class ProfilerSourceFactory:
profiler_source_factory = ProfilerSourceFactory() profiler_source_factory = ProfilerSourceFactory()
profiler_source_factory.register_source( profiler_source_factory.register_source(
BigqueryType.BigQuery.value.lower(), BigQueryProfilerSource BigqueryType.BigQuery.value.lower(),
BigQueryProfilerSource,
) )

View File

@ -23,7 +23,7 @@ from sqlalchemy.orm import declarative_base
from metadata.generated.schema.entity.data.table import Column as EntityColumn from metadata.generated.schema.entity.data.table import Column as EntityColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table
from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.profiler.interface.pandas.pandas_profiler_interface import ( from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface, PandasProfilerInterface,
) )
from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.core import add_props

View File

@ -27,7 +27,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteConnection, SQLiteConnection,
SQLiteScheme, SQLiteScheme,
) )
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.metrics.core import add_props from metadata.profiler.metrics.core import add_props

View File

@ -43,7 +43,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteScheme, SQLiteScheme,
) )
from metadata.ingestion.source import sqa_types from metadata.ingestion.source import sqa_types
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.metrics.core import MetricTypes, add_props from metadata.profiler.metrics.core import MetricTypes, add_props

View File

@ -23,7 +23,7 @@ from sqlalchemy.orm import declarative_base
from metadata.ingestion.connections.session import create_and_bind_session from metadata.ingestion.connections.session import create_and_bind_session
from metadata.profiler.api.models import ProfileSampleConfig from metadata.profiler.api.models import ProfileSampleConfig
from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler import Sampler from metadata.profiler.processor.sqlalchemy.sampler import Sampler
from metadata.utils.timeout import cls_timeout from metadata.utils.timeout import cls_timeout
Base = declarative_base() Base = declarative_base()

View File

@ -27,13 +27,13 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteScheme, SQLiteScheme,
) )
from metadata.profiler.api.models import ProfileSampleConfig from metadata.profiler.api.models import ProfileSampleConfig
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.metrics.registry import Metrics from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.orm.registry import CustomTypes from metadata.profiler.orm.registry import CustomTypes
from metadata.profiler.processor.core import Profiler from metadata.profiler.processor.core import Profiler
from metadata.profiler.processor.sampler import Sampler from metadata.profiler.processor.sqlalchemy.sampler import Sampler
Base = declarative_base() Base = declarative_base()

View File

@ -38,7 +38,7 @@ from metadata.generated.schema.entity.services.connections.database.sqliteConnec
SQLiteConnection, SQLiteConnection,
SQLiteScheme, SQLiteScheme,
) )
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.metrics.core import ( from metadata.profiler.metrics.core import (

View File

@ -42,7 +42,7 @@ from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.api.models import ProfilerProcessorConfig from metadata.profiler.api.models import ProfilerProcessorConfig
from metadata.profiler.api.workflow import ProfilerWorkflow from metadata.profiler.api.workflow import ProfilerWorkflow
from metadata.profiler.interface.profiler_protocol import ProfilerProtocol from metadata.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.interface.sqlalchemy.sqa_profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface, SQAProfilerInterface,
) )
from metadata.profiler.processor.default import DefaultProfiler from metadata.profiler.processor.default import DefaultProfiler