From d4593e9caa9e126f67a5f51dbce411ee31d95fbf Mon Sep 17 00:00:00 2001 From: Teddy Date: Wed, 13 Sep 2023 16:32:55 +0200 Subject: [PATCH] fix: implement percentile computation logic for SingleStore (#13170) --- .../interface/pandas/profiler_interface.py | 61 +++++-------- .../profiler/interface/profiler_interface.py | 64 +++++++++++++- .../interface/profiler_interface_factory.py | 9 ++ .../sqlalchemy/profiler_interface.py | 51 ++++------- .../sqlalchemy/single_store/__init__.py | 0 .../single_store/profiler_interface.py | 86 +++++++++++++++++++ .../profiler/metrics/window/first_quartile.py | 8 +- .../profiler/metrics/window/median.py | 8 +- .../metrics/window/percentille_mixin.py | 9 ++ .../profiler/metrics/window/third_quartile.py | 8 +- .../source/single_store/functions/median.py | 17 ++++ .../metrics/window/first_quartile.py | 10 +++ .../single_store/metrics/window/median.py | 10 +++ .../metrics/window/third_quartile.py | 10 +++ 14 files changed, 262 insertions(+), 89 deletions(-) create mode 100644 ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/__init__.py create mode 100644 ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py create mode 100644 ingestion/src/metadata/profiler/metrics/window/percentille_mixin.py create mode 100644 ingestion/src/metadata/profiler/source/single_store/functions/median.py create mode 100644 ingestion/src/metadata/profiler/source/single_store/metrics/window/first_quartile.py create mode 100644 ingestion/src/metadata/profiler/source/single_store/metrics/window/median.py create mode 100644 ingestion/src/metadata/profiler/source/single_store/metrics/window/third_quartile.py diff --git a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py index ca0525490fc..754776d6eb4 100644 --- a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=arguments-differ """ Interfaces with database for all database engine @@ -31,7 +32,6 @@ from metadata.profiler.metrics.registry import Metrics from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.readers.dataframe.models import DatalakeTableSchemaWrapper from metadata.utils.datalake.datalake_utils import fetch_col_types, fetch_dataframe -from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_like_column import SQALikeColumn @@ -109,20 +109,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): profile_sample_query=self.profile_query, ) - @valuedispatch - def _get_metrics(self, *_, **__): - """Generic getter method for metrics. To be used with - specific dispatch methods - """ - logger.warning("Could not get metric. No function registered.") - - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Table.value) - def _( + def _compute_table_metrics( self, - metric_type: str, metrics: List[Metrics], - dfs, + runner: List, *args, **kwargs, ): @@ -138,7 +128,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): try: row_dict = {} - df_list = [df.where(pd.notnull(df), None) for df in dfs] + df_list = [df.where(pd.notnull(df), None) for df in runner] for metric in metrics: row_dict[metric.name()] = metric().df_fn(df_list) return row_dict @@ -147,13 +137,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): logger.warning(f"Error trying to compute profile for {exc}") raise RuntimeError(exc) - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Static.value) - def _( + def _compute_static_metrics( self, - metric_type: str, metrics: List[Metrics], - dfs, + runner: List, column, *args, **kwargs, @@ -172,7 +159,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): try: row_dict = {} for metric in metrics: - metric_resp = metric(column).df_fn(dfs) + metric_resp = metric(column).df_fn(runner) row_dict[metric.name()] = ( None if pd.isnull(metric_resp) else metric_resp ) @@ -183,13 +170,10 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): ) raise RuntimeError(exc) - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Query.value) - def _( + def _compute_query_metrics( self, - metric_type: str, - metrics: Metrics, - dfs, + metric: Metrics, + runner: List, column, *args, **kwargs, @@ -204,18 +188,15 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): dictionnary of results """ col_metric = None - col_metric = metrics(column).df_fn(dfs) + col_metric = metric(column).df_fn(runner) if not col_metric: return None - return {metrics.name(): col_metric} + return {metric.name(): col_metric} - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Window.value) - def _( + def _compute_window_metrics( self, - metric_type: str, - metrics: Metrics, - dfs, + metrics: List[Metrics], + runner: List, column, *args, **kwargs, @@ -224,19 +205,21 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): Given a list of metrics, compute the given results and returns the values """ + try: metric_values = {} for metric in metrics: - metric_values[metric.name()] = metric(column).df_fn(dfs) + metric_values[metric.name()] = metric(column).df_fn(runner) return metric_values if metric_values else None except Exception as exc: logger.debug(traceback.format_exc()) logger.warning(f"Unexpected exception computing metrics: {exc}") return None - @_get_metrics.register(MetricTypes.System.value) - def _( + def _compute_system_metrics( self, + metrics: Metrics, + runner: List, *args, **kwargs, ): @@ -260,11 +243,9 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): try: row = None if self.dfs: - row = self._get_metrics( - metric_type.value, + row = self._get_metric_fn[metric_type.value]( metrics, dfs, - session=self.client, column=column, ) except Exception as exc: diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py index fc050a04d0e..b55e8c45b8c 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface.py @@ -15,7 +15,7 @@ supporting sqlalchemy abstraction layer """ from abc import ABC, abstractmethod -from typing import Dict, Optional, Union +from typing import Dict, List, Optional, Union from sqlalchemy import Column from typing_extensions import Self @@ -36,7 +36,9 @@ from metadata.ingestion.api.processor import ProfilerProcessorStatus from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.profiler.api.models import ProfileSampleConfig, TableConfig +from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.processor.runner import QueryRunner from metadata.utils.partition import get_partition_details @@ -78,6 +80,14 @@ class ProfilerInterface(ABC): ) self.timeout_seconds = timeout_seconds + self._get_metric_fn = { + MetricTypes.Table.value: self._compute_table_metrics, + MetricTypes.Static.value: self._compute_static_metrics, + MetricTypes.Query.value: self._compute_query_metrics, + MetricTypes.Window.value: self._compute_window_metrics, + MetricTypes.System.value: self._compute_system_metrics, + } + @abstractmethod def _get_sampler(self): """Get the sampler""" @@ -222,7 +232,57 @@ class ProfilerInterface(ABC): raise NotImplementedError @abstractmethod - def _get_metrics(self, *args, **kwargs): + def _compute_table_metrics( + self, + metrics: List[Metrics], + runner, + *args, + **kwargs, + ): + """Get metrics""" + raise NotImplementedError + + @abstractmethod + def _compute_static_metrics( + self, + metrics: List[Metrics], + runner, + *args, + **kwargs, + ): + """Get metrics""" + raise NotImplementedError + + @abstractmethod + def _compute_query_metrics( + self, + metric: Metrics, + runner, + *args, + **kwargs, + ): + """Get metrics""" + raise NotImplementedError + + @abstractmethod + def _compute_window_metrics( + self, + metrics: List[Metrics], + runner: QueryRunner, + *args, + **kwargs, + ): + """Get metrics""" + raise NotImplementedError + + @abstractmethod + def _compute_system_metrics( + self, + metrics: Metrics, + runner, + *args, + **kwargs, + ): """Get metrics""" raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py index 684e6be3643..15d7aff293c 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py @@ -21,6 +21,9 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) +from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( + SingleStoreConnection, +) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, @@ -32,6 +35,9 @@ from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) +from metadata.profiler.interface.sqlalchemy.single_store.profiler_interface import ( + SingleStoreProfilerInterface, +) class ProfilerInterfaceFactory: @@ -58,6 +64,9 @@ profiler_interface_factory.register(DatabaseConnection.__name__, SQAProfilerInte profiler_interface_factory.register( BigQueryConnection.__name__, BigQueryProfilerInterface ) +profiler_interface_factory.register( + SingleStoreConnection.__name__, SingleStoreProfilerInterface +) profiler_interface_factory.register( DatalakeConnection.__name__, PandasProfilerInterface ) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index 761266abe67..fd380946573 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -8,6 +8,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +# pylint: disable=arguments-differ """ Interfaces with database for all database engine @@ -40,7 +41,6 @@ from metadata.profiler.orm.functions.table_metric_construct import ( from metadata.profiler.processor.runner import QueryRunner from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor -from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger logger = profiler_interface_registry_logger() @@ -153,18 +153,8 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): handle_query_exception(msg, exc, session) return None - @valuedispatch - def _get_metrics(self, *args, **kwargs): - """Generic getter method for metrics. To be used with - specific dispatch methods - """ - logger.warning("Could not get metric. No function registered.") - - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Table.value) - def _( + def _compute_table_metrics( self, - metric_type: str, metrics: List[Metrics], runner: QueryRunner, session, @@ -180,7 +170,6 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): dictionnary of results """ # pylint: disable=protected-access - try: dialect = runner._session.get_bind().dialect.name row = table_metric_construct_factory.construct( @@ -201,15 +190,12 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): session.rollback() raise RuntimeError(exc) - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Static.value) - def _( + def _compute_static_metrics( self, - metric_type: str, metrics: List[Metrics], runner: QueryRunner, + column, session, - column: Column, *args, **kwargs, ): @@ -247,16 +233,15 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): handle_query_exception(msg, exc, session) return None - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Query.value) - def _( + def _compute_query_metrics( self, - metric_type: str, metric: Metrics, runner: QueryRunner, + column, session, - column: Column, sample, + *args, + **kwargs, ): """Given a list of metrics, compute the given results and returns the values @@ -267,6 +252,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): Returns: dictionnary of results """ + try: col_metric = metric(column) metric_query = col_metric.query(sample=sample, session=session) @@ -284,15 +270,12 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): handle_query_exception(msg, exc, session) return None - # pylint: disable=unused-argument - @_get_metrics.register(MetricTypes.Window.value) - def _( + def _compute_window_metrics( self, - metric_type: str, metrics: List[Metrics], runner: QueryRunner, + column, session, - column: Column, *args, **kwargs, ): @@ -305,6 +288,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): Returns: dictionnary of results """ + if not metrics: return None try: @@ -327,11 +311,9 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): return dict(row) return None - @_get_metrics.register(MetricTypes.System.value) - def _( + def _compute_system_metrics( self, - metric_type: str, - metric: Metrics, + metrics: Metrics, runner: QueryRunner, session, *args, @@ -348,7 +330,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): dictionnary of results """ try: - rows = metric().sql(session, conn_config=self.service_connection_config) + rows = metrics().sql(session, conn_config=self.service_connection_config) return rows except Exception as exc: msg = f"Error trying to compute profile for {runner.table.__tablename__}: {exc}" @@ -412,8 +394,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): ) try: - row = self._get_metrics( - metric_type.value, + row = self._get_metric_fn[metric_type.value]( metrics, runner=runner, session=session, diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/__init__.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py new file mode 100644 index 00000000000..6d87883a2e3 --- /dev/null +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py @@ -0,0 +1,86 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Interfaces with database for all database engine +supporting sqlalchemy abstraction layer +""" + +from typing import List + +from sqlalchemy.exc import ProgrammingError + +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, + handle_query_exception, +) +from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.processor.runner import QueryRunner +from metadata.profiler.source.single_store.metrics.window.first_quartile import ( + SingleStoreFirstQuartile, +) +from metadata.profiler.source.single_store.metrics.window.median import ( + SingleStoreMedian, +) +from metadata.profiler.source.single_store.metrics.window.third_quartile import ( + SingleStoreThirdQuartile, +) +from metadata.utils.logger import profiler_interface_registry_logger + +logger = profiler_interface_registry_logger() + + +class SingleStoreProfilerInterface(SQAProfilerInterface): + """ + Interface to interact with registry supporting + sqlalchemy. + """ + + def _compute_window_metrics( + self, + metrics: List[Metrics], + runner: QueryRunner, + *args, + **kwargs, + ): + """Given a list of metrics, compute the given results + and returns the values + + Args: + column: the column to compute the metrics against + metrics: list of metrics to compute + Returns: + dictionnary of results + """ + session = kwargs.get("session") + column = kwargs.get("column") + + if not metrics: + return None + try: + # we patch the metrics at runtime to use the SingleStore specific functions + # as we can't compile the query based on the dialect as it return `mysql` + metrics = [SingleStoreFirstQuartile, SingleStoreMedian, SingleStoreThirdQuartile] # type: ignore + row = runner.select_first_from_sample( + *[metric(column).fn() for metric in metrics], + ) + except ProgrammingError: + logger.info( + f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow" + ) + return None + + except Exception as exc: + msg = f"Error trying to compute profile for {runner.table.__tablename__}.{column.name}: {exc}" + handle_query_exception(msg, exc, session) + if row: + return dict(row) + return None diff --git a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py index c508f276ec0..ecfce32fff1 100644 --- a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py @@ -19,15 +19,15 @@ from typing import List, cast from sqlalchemy import column from metadata.profiler.metrics.core import StaticMetric, _label +from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.functions.median import MedianFn from metadata.profiler.orm.registry import is_concatenable, is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() -class FirstQuartile(StaticMetric): +class FirstQuartile(StaticMetric, PercentilMixin): """ First Quartile Metric @@ -53,14 +53,14 @@ class FirstQuartile(StaticMetric): """sqlalchemy function""" if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite - return MedianFn( + return self._compute_sqa_fn( column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.25, ) if is_concatenable(self.col.type): - return MedianFn( + return self._compute_sqa_fn( LenFn(column(self.col.name, self.col.type)), self.col.table.fullname if self.col.table is not None else None, 0.25, diff --git a/ingestion/src/metadata/profiler/metrics/window/median.py b/ingestion/src/metadata/profiler/metrics/window/median.py index 1a4c623e581..e8d4f42e946 100644 --- a/ingestion/src/metadata/profiler/metrics/window/median.py +++ b/ingestion/src/metadata/profiler/metrics/window/median.py @@ -19,15 +19,15 @@ from typing import List, cast from sqlalchemy import column from metadata.profiler.metrics.core import StaticMetric, _label +from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.functions.median import MedianFn from metadata.profiler.orm.registry import is_concatenable, is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() -class Median(StaticMetric): +class Median(StaticMetric, PercentilMixin): """ Median Metric @@ -53,14 +53,14 @@ class Median(StaticMetric): """sqlalchemy function""" if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite - return MedianFn( + return self._compute_sqa_fn( column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.5, ) if is_concatenable(self.col.type): - return MedianFn( + return self._compute_sqa_fn( LenFn(column(self.col.name, self.col.type)), self.col.table.fullname if self.col.table is not None else None, 0.5, diff --git a/ingestion/src/metadata/profiler/metrics/window/percentille_mixin.py b/ingestion/src/metadata/profiler/metrics/window/percentille_mixin.py new file mode 100644 index 00000000000..38a98ccc8ce --- /dev/null +++ b/ingestion/src/metadata/profiler/metrics/window/percentille_mixin.py @@ -0,0 +1,9 @@ +"""function calls shared accross all percentile metrics""" + +from metadata.profiler.orm.functions.median import MedianFn + + +class PercentilMixin: + def _compute_sqa_fn(self, column, table, percentile): + """Generic method to compute the quartile using sqlalchemy""" + return MedianFn(column, table, percentile) diff --git a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py index d571739419d..1a7b06094e9 100644 --- a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py @@ -19,15 +19,15 @@ from typing import List, cast from sqlalchemy import column from metadata.profiler.metrics.core import StaticMetric, _label +from metadata.profiler.metrics.window.percentille_mixin import PercentilMixin from metadata.profiler.orm.functions.length import LenFn -from metadata.profiler.orm.functions.median import MedianFn from metadata.profiler.orm.registry import is_concatenable, is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() -class ThirdQuartile(StaticMetric): +class ThirdQuartile(StaticMetric, PercentilMixin): """ Third Quartile Metric @@ -53,14 +53,14 @@ class ThirdQuartile(StaticMetric): """sqlalchemy function""" if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite - return MedianFn( + return self._compute_sqa_fn( column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.75, ) if is_concatenable(self.col.type): - return MedianFn( + return self._compute_sqa_fn( LenFn(column(self.col.name, self.col.type)), self.col.table.fullname if self.col.table is not None else None, 0.75, diff --git a/ingestion/src/metadata/profiler/source/single_store/functions/median.py b/ingestion/src/metadata/profiler/source/single_store/functions/median.py new file mode 100644 index 00000000000..cd509d782e5 --- /dev/null +++ b/ingestion/src/metadata/profiler/source/single_store/functions/median.py @@ -0,0 +1,17 @@ +"""Median function for single store""" + +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.functions import FunctionElement + +from metadata.profiler.metrics.core import CACHE + + +class SingleStoreMedianFn(FunctionElement): + inherit_cache = CACHE + + +@compiles(SingleStoreMedianFn) +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + col = compiler.process(elements.clauses.clauses[0]) + percentile = elements.clauses.clauses[2].value + return f"approx_percentile({col}, {percentile:.2f})" diff --git a/ingestion/src/metadata/profiler/source/single_store/metrics/window/first_quartile.py b/ingestion/src/metadata/profiler/source/single_store/metrics/window/first_quartile.py new file mode 100644 index 00000000000..3bfd15f1bcb --- /dev/null +++ b/ingestion/src/metadata/profiler/source/single_store/metrics/window/first_quartile.py @@ -0,0 +1,10 @@ +"""Override first quartile metric definition for SingleStore""" + +from metadata.profiler.metrics.window.first_quartile import FirstQuartile +from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn + + +class SingleStoreFirstQuartile(FirstQuartile): + def _compute_sqa_fn(self, column, table, percentile): + """Generic method to compute the quartile using sqlalchemy""" + return SingleStoreMedianFn(column, table, percentile) diff --git a/ingestion/src/metadata/profiler/source/single_store/metrics/window/median.py b/ingestion/src/metadata/profiler/source/single_store/metrics/window/median.py new file mode 100644 index 00000000000..843fb971b5f --- /dev/null +++ b/ingestion/src/metadata/profiler/source/single_store/metrics/window/median.py @@ -0,0 +1,10 @@ +"""Override first quartile metric definition for SingleStore""" + +from metadata.profiler.metrics.window.median import Median +from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn + + +class SingleStoreMedian(Median): + def _compute_sqa_fn(self, column, table, percentile): + """Generic method to compute the quartile using sqlalchemy""" + return SingleStoreMedianFn(column, table, percentile) diff --git a/ingestion/src/metadata/profiler/source/single_store/metrics/window/third_quartile.py b/ingestion/src/metadata/profiler/source/single_store/metrics/window/third_quartile.py new file mode 100644 index 00000000000..c8c8ef53274 --- /dev/null +++ b/ingestion/src/metadata/profiler/source/single_store/metrics/window/third_quartile.py @@ -0,0 +1,10 @@ +"""Override first quartile metric definition for SingleStore""" + +from metadata.profiler.metrics.window.third_quartile import ThirdQuartile +from metadata.profiler.source.single_store.functions.median import SingleStoreMedianFn + + +class SingleStoreThirdQuartile(ThirdQuartile): + def _compute_sqa_fn(self, column, table, percentile): + """Generic method to compute the quartile using sqlalchemy""" + return SingleStoreMedianFn(column, table, percentile)