fix: implement percentile computation logic for SingleStore (#13170)

Teddy 2023-09-13 16:32:55 +02:00 committed by GitHub
parent 0059645b64
commit d4593e9caa
14 changed files with 262 additions and 89 deletions

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=arguments-differ
"""
Interfaces with database for all database engine
@ -31,7 +32,6 @@ from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper
from metadata.utils.datalake.datalake_utils import fetch_col_types, fetch_dataframe
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn
@ -109,20 +109,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
profile_sample_query=self.profile_query,
)
@valuedispatch
def _get_metrics(self, *_, **__):
"""Generic getter method for metrics. To be used with
specific dispatch methods
"""
logger.warning("Could not get metric. No function registered.")
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Table.value)
def _(
def _compute_table_metrics(
self,
metric_type: str,
metrics: List[Metrics],
dfs,
runner: List,
*args,
**kwargs,
):
@ -138,7 +128,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
try:
row_dict = {}
df_list = [df.where(pd.notnull(df), None) for df in dfs]
df_list = [df.where(pd.notnull(df), None) for df in runner]
for metric in metrics:
row_dict[metric.name()] = metric().df_fn(df_list)
return row_dict
@ -147,13 +137,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
logger.warning(f"Error trying to compute profile for {exc}")
raise RuntimeError(exc)
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Static.value)
def _(
def _compute_static_metrics(
self,
metric_type: str,
metrics: List[Metrics],
dfs,
runner: List,
column,
*args,
**kwargs,
@ -172,7 +159,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
try:
row_dict = {}
for metric in metrics:
metric_resp = metric(column).df_fn(dfs)
metric_resp = metric(column).df_fn(runner)
row_dict[metric.name()] = (
None if pd.isnull(metric_resp) else metric_resp
)
@ -183,13 +170,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
)
raise RuntimeError(exc)
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Query.value)
def _(
def _compute_query_metrics(
self,
metric_type: str,
metrics: Metrics,
dfs,
metric: Metrics,
runner: List,
column,
*args,
**kwargs,
@ -204,18 +188,15 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
dictionnary of results
"""
col_metric = None
col_metric = metrics(column).df_fn(dfs)
col_metric = metric(column).df_fn(runner)
if not col_metric:
return None
return {metrics.name(): col_metric}
return {metric.name(): col_metric}
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Window.value)
def _(
def _compute_window_metrics(
self,
metric_type: str,
metrics: Metrics,
dfs,
metrics: List[Metrics],
runner: List,
column,
*args,
**kwargs,
@ -224,19 +205,21 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
Given a list of metrics, compute the given results
and returns the values
"""
try:
metric_values = {}
for metric in metrics:
metric_values[metric.name()] = metric(column).df_fn(dfs)
metric_values[metric.name()] = metric(column).df_fn(runner)
return metric_values if metric_values else None
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Unexpected exception computing metrics: {exc}")
return None
@_get_metrics.register(MetricTypes.System.value)
def _(
def _compute_system_metrics(
self,
metrics: Metrics,
runner: List,
*args,
**kwargs,
):
@ -260,11 +243,9 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
try:
row = None
if self.dfs:
row = self._get_metrics(
metric_type.value,
row = self._get_metric_fn[metric_type.value](
metrics,
dfs,
session=self.client,
column=column,
)
except Exception as exc:
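
The hunk above replaces the `@valuedispatch`-based `_get_metrics` with a plain dictionary lookup, `self._get_metric_fn[metric_type.value](...)`. A minimal, standalone sketch of that dispatch style (class and metric names here are illustrative stand-ins, not the real OpenMetadata types):

from typing import Callable, Dict, List

class TinyPandasProfiler:
    def __init__(self) -> None:
        # same shape as _get_metric_fn in the real interface: metric-type value -> bound method
        self._get_metric_fn: Dict[str, Callable] = {
            "Table": self._compute_table_metrics,
            "Static": self._compute_static_metrics,
        }

    def _compute_table_metrics(self, metrics: List[str], runner, **kwargs):
        # table-level metrics need no column; extra keywords are absorbed by **kwargs
        return {name: f"table::{name}" for name in metrics}

    def _compute_static_metrics(self, metrics: List[str], runner, column=None, **kwargs):
        # column-level metrics pick the column out of the shared keyword set
        return {name: f"{column}::{name}" for name in metrics}

    def compute_metrics(self, metrics, metric_type: str, runner=None, column=None):
        # single call site, mirroring `self._get_metric_fn[metric_type.value](...)`
        return self._get_metric_fn[metric_type](metrics, runner, column=column)

profiler = TinyPandasProfiler()
print(profiler.compute_metrics(["rowCount"], "Table"))              # {'rowCount': 'table::rowCount'}
print(profiler.compute_metrics(["mean"], "Static", column="age"))   # {'mean': 'age::mean'}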

View File

@ -15,7 +15,7 @@ supporting sqlalchemy abstraction layer
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional, Union
from typing import Dict, List, Optional, Union
from sqlalchemy import Column
from typing_extensions import Self
@ -36,7 +36,9 @@ from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.profiler.api.models import ProfileSampleConfig, TableConfig
from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.partition import get_partition_details
@ -78,6 +80,14 @@ class ProfilerInterface(ABC):
)
self.timeout_seconds = timeout_seconds
self._get_metric_fn = {
MetricTypes.Table.value: self._compute_table_metrics,
MetricTypes.Static.value: self._compute_static_metrics,
MetricTypes.Query.value: self._compute_query_metrics,
MetricTypes.Window.value: self._compute_window_metrics,
MetricTypes.System.value: self._compute_system_metrics,
}
@abstractmethod
def _get_sampler(self):
"""Get the sampler"""
@ -222,7 +232,57 @@ class ProfilerInterface(ABC):
raise NotImplementedError
@abstractmethod
def _get_metrics(self, *args, **kwargs):
def _compute_table_metrics(
self,
metrics: List[Metrics],
runner,
*args,
**kwargs,
):
"""Get metrics"""
raise NotImplementedError
@abstractmethod
def _compute_static_metrics(
self,
metrics: List[Metrics],
runner,
*args,
**kwargs,
):
"""Get metrics"""
raise NotImplementedError
@abstractmethod
def _compute_query_metrics(
self,
metric: Metrics,
runner,
*args,
**kwargs,
):
"""Get metrics"""
raise NotImplementedError
@abstractmethod
def _compute_window_metrics(
self,
metrics: List[Metrics],
runner: QueryRunner,
*args,
**kwargs,
):
"""Get metrics"""
raise NotImplementedError
@abstractmethod
def _compute_system_metrics(
self,
metrics: Metrics,
runner,
*args,
**kwargs,
):
"""Get metrics"""
raise NotImplementedError
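
Because `_get_metric_fn` is populated in the base `__init__` from bound methods, the lookup resolves through normal attribute access on the concrete instance, so a subclass that overrides one of the `_compute_*_metrics` hooks is dispatched to automatically — this is what lets `SingleStoreProfilerInterface` later in this diff swap only the window-metric computation. A tiny sketch of that mechanism (illustrative names):

class BaseInterfaceDemo:
    def __init__(self):
        # bound methods are looked up on `self`, so subclass overrides win
        self._get_metric_fn = {"Window": self._compute_window_metrics}

    def _compute_window_metrics(self, metrics):
        return f"generic MEDIAN over {metrics}"

class SingleStoreLikeDemo(BaseInterfaceDemo):
    def _compute_window_metrics(self, metrics):
        return f"approx_percentile over {metrics}"

print(BaseInterfaceDemo()._get_metric_fn["Window"](["median"]))     # generic MEDIAN over ['median']
print(SingleStoreLikeDemo()._get_metric_fn["Window"](["median"]))   # approx_percentile over ['median']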

View File

@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
@ -32,6 +35,9 @@ from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.single_store.profiler_interface import (
SingleStoreProfilerInterface,
)
class ProfilerInterfaceFactory:
@ -58,6 +64,9 @@ profiler_interface_factory.register(DatabaseConnection.__name__, SQAProfilerInte
profiler_interface_factory.register(
BigQueryConnection.__name__, BigQueryProfilerInterface
)
profiler_interface_factory.register(
SingleStoreConnection.__name__, SingleStoreProfilerInterface
)
profiler_interface_factory.register(
DatalakeConnection.__name__, PandasProfilerInterface
)
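
The new `register` call binds `SingleStoreConnection` to its dedicated profiler interface. A minimal sketch of the register/lookup pattern behind `ProfilerInterfaceFactory` (the real factory's creation API is not shown in this diff, so `create` below is only an illustrative assumption):

class TinyInterfaceFactory:
    """Minimal registry: connection class name -> profiler interface class."""

    def __init__(self):
        self._interfaces = {}

    def register(self, connection_name: str, interface_cls):
        self._interfaces[connection_name] = interface_cls

    def create(self, connection_name: str, *args, **kwargs):
        interface_cls = self._interfaces[connection_name]
        return interface_cls(*args, **kwargs)

class GenericInterfaceDemo: ...
class SingleStoreInterfaceDemo(GenericInterfaceDemo): ...

factory = TinyInterfaceFactory()
factory.register("DatabaseConnection", GenericInterfaceDemo)
factory.register("SingleStoreConnection", SingleStoreInterfaceDemo)
print(type(factory.create("SingleStoreConnection")).__name__)   # SingleStoreInterfaceDemo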

View File

@ -8,6 +8,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=arguments-differ
"""
Interfaces with database for all database engine
@ -40,7 +41,6 @@ from metadata.profiler.orm.functions.table_metric_construct import (
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
@ -153,18 +153,8 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
handle_query_exception(msg, exc, session)
return None
@valuedispatch
def _get_metrics(self, *args, **kwargs):
"""Generic getter method for metrics. To be used with
specific dispatch methods
"""
logger.warning("Could not get metric. No function registered.")
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Table.value)
def _(
def _compute_table_metrics(
self,
metric_type: str,
metrics: List[Metrics],
runner: QueryRunner,
session,
@ -180,7 +170,6 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
dictionnary of results
"""
# pylint: disable=protected-access
try:
dialect = runner._session.get_bind().dialect.name
row = table_metric_construct_factory.construct(
@ -201,15 +190,12 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
session.rollback()
raise RuntimeError(exc)
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Static.value)
def _(
def _compute_static_metrics(
self,
metric_type: str,
metrics: List[Metrics],
runner: QueryRunner,
column,
session,
column: Column,
*args,
**kwargs,
):
@ -247,16 +233,15 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
handle_query_exception(msg, exc, session)
return None
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Query.value)
def _(
def _compute_query_metrics(
self,
metric_type: str,
metric: Metrics,
runner: QueryRunner,
column,
session,
column: Column,
sample,
*args,
**kwargs,
):
"""Given a list of metrics, compute the given results
and returns the values
@ -267,6 +252,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
Returns:
dictionnary of results
"""
try:
col_metric = metric(column)
metric_query = col_metric.query(sample=sample, session=session)
@ -284,15 +270,12 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
handle_query_exception(msg, exc, session)
return None
# pylint: disable=unused-argument
@_get_metrics.register(MetricTypes.Window.value)
def _(
def _compute_window_metrics(
self,
metric_type: str,
metrics: List[Metrics],
runner: QueryRunner,
column,
session,
column: Column,
*args,
**kwargs,
):
@ -305,6 +288,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
Returns:
dictionnary of results
"""
if not metrics:
return None
try:
@ -327,11 +311,9 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
return dict(row)
return None
@_get_metrics.register(MetricTypes.System.value)
def _(
def _compute_system_metrics(
self,
metric_type: str,
metric: Metrics,
metrics: Metrics,
runner: QueryRunner,
session,
*args,
@ -348,7 +330,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
dictionnary of results
"""
try:
rows = metric().sql(session, conn_config=self.service_connection_config)
rows = metrics().sql(session, conn_config=self.service_connection_config)
return rows
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}"
@ -412,8 +394,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
)
try:
row = self._get_metrics(
metric_type.value,
row = self._get_metric_fn[metric_type.value](
metrics,
runner=runner,
session=session,
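
Window metrics are fetched in a single round trip: one SQL expression per metric handed to `runner.select_first_from_sample(...)` (the call is visible in the SingleStore override added later in this diff). The runner itself is not part of this diff, so the following is only an assumption about the general shape of such a query — a minimal SQLAlchemy sketch with `min`/`max` standing in for the real window expressions:

from sqlalchemy import column, func, select, table

sample = table("sample_tbl", column("age"))                 # illustrative sampled table
exprs = [func.min(sample.c.age), func.max(sample.c.age)]    # stand-ins for metric(column).fn()
stmt = select(*exprs)                                        # one SELECT evaluating every expression
print(stmt)
# SELECT min(sample_tbl.age) AS min_1, max(sample_tbl.age) AS max_1
# FROM sample_tbl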

View File

@ -0,0 +1,86 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from typing import List
from sqlalchemy.exc import ProgrammingError
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
handle_query_exception,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.source.single_store.metrics.window.first_quartile import (
SingleStoreFirstQuartile,
)
from metadata.profiler.source.single_store.metrics.window.median import (
SingleStoreMedian,
)
from metadata.profiler.source.single_store.metrics.window.third_quartile import (
SingleStoreThirdQuartile,
)
from metadata.utils.logger import profiler_interface_registry_logger
logger = profiler_interface_registry_logger()
class SingleStoreProfilerInterface(SQAProfilerInterface):
"""
Interface to interact with registry supporting
sqlalchemy.
"""
def _compute_window_metrics(
self,
metrics: List[Metrics],
runner: QueryRunner,
*args,
**kwargs,
):
"""Given a list of metrics, compute the given results
and returns the values
Args:
column: the column to compute the metrics against
metrics: list of metrics to compute
Returns:
dictionary of results
"""
session = kwargs.get("session")
column = kwargs.get("column")
if not metrics:
return None
try:
# we patch the metrics at runtime to use the SingleStore specific functions
# as we can't compile the query based on the dialect, since it returns `mysql`
metrics = [SingleStoreFirstQuartile, SingleStoreMedian, SingleStoreThirdQuartile] # type: ignore
row = runner.select_first_from_sample(
*[metric(column).fn() for metric in metrics],
)
except ProgrammingError:
logger.info(
f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow"
)
return None
except Exception as exc:
msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}"
handle_query_exception(msg, exc, session)
if row:
return dict(row)
return None

View File

@ -19,15 +19,15 @@ from typing import List, cast
from sqlalchemy import column
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.functions.median import MedianFn
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class FirstQuartile(StaticMetric):
class FirstQuartile(StaticMetric, PercentilMixin):
"""
First Quartile Metric
@ -53,14 +53,14 @@ class FirstQuartile(StaticMetric):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
# col fullname is only needed for MySQL and SQLite
return MedianFn(
return self._compute_sqa_fn(
column(self.col.name, self.col.type),
self.col.table.fullname if self.col.table is not None else None,
0.25,
)
if is_concatenable(self.col.type):
return MedianFn(
return self._compute_sqa_fn(
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.25,
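
One practical note on the branch above: for concatenable (string-like) columns the quartile is taken over `LenFn(...)`, i.e. over value lengths rather than the values themselves. A plain-Python illustration of what that means (real engines may interpolate differently):

import statistics

values = ["a", "bb", "ccc", "dddd", "eeeee"]          # a string column
lengths = [len(v) for v in values]                    # what LenFn feeds to the percentile
q1, median, q3 = statistics.quantiles(lengths, n=4)   # quartile cut points
print(q1, median, q3)                                 # 1.5 3.0 4.5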

View File

@ -19,15 +19,15 @@ from typing import List, cast
from sqlalchemy import column
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.functions.median import MedianFn
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class Median(StaticMetric):
class Median(StaticMetric, PercentilMixin):
"""
Median Metric
@ -53,14 +53,14 @@ class Median(StaticMetric):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
# col fullname is only needed for MySQL and SQLite
return MedianFn(
return self._compute_sqa_fn(
column(self.col.name, self.col.type),
self.col.table.fullname if self.col.table is not None else None,
0.5,
)
if is_concatenable(self.col.type):
return MedianFn(
return self._compute_sqa_fn(
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.5,

View File

@ -0,0 +1,9 @@
"""function calls shared accross all percentile metrics"""
from metadata.profiler.orm.functions.median import MedianFn
class PercentilMixin:
def _compute_sqa_fn(self, column, table, percentile):
"""Generic method to compute the quartile using sqlalchemy"""
return MedianFn(column, table, percentile)
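
The mixin turns percentile rendering into a single overridable hook: the quartile metrics call `self._compute_sqa_fn(...)` instead of naming `MedianFn` directly, so a dialect-specific subclass only has to replace that one method (as the SingleStore overrides below do). A tiny sketch of the idea (illustrative names; strings stand in for the SQLAlchemy function elements):

class PercentileHookDemo:
    def _compute_sqa_fn(self, column, table, percentile):
        return f"MEDIAN({column}, {percentile})"                   # stand-in for MedianFn(...)

    def first_quartile(self, column, table=None):
        # the metric states *what* it wants; *how* it renders comes from the hook
        return self._compute_sqa_fn(column, table, 0.25)

class ApproxPercentileDemo(PercentileHookDemo):
    def _compute_sqa_fn(self, column, table, percentile):
        return f"approx_percentile({column}, {percentile:.2f})"    # stand-in for SingleStoreMedianFn(...)

print(PercentileHookDemo().first_quartile("age"))       # MEDIAN(age, 0.25)
print(ApproxPercentileDemo().first_quartile("age"))     # approx_percentile(age, 0.25)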

View File

@ -19,15 +19,15 @@ from typing import List, cast
from sqlalchemy import column
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.functions.median import MedianFn
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class ThirdQuartile(StaticMetric):
class ThirdQuartile(StaticMetric, PercentilMixin):
"""
Third Quartile Metric
@ -53,14 +53,14 @@ class ThirdQuartile(StaticMetric):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
# col fullname is only needed for MySQL and SQLite
return MedianFn(
return self._compute_sqa_fn(
column(self.col.name, self.col.type),
self.col.table.fullname if self.col.table is not None else None,
0.75,
)
if is_concatenable(self.col.type):
return MedianFn(
return self._compute_sqa_fn(
LenFn(column(self.col.name, self.col.type)),
self.col.table.fullname if self.col.table is not None else None,
0.75,

View File

@ -0,0 +1,17 @@
"""Median function for single store"""
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.sql.functions import FunctionElement
from metadata.profiler.metrics.core import CACHE
class SingleStoreMedianFn(FunctionElement):
inherit_cache = CACHE
@compiles(SingleStoreMedianFn)
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
col = compiler.process(elements.clauses.clauses[0])
percentile = elements.clauses.clauses[2].value
return f"approx_percentile({col}, {percentile:.2f})"

View File

@ -0,0 +1,10 @@
"""Override first quartile metric definition for SingleStore"""
from metadata.profiler.metrics.window.first_quartile import FirstQuartile
from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn
class SingleStoreFirstQuartile(FirstQuartile):
def _compute_sqa_fn(self, column, table, percentile):
"""Generic method to compute the quartile using sqlalchemy"""
return SingleStoreMedianFn(column, table, percentile)

View File

@ -0,0 +1,10 @@
"""Override first quartile metric definition for SingleStore"""
from metadata.profiler.metrics.window.median import Median
from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn
class SingleStoreMedian(Median):
def _compute_sqa_fn(self, column, table, percentile):
"""Generic method to compute the quartile using sqlalchemy"""
return SingleStoreMedianFn(column, table, percentile)

View File

@ -0,0 +1,10 @@
"""Override first quartile metric definition for SingleStore"""
from metadata.profiler.metrics.window.third_quartile import ThirdQuartile
from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn
class SingleStoreThirdQuartile(ThirdQuartile):
def _compute_sqa_fn(self, column, table, percentile):
"""Generic method to compute the quartile using sqlalchemy"""
return SingleStoreMedianFn(column, table, percentile)