diff --git a/ingestion/src/metadata/ingestion/source/database/impala/metadata.py b/ingestion/src/metadata/ingestion/source/database/impala/metadata.py index ab9a7fc5306..400a12332c2 100644 --- a/ingestion/src/metadata/ingestion/source/database/impala/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/impala/metadata.py @@ -33,6 +33,13 @@ from metadata.ingestion.source.database.impala.queries import IMPALA_GET_COMMENT complex_data_types = ["struct", "map", "array", "union"] +_impala_type_to_sqlalchemy_type.update( + { + "CHAR": types.CHAR, + "REAL": types.REAL, + } +) + def get_impala_table_or_view_names(connection, schema=None, target_type="table"): """ diff --git a/ingestion/src/metadata/profiler/metrics/composed/distinct_ratio.py b/ingestion/src/metadata/profiler/metrics/composed/distinct_ratio.py index 3b462d437e9..8c4c36e43c2 100644 --- a/ingestion/src/metadata/profiler/metrics/composed/distinct_ratio.py +++ b/ingestion/src/metadata/profiler/metrics/composed/distinct_ratio.py @@ -49,9 +49,9 @@ class DistinctRatio(ComposedMetric): results of other Metrics """ res_count = res.get(Count.name()) - res_unique = res.get(DistinctCount.name()) + res_distinct = res.get(DistinctCount.name()) - if res_count and res_unique is not None: - return res_unique / res_count + if res_count and res_distinct is not None: + return res_distinct / res_count return None diff --git a/ingestion/src/metadata/profiler/metrics/static/max.py b/ingestion/src/metadata/profiler/metrics/static/max.py index 725ae97da54..67c39ea7897 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max.py +++ b/ingestion/src/metadata/profiler/metrics/static/max.py @@ -15,10 +15,35 @@ Max Metric definition # pylint: disable=duplicate-code -from sqlalchemy import column, func +from sqlalchemy import column +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.functions import GenericFunction -from metadata.profiler.metrics.core import StaticMetric, _label -from metadata.profiler.orm.registry import is_date_time, is_quantifiable +from metadata.profiler.metrics.core import CACHE, StaticMetric, _label +from metadata.profiler.orm.functions.length import LenFn +from metadata.profiler.orm.registry import ( + Dialects, + is_concatenable, + is_date_time, + is_quantifiable, +) + + +class MaxFn(GenericFunction): + name = __qualname__ + inherit_cache = CACHE + + +@compiles(MaxFn) +def _(element, compiler, **kw): + col = compiler.process(element.clauses, **kw) + return f"MAX({col})" + + +@compiles(MaxFn, Dialects.Impala) +def _(element, compiler, **kw): + col = compiler.process(element.clauses, **kw) + return f"MAX(if(is_nan({col}) or is_inf({col}), null, {col}))" class Max(StaticMetric): @@ -35,9 +60,11 @@ class Max(StaticMetric): @_label def fn(self): """sqlalchemy function""" + if is_concatenable(self.col.type): + return MaxFn(LenFn(column(self.col.name))) if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)): return None - return func.max(column(self.col.name)) + return MaxFn(column(self.col.name)) def df_fn(self, dfs=None): """pandas function""" diff --git a/ingestion/src/metadata/profiler/metrics/static/mean.py b/ingestion/src/metadata/profiler/metrics/static/mean.py index 881911eae6e..33443017434 100644 --- a/ingestion/src/metadata/profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/profiler/metrics/static/mean.py @@ -52,6 +52,15 @@ def _(element, compiler, **kw): return f"avg(cast({proc} as decimal))" +@compiles(avg, Dialects.Impala) +def _(element, compiler, **kw): + """ + Convert NaN and Inf values to null before computing average. + """ + proc = compiler.process(element.clauses, **kw) + return f"avg(if(is_nan({proc}) or is_inf({proc}), null, {proc}))" + + class Mean(StaticMetric): """ AVG Metric diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index b1ee00be2ab..8971b2ff757 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -15,10 +15,35 @@ Min Metric definition # pylint: disable=duplicate-code -from sqlalchemy import column, func +from sqlalchemy import column +from sqlalchemy.ext.compiler import compiles +from sqlalchemy.sql.functions import GenericFunction -from metadata.profiler.metrics.core import StaticMetric, _label -from metadata.profiler.orm.registry import is_date_time, is_quantifiable +from metadata.profiler.metrics.core import CACHE, StaticMetric, _label +from metadata.profiler.orm.functions.length import LenFn +from metadata.profiler.orm.registry import ( + Dialects, + is_concatenable, + is_date_time, + is_quantifiable, +) + + +class MinFn(GenericFunction): + name = __qualname__ + inherit_cache = CACHE + + +@compiles(MinFn) +def _(element, compiler, **kw): + col = compiler.process(element.clauses, **kw) + return f"MIN({col})" + + +@compiles(MinFn, Dialects.Impala) +def _(element, compiler, **kw): + col = compiler.process(element.clauses, **kw) + return f"MIN(if(is_nan({col}) or is_inf({col}), null, {col}))" class Min(StaticMetric): @@ -35,9 +60,12 @@ class Min(StaticMetric): @_label def fn(self): """sqlalchemy function""" + if is_concatenable(self.col.type): + return MinFn(LenFn(column(self.col.name))) + if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)): return None - return func.min(column(self.col.name)) + return MinFn(column(self.col.name)) def df_fn(self, dfs=None): """pandas function""" diff --git a/ingestion/src/metadata/profiler/metrics/static/stddev.py b/ingestion/src/metadata/profiler/metrics/static/stddev.py index e6bb602970f..0bcf16aaf07 100644 --- a/ingestion/src/metadata/profiler/metrics/static/stddev.py +++ b/ingestion/src/metadata/profiler/metrics/static/stddev.py @@ -22,7 +22,8 @@ from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import FunctionElement from metadata.profiler.metrics.core import CACHE, StaticMetric, _label -from metadata.profiler.orm.registry import Dialects, is_quantifiable +from metadata.profiler.orm.functions.length import LenFn +from metadata.profiler.orm.registry import Dialects, is_concatenable, is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -43,6 +44,12 @@ def _(element, compiler, **kw): return "STDEVP(%s)" % compiler.process(element.clauses, **kw) +@compiles(StdDevFn, Dialects.Impala) +def _(element, compiler, **kw): + col = compiler.process(element.clauses, **kw) + return f"STDDEV_POP(if(is_nan({col}) or is_inf({col}), null, {col}))" + + @compiles(StdDevFn, Dialects.SQLite) # Needed for unit tests def _(element, compiler, **kw): """ @@ -84,6 +91,9 @@ class StdDev(StaticMetric): if is_quantifiable(self.col.type): return StdDevFn(column(self.col.name)) + if is_concatenable(self.col.type): + return StdDevFn(LenFn(column(self.col.name))) + logger.debug( f"{self.col} has type {self.col.type}, which is not listed as quantifiable." + " We won't compute STDDEV for it." diff --git a/ingestion/src/metadata/profiler/metrics/static/sum.py b/ingestion/src/metadata/profiler/metrics/static/sum.py index c0de620ef4e..d1949bf837c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/sum.py +++ b/ingestion/src/metadata/profiler/metrics/static/sum.py @@ -17,8 +17,9 @@ SUM Metric definition from sqlalchemy import column from metadata.profiler.metrics.core import StaticMetric, _label +from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.functions.sum import SumFn -from metadata.profiler.orm.registry import is_quantifiable +from metadata.profiler.orm.registry import is_concatenable, is_quantifiable class Sum(StaticMetric): @@ -40,6 +41,9 @@ class Sum(StaticMetric): if is_quantifiable(self.col.type): return SumFn(column(self.col.name)) + if is_concatenable(self.col.type): + return SumFn(LenFn(column(self.col.name))) + return None def df_fn(self, dfs=None): diff --git a/ingestion/src/metadata/profiler/metrics/window/median.py b/ingestion/src/metadata/profiler/metrics/window/median.py index 35339e81ed9..afbfac48529 100644 --- a/ingestion/src/metadata/profiler/metrics/window/median.py +++ b/ingestion/src/metadata/profiler/metrics/window/median.py @@ -19,8 +19,9 @@ from typing import List, cast from sqlalchemy import column from metadata.profiler.metrics.core import StaticMetric, _label +from metadata.profiler.orm.functions.length import LenFn from metadata.profiler.orm.functions.median import MedianFn -from metadata.profiler.orm.registry import is_quantifiable +from metadata.profiler.orm.registry import is_concatenable, is_quantifiable from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -53,6 +54,9 @@ class Median(StaticMetric): if is_quantifiable(self.col.type): return MedianFn(column(self.col.name), self.col.table.fullname, 0.5) + if is_concatenable(self.col.type): + return MedianFn(LenFn(column(self.col.name)), self.col.table.fullname, 0.5) + logger.debug( f"Don't know how to process type {self.col.type} when computing Median" ) diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py index b2a1c4845ab..45d44cb7057 100644 --- a/ingestion/src/metadata/profiler/orm/functions/median.py +++ b/ingestion/src/metadata/profiler/orm/functions/median.py @@ -111,7 +111,7 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "if(%s = .5, appx_median(%s), null)" % (percentile, col) + return f"if({percentile} = .5, appx_median(if(is_nan({col}) or is_inf({col}), null, {col})), null)" @compiles(MedianFn, Dialects.MySQL) @@ -126,10 +126,10 @@ def _(elements, compiler, **kwargs): # pylint: disable=unused-argument {col} FROM ( SELECT - t.{col}, + {col}, ROW_NUMBER() OVER () AS row_num FROM - {table} t, + {table}, (SELECT @counter := COUNT(*) FROM {table}) t_count ORDER BY {col} ) temp diff --git a/ingestion/src/metadata/profiler/orm/functions/sum.py b/ingestion/src/metadata/profiler/orm/functions/sum.py index 6233fceee80..3a1f449661e 100644 --- a/ingestion/src/metadata/profiler/orm/functions/sum.py +++ b/ingestion/src/metadata/profiler/orm/functions/sum.py @@ -74,6 +74,13 @@ def _(element, compiler, **kw): return f"SUM(CAST({proc} AS NUMBER))" +@compiles(SumFn, Dialects.Impala) +def _(element, compiler, **kw): + """Impala casting. Default BIGINT isn't big enough for some sums""" + proc = compiler.process(element.clauses, **kw) + return f"SUM(CAST(if(is_nan({proc}) or is_inf({proc}), null, {proc}) AS DOUBLE))" + + @compiles(SumFn, Dialects.IbmDbSa) @compiles(SumFn, Dialects.Db2) def _(element, compiler, **kw): diff --git a/ingestion/tests/unit/profiler/test_metrics.py b/ingestion/tests/unit/profiler/test_metrics.py index 9b3ac62eefb..3f83b8b05fa 100644 --- a/ingestion/tests/unit/profiler/test_metrics.py +++ b/ingestion/tests/unit/profiler/test_metrics.py @@ -594,7 +594,7 @@ class MetricsTest(TestCase): ._column_results ) - assert res.get(User.name.name).get(Metrics.SUM.name) is None + assert res.get(User.name.name).get(Metrics.SUM.name) == 12 def test_unique_count(self): """