mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-04 06:33:10 +00:00
Impala Connection Profiler is_nan rollback; Histogram fix. (#11388)
This commit is contained in:
parent
7db9f3eb24
commit
65c5b44eaa
@ -23,7 +23,8 @@ from metadata.profiler.metrics.core import HybridMetric
|
|||||||
from metadata.profiler.metrics.static.count import Count
|
from metadata.profiler.metrics.static.count import Count
|
||||||
from metadata.profiler.metrics.static.max import Max
|
from metadata.profiler.metrics.static.max import Max
|
||||||
from metadata.profiler.metrics.static.min import Min
|
from metadata.profiler.metrics.static.min import Min
|
||||||
from metadata.profiler.orm.registry import is_quantifiable
|
from metadata.profiler.orm.functions.length import LenFn
|
||||||
|
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
|
||||||
from metadata.utils.helpers import format_large_string_numbers
|
from metadata.utils.helpers import format_large_string_numbers
|
||||||
from metadata.utils.logger import profiler_logger
|
from metadata.utils.logger import profiler_logger
|
||||||
from metadata.utils.sqa_utils import handle_array
|
from metadata.utils.sqa_utils import handle_array
|
||||||
@ -66,11 +67,11 @@ class Histogram(HybridMetric):
|
|||||||
res_min = res.get(Min.name())
|
res_min = res.get(Min.name())
|
||||||
res_max = res.get(Max.name())
|
res_max = res.get(Max.name())
|
||||||
|
|
||||||
if any(var is None for var in [res_iqr, res_row_count, res_min, res_max]):
|
if any(var is None for var in [res_row_count, res_min, res_max]):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return (
|
return (
|
||||||
float(res_iqr),
|
float(res_iqr) if res_iqr is not None else res_iqr,
|
||||||
float(res_row_count),
|
float(res_row_count),
|
||||||
float(res_min),
|
float(res_min),
|
||||||
float(res_max),
|
float(res_max),
|
||||||
@ -110,18 +111,22 @@ class Histogram(HybridMetric):
|
|||||||
res_min (float): minimum value
|
res_min (float): minimum value
|
||||||
res_max (float): maximum value
|
res_max (float): maximum value
|
||||||
"""
|
"""
|
||||||
# freedman-diaconis rule
|
# preinint num_bins over 100. On the normal path freedman-diaconis will readjust according to the algorithm
|
||||||
bin_width = self._get_bin_width(float(res_iqr), res_row_count) # type: ignore
|
# when we must fallback to sturges rule due to res_iqr being None, then num_bins will be readjusted.
|
||||||
num_bins = math.ceil((res_max - res_min) / bin_width) # type: ignore
|
max_bin_count = 100
|
||||||
|
if res_iqr is not None:
|
||||||
|
# freedman-diaconis rule
|
||||||
|
bin_width = self._get_bin_width(float(res_iqr), res_row_count) # type: ignore
|
||||||
|
num_bins = math.ceil((res_max - res_min) / bin_width) # type: ignore
|
||||||
|
|
||||||
# sturge's rule
|
# sturge's rule
|
||||||
if num_bins > 100:
|
if res_iqr is None or num_bins > max_bin_count:
|
||||||
num_bins = int(math.ceil(math.log2(res_row_count) + 1))
|
num_bins = int(math.ceil(math.log2(res_row_count) + 1))
|
||||||
bin_width = (res_max - res_min) / num_bins
|
bin_width = (res_max - res_min) / num_bins
|
||||||
|
|
||||||
# fallback to 100 bins
|
# fallback to max_bin_count bins
|
||||||
if num_bins > 100:
|
if num_bins > max_bin_count:
|
||||||
num_bins = 100
|
num_bins = max_bin_count
|
||||||
bin_width = (res_max - res_min) / num_bins
|
bin_width = (res_max - res_min) / num_bins
|
||||||
|
|
||||||
return num_bins, bin_width
|
return num_bins, bin_width
|
||||||
@ -141,7 +146,7 @@ class Histogram(HybridMetric):
|
|||||||
"We are missing the session attribute to compute the Histogram."
|
"We are missing the session attribute to compute the Histogram."
|
||||||
)
|
)
|
||||||
|
|
||||||
if not is_quantifiable(self.col.type):
|
if not (is_quantifiable(self.col.type) or is_concatenable(self.col.type)):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# get the metric need for the freedman-diaconis rule
|
# get the metric need for the freedman-diaconis rule
|
||||||
@ -159,7 +164,11 @@ class Histogram(HybridMetric):
|
|||||||
starting_bin_bound = res_min
|
starting_bin_bound = res_min
|
||||||
res_min = cast(Union[float, int], res_min) # satisfy mypy
|
res_min = cast(Union[float, int], res_min) # satisfy mypy
|
||||||
ending_bin_bound = res_min + bin_width
|
ending_bin_bound = res_min + bin_width
|
||||||
col = column(self.col.name) # type: ignore
|
|
||||||
|
if is_concatenable(self.col.type):
|
||||||
|
col = LenFn(column(self.col.name))
|
||||||
|
else:
|
||||||
|
col = column(self.col.name) # type: ignore
|
||||||
|
|
||||||
case_stmts = []
|
case_stmts = []
|
||||||
for bin_num in range(num_bins):
|
for bin_num in range(num_bins):
|
||||||
|
@ -22,7 +22,6 @@ from sqlalchemy.sql.functions import GenericFunction
|
|||||||
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
||||||
from metadata.profiler.orm.functions.length import LenFn
|
from metadata.profiler.orm.functions.length import LenFn
|
||||||
from metadata.profiler.orm.registry import (
|
from metadata.profiler.orm.registry import (
|
||||||
Dialects,
|
|
||||||
is_concatenable,
|
is_concatenable,
|
||||||
is_date_time,
|
is_date_time,
|
||||||
is_quantifiable,
|
is_quantifiable,
|
||||||
@ -40,12 +39,6 @@ def _(element, compiler, **kw):
|
|||||||
return f"MAX({col})"
|
return f"MAX({col})"
|
||||||
|
|
||||||
|
|
||||||
@compiles(MaxFn, Dialects.Impala)
|
|
||||||
def _(element, compiler, **kw):
|
|
||||||
col = compiler.process(element.clauses, **kw)
|
|
||||||
return f"MAX(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
|
||||||
|
|
||||||
|
|
||||||
class Max(StaticMetric):
|
class Max(StaticMetric):
|
||||||
"""
|
"""
|
||||||
MAX Metric
|
MAX Metric
|
||||||
|
@ -52,15 +52,6 @@ def _(element, compiler, **kw):
|
|||||||
return f"avg(cast({proc} as decimal))"
|
return f"avg(cast({proc} as decimal))"
|
||||||
|
|
||||||
|
|
||||||
@compiles(avg, Dialects.Impala)
|
|
||||||
def _(element, compiler, **kw):
|
|
||||||
"""
|
|
||||||
Convert NaN and Inf values to null before computing average.
|
|
||||||
"""
|
|
||||||
proc = compiler.process(element.clauses, **kw)
|
|
||||||
return f"avg(if(is_nan({proc}) or is_inf({proc}), null, {proc}))"
|
|
||||||
|
|
||||||
|
|
||||||
class Mean(StaticMetric):
|
class Mean(StaticMetric):
|
||||||
"""
|
"""
|
||||||
AVG Metric
|
AVG Metric
|
||||||
|
@ -22,7 +22,6 @@ from sqlalchemy.sql.functions import GenericFunction
|
|||||||
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
||||||
from metadata.profiler.orm.functions.length import LenFn
|
from metadata.profiler.orm.functions.length import LenFn
|
||||||
from metadata.profiler.orm.registry import (
|
from metadata.profiler.orm.registry import (
|
||||||
Dialects,
|
|
||||||
is_concatenable,
|
is_concatenable,
|
||||||
is_date_time,
|
is_date_time,
|
||||||
is_quantifiable,
|
is_quantifiable,
|
||||||
@ -40,12 +39,6 @@ def _(element, compiler, **kw):
|
|||||||
return f"MIN({col})"
|
return f"MIN({col})"
|
||||||
|
|
||||||
|
|
||||||
@compiles(MinFn, Dialects.Impala)
|
|
||||||
def _(element, compiler, **kw):
|
|
||||||
col = compiler.process(element.clauses, **kw)
|
|
||||||
return f"MIN(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
|
||||||
|
|
||||||
|
|
||||||
class Min(StaticMetric):
|
class Min(StaticMetric):
|
||||||
"""
|
"""
|
||||||
MIN Metric
|
MIN Metric
|
||||||
|
@ -44,12 +44,6 @@ def _(element, compiler, **kw):
|
|||||||
return "STDEVP(%s)" % compiler.process(element.clauses, **kw)
|
return "STDEVP(%s)" % compiler.process(element.clauses, **kw)
|
||||||
|
|
||||||
|
|
||||||
@compiles(StdDevFn, Dialects.Impala)
|
|
||||||
def _(element, compiler, **kw):
|
|
||||||
col = compiler.process(element.clauses, **kw)
|
|
||||||
return f"STDDEV_POP(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
|
||||||
|
|
||||||
|
|
||||||
@compiles(StdDevFn, Dialects.SQLite) # Needed for unit tests
|
@compiles(StdDevFn, Dialects.SQLite) # Needed for unit tests
|
||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
"""
|
"""
|
||||||
|
@ -111,7 +111,7 @@ def _(elements, compiler, **kwargs):
|
|||||||
col, _, percentile = [
|
col, _, percentile = [
|
||||||
compiler.process(element, **kwargs) for element in elements.clauses
|
compiler.process(element, **kwargs) for element in elements.clauses
|
||||||
]
|
]
|
||||||
return f"if({percentile} = .5, appx_median(if(is_nan({col}) or is_inf({col}), null, {col})), null)"
|
return f"if({percentile} = .5, appx_median({col}), null)"
|
||||||
|
|
||||||
|
|
||||||
@compiles(MedianFn, Dialects.MySQL)
|
@compiles(MedianFn, Dialects.MySQL)
|
||||||
|
@ -78,7 +78,7 @@ def _(element, compiler, **kw):
|
|||||||
def _(element, compiler, **kw):
|
def _(element, compiler, **kw):
|
||||||
"""Impala casting. Default BIGINT isn't big enough for some sums"""
|
"""Impala casting. Default BIGINT isn't big enough for some sums"""
|
||||||
proc = compiler.process(element.clauses, **kw)
|
proc = compiler.process(element.clauses, **kw)
|
||||||
return f"SUM(CAST(if(is_nan({proc}) or is_inf({proc}), null, {proc}) AS DOUBLE))"
|
return f"SUM(CAST({proc} AS DOUBLE))"
|
||||||
|
|
||||||
|
|
||||||
@compiles(SumFn, Dialects.IbmDbSa)
|
@compiles(SumFn, Dialects.IbmDbSa)
|
||||||
|
@ -361,11 +361,14 @@ class MetricsTest(TestCase):
|
|||||||
|
|
||||||
age_histogram = res.get(User.age.name)[Metrics.HISTOGRAM.name]
|
age_histogram = res.get(User.age.name)[Metrics.HISTOGRAM.name]
|
||||||
id_histogram = res.get(User.id.name)[Metrics.HISTOGRAM.name]
|
id_histogram = res.get(User.id.name)[Metrics.HISTOGRAM.name]
|
||||||
|
comments_histogram = res.get(User.comments.name)[Metrics.HISTOGRAM.name]
|
||||||
|
|
||||||
assert age_histogram
|
assert age_histogram
|
||||||
assert len(age_histogram["frequencies"]) == 1
|
assert len(age_histogram["frequencies"]) == 1
|
||||||
assert id_histogram
|
assert id_histogram
|
||||||
assert len(id_histogram["frequencies"]) == 2
|
assert len(id_histogram["frequencies"]) == 2
|
||||||
|
assert comments_histogram
|
||||||
|
assert len(comments_histogram["frequencies"]) == 1
|
||||||
|
|
||||||
def test_like_count(self):
|
def test_like_count(self):
|
||||||
"""
|
"""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user