Fixes #4368 - Add Histogram Metric (#10422)

Teddy 2023-03-03 21:56:32 +01:00 committed by GitHub
parent b47fa2e818
commit 5208b6f684
25 changed files with 844 additions and 187 deletions

View File

@ -21,7 +21,15 @@
"nullCount": 0.0,
"nullProportion": 0.0,
"distinctCount": 14509.0,
"distinctProportion": 1.0
"distinctProportion": 1.0,
"firstQuartile": 7.4,
"thirdQuartile": 476.5,
"interQuartileRange": 469.1,
"nonParametricSkew": -0.567,
"histogram": {
"boundaries": ["5.00 to 100.00", "100.00 to 200.00", "200.00 to 300.00", "300.00 and up"],
"frequencies": [101, 235, 123, 98]
}
},
{
"name": "address_id",

View File

@ -0,0 +1,56 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Inter Quartile Range Composed Metric definition
"""
# pylint: disable=duplicate-code
from typing import Any, Dict, Optional, Tuple
from metadata.profiler.metrics.core import ComposedMetric
from metadata.profiler.metrics.window.first_quartile import FirstQuartile
from metadata.profiler.metrics.window.third_quartile import ThirdQuartile
class InterQuartileRange(ComposedMetric):
"""
Given the first and third quartile compute the IQR,
"""
@classmethod
def name(cls):
return "interQuartileRange"
@classmethod
def required_metrics(cls) -> Tuple[str, ...]:
return FirstQuartile.name(), ThirdQuartile.name()
@property
def metric_type(self):
"""
Override default metric_type definition as
we now don't care about the column
"""
return float
def fn(self, res: Dict[str, Any]) -> Optional[float]:
"""
Safely compute null ratio based on the profiler
results of other Metrics
"""
res_first_quartile = res.get(FirstQuartile.name())
res_third_quartile = res.get(ThirdQuartile.name())
if res_first_quartile is not None and res_third_quartile is not None:
return res_third_quartile - res_first_quartile
return None
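
For reference, a minimal sketch (not part of the PR) of how the composition resolves, using the sample profile values shown at the top of this diff; the bare dict stands in for the profiler's result map:

res = {"firstQuartile": 7.4, "thirdQuartile": 476.5}  # values from the sample payload
iqr = res["thirdQuartile"] - res["firstQuartile"]
print(round(iqr, 1))  # 469.1, matching the "interQuartileRange" field in the sample profile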

View File

@ -0,0 +1,58 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Non Parametric Skew definition
"""
# pylint: disable=duplicate-code
from typing import Any, Dict, Optional, Tuple
from metadata.profiler.metrics.core import ComposedMetric
from metadata.profiler.metrics.static.mean import Mean
from metadata.profiler.metrics.static.stddev import StdDev
from metadata.profiler.metrics.window.median import Median
class NonParametricSkew(ComposedMetric):
"""
Return the non parametric skew of a column
"""
@classmethod
def name(cls):
return "nonParametricSkew"
@classmethod
def required_metrics(cls) -> Tuple[str, ...]:
return Mean.name(), StdDev.name(), Median.name()
@property
def metric_type(self):
"""
Override default metric_type definition as
we now don't care about the column
"""
return float
def fn(self, res: Dict[str, Any]) -> Optional[float]:
"""
Safely compute null ratio based on the profiler
results of other Metrics
"""
res_mean = res.get(Mean.name())
res_stddev = res.get(StdDev.name())
res_median = res.get(Median.name())
if res_mean is not None and res_stddev is not None and res_median is not None:
return (res_mean - float(res_median)) / res_stddev
return None
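
The computed value is the nonparametric skew statistic, (mean - median) / stddev. A quick worked example with illustrative numbers (not from the PR):

mean, median, stddev = 10.0, 12.0, 4.0
skew = (mean - median) / stddev
print(skew)  # -0.5: negative whenever the mean sits below the median, i.e. a longer left tail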

View File

@ -190,6 +190,28 @@ class QueryMetric(Metric, ABC):
"""
class HybridMetric(Metric, ABC):
"""
Metric that needs to execute a fully fledged
query and might need existing metrics to compute
some part of the metric.
"""
@abstractmethod
def fn(
self,
sample: Optional[DeclarativeMeta],
res: Dict[str, Any],
session: Optional[Session] = None,
):
"""
Function implementing the metric computation.
These metrics will require result from other metrics
and a session to execute the query.
"""
class CustomMetric(Metric, ABC):
"""
Custom metric definition

View File

@ -0,0 +1,196 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Histogram Metric definition
"""
import math
from typing import Any, Dict, List, Optional, Union, cast
from sqlalchemy import and_, case, column, func
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.profiler.metrics.composed.iqr import InterQuartileRange
from metadata.profiler.metrics.core import HybridMetric
from metadata.profiler.metrics.static.count import Count
from metadata.profiler.metrics.static.max import Max
from metadata.profiler.metrics.static.min import Min
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class Histogram(HybridMetric):
"""
AVG Metric
Given a column, return the Histogram value.
- For a quantifiable value, return the usual AVG
- For a concatenable (str, text...) return the AVG length
"""
@classmethod
def name(cls):
return "histogram"
@property
def metric_type(self):
return dict
@staticmethod
def _get_bin_width(iqr: float, row_count: float) -> Union[float, int]:
"""
Compute the bin width for the histogram using Freedman-Diaconis rule
"""
if iqr == 0:
return 1
return 2 * iqr * row_count ** (-1 / 3)
@staticmethod
def _get_res(res: Dict[str, Any]):
# get the metric need for the freedman-diaconis rule
res_iqr = res.get(InterQuartileRange.name())
res_row_count = res.get(Count.name())
res_min = res.get(Min.name())
res_max = res.get(Max.name())
if any(var is None for var in [res_iqr, res_row_count, res_min, res_max]):
return None
return (res_iqr, res_row_count, res_min, res_max)
def fn(
self,
sample: Optional[DeclarativeMeta],
res: Dict[str, Any],
session: Optional[Session] = None,
):
"""
Build the histogram query
"""
if not session:
raise AttributeError(
"We are missing the session attribute to compute the Histogram."
)
if not is_quantifiable(self.col.type):
return None
# get the metric need for the freedman-diaconis rule
results = self._get_res(res)
if not results:
return None
res_iqr, res_row_count, res_min, res_max = results
# compute the bin width and the number of bins
bind_width = self._get_bin_width(float(res_iqr), res_row_count) # type: ignore
num_bins = math.ceil((res_max - res_min) / bind_width) # type: ignore
if num_bins == 0:
return None
# set starting and ending bin bounds for the first bin
starting_bin_bound = res_min
res_min = cast(Union[float, int], res_min) # satisfy mypy
ending_bin_bound = res_min + bind_width
col = column(self.col.name) # type: ignore
case_stmts = []
for bin_num in range(num_bins):
if bin_num < num_bins - 1:
condition = and_(col >= starting_bin_bound, col < ending_bin_bound)
else:
# for the last bin we won't add the upper bound
condition = and_(col >= starting_bin_bound)
case_stmts.append(
func.count(case([(condition, col)])).label(
f"{starting_bin_bound:.2f} and up"
)
)
continue
case_stmts.append(
func.count(case([(condition, col)])).label(
f"{starting_bin_bound:.2f} to {ending_bin_bound:.2f}"
)
)
starting_bin_bound = ending_bin_bound
ending_bin_bound += bind_width
rows = session.query(*case_stmts).select_from(sample).first()
if rows:
return {"boundaries": list(rows.keys()), "frequencies": list(rows)}
return None
def df_fn(
self,
res: Dict[str, Any],
dfs=None,
):
"""_summary_
Args:
res (Dict[str, Any]): dictionnary of columns values
dfs (List[DataFrame]): list of dataframes
Returns:
Dict
"""
# pylint: disable=import-outside-toplevel
import numpy as np
import pandas as pd
dfs = cast(List[pd.DataFrame], dfs) # satisfy mypy
if not is_quantifiable(self.col.type):
return None
# get the metric need for the freedman-diaconis rule
results = self._get_res(res)
if not results:
return None
res_iqr, res_row_count, res_min, res_max = results
# compute the bin width and the number of bins
bind_width = self._get_bin_width(float(res_iqr), res_row_count) # type: ignore
num_bins = math.ceil((res_max - res_min) / bind_width) # type: ignore
if num_bins == 0:
return None
bins = list(np.arange(num_bins) * bind_width + res_min)
bins_label = [
f"{bins[i]:.2f} to {bins[i+1]:.2f}"
if i < len(bins) - 1
else f"{bins[i]:.2f} and up"
for i in range(len(bins))
]
bins.append(np.inf) # add the last bin
frequencies = None
for df in dfs:
if not frequencies:
frequencies = (
pd.cut(df[self.col.name], bins, right=False).value_counts().values
) # right boundary is exclusive
continue
frequencies += (
pd.cut(df[self.col.name], bins, right=False).value_counts().values
) # right boundary is exclusive
if frequencies.size > 0:
return {"boundaries": bins_label, "frequencies": frequencies.tolist()}
return None
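
To make the binning concrete, here is a small self-contained sketch (not part of the PR) that mirrors what df_fn does on a toy series; the names and values are illustrative:

import math

import numpy as np
import pandas as pd

values = pd.Series([5.0, 7.4, 12.0, 150.0, 210.0, 305.0, 476.5, 900.0])
iqr = values.quantile(0.75) - values.quantile(0.25)
bin_width = 2 * iqr * len(values) ** (-1 / 3) if iqr else 1  # Freedman-Diaconis rule
num_bins = math.ceil((values.max() - values.min()) / bin_width)
edges = list(np.arange(num_bins) * bin_width + values.min()) + [np.inf]
frequencies = pd.cut(values, edges, right=False).value_counts(sort=False).tolist()
print(num_bins, frequencies)  # 3 [6, 1, 1]: counts aligned with the ascending bin edges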

View File

@ -21,15 +21,17 @@ having the verbosely pass .value all the time...
from metadata.profiler.metrics.composed.distinct_ratio import DistinctRatio
from metadata.profiler.metrics.composed.duplicate_count import DuplicateCount
from metadata.profiler.metrics.composed.ilike_ratio import ILikeRatio
from metadata.profiler.metrics.composed.iqr import InterQuartileRange
from metadata.profiler.metrics.composed.like_ratio import LikeRatio
from metadata.profiler.metrics.composed.non_parametric_skew import NonParametricSkew
from metadata.profiler.metrics.composed.null_ratio import NullRatio
from metadata.profiler.metrics.composed.unique_ratio import UniqueRatio
from metadata.profiler.metrics.hybrid.histogram import Histogram
from metadata.profiler.metrics.static.column_count import ColumnCount
from metadata.profiler.metrics.static.column_names import ColumnNames
from metadata.profiler.metrics.static.count import Count
from metadata.profiler.metrics.static.count_in_set import CountInSet
from metadata.profiler.metrics.static.distinct_count import DistinctCount
from metadata.profiler.metrics.static.histogram import Histogram
from metadata.profiler.metrics.static.ilike_count import ILikeCount
from metadata.profiler.metrics.static.like_count import LikeCount
from metadata.profiler.metrics.static.max import Max
@ -46,7 +48,9 @@ from metadata.profiler.metrics.static.stddev import StdDev
from metadata.profiler.metrics.static.sum import Sum
from metadata.profiler.metrics.static.unique_count import UniqueCount
from metadata.profiler.metrics.system.system import System
from metadata.profiler.metrics.window.first_quartile import FirstQuartile
from metadata.profiler.metrics.window.median import Median
from metadata.profiler.metrics.window.third_quartile import ThirdQuartile
from metadata.profiler.registry import MetricRegistry
@ -59,13 +63,11 @@ class Metrics(MetricRegistry):
    # Static Metrics
    MEAN = Mean
    MEDIAN = Median
    COUNT = Count
    COUNT_IN_SET = CountInSet
    COLUMN_COUNT = ColumnCount
    DISTINCT_COUNT = DistinctCount
    DISTINCT_RATIO = DistinctRatio
    HISTOGRAM = Histogram
    ILIKE_COUNT = ILikeCount
    LIKE_COUNT = LikeCount
    NOT_LIKE_COUNT = NotLikeCount
@ -88,6 +90,16 @@ class Metrics(MetricRegistry):
    ILIKE_RATIO = ILikeRatio
    LIKE_RATIO = LikeRatio
    NULL_RATIO = NullRatio
    IQR = InterQuartileRange
    NON_PARAMETRIC_SKEW = NonParametricSkew

    # Window Metrics
    MEDIAN = Median
    FIRST_QUARTILE = FirstQuartile
    THIRD_QUARTILE = ThirdQuartile

    # System Metrics
    SYSTEM = System

    # Hybrid Metrics
    HISTOGRAM = Histogram

View File

@ -1,105 +0,0 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Histogram Metric definition
"""
from typing import Optional
from sqlalchemy import column, func
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.profiler.metrics.core import QueryMetric
from metadata.profiler.orm.functions.concat import ConcatFn
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class Histogram(QueryMetric):
    """
    AVG Metric

    Given a column, return the AVG value.

    - For a quantifiable value, return the usual AVG
    - For a concatenable (str, text...) return the AVG length
    """

    @classmethod
    def name(cls):
        return "histogram"

    @property
    def metric_type(self):
        return dict

    def query(
        self, sample: Optional[DeclarativeMeta], session: Optional[Session] = None
    ):
        """
        Build the histogram query
        """
        if not session:
            raise AttributeError(
                "We are missing the session attribute to compute the Histogram."
            )

        if not is_quantifiable(self.col.type):
            return None

        # Run all queries on top of the sampled data
        col = column(self.col.name)
        num_bins = self.bins if hasattr(self, "bins") else 5
        bins = session.query(
            ((func.max(col) - func.min(col)) / float(num_bins - 1)).label("step"),
        ).select_from(sample)

        raw_step = dict(bins.first())["step"]
        if not raw_step:  # step == 0 or None for empty tables
            logger.debug(
                f"MIN({col.name}) == MAX({col.name}) or EMPTY table. Aborting histogram computation."
            )
            return None

        step = float(raw_step)

        ranges = session.query(
            sample,
            (func.round(col / step - 0.5, 0) * step).label("bin_floor"),
            (func.round(col / step - 0.5, 0) * step + step).label("bin_ceil"),
        )
        ranges_cte = ranges.cte("ranges")

        hist = (
            session.query(
                ConcatFn(
                    str(ranges_cte.c.bin_floor), " to ", str(ranges_cte.c.bin_ceil)
                ).label("boundaries"),
                func.count().label("frequencies"),
            )
            .select_from(ranges_cte)
            .group_by(
                ranges_cte.c.bin_floor,
                ranges_cte.c.bin_ceil,
                ConcatFn(
                    str(ranges_cte.c.bin_floor), " to ", str(ranges_cte.c.bin_ceil)
                ).label("boundaries"),
            )
            .order_by("boundaries")
        )

        return hist

View File

@ -92,14 +92,17 @@ class StdDev(StaticMetric):
    def df_fn(self, df=None):
        """pandas function"""
        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
        import pandas as pd  # pylint: disable=import-outside-toplevel

        df = cast(DataFrame, df)
        df = cast(pd.DataFrame, df)

        if is_quantifiable(self.col.type):
            return df[self.col.name].std()
            stddev = df[self.col.name].std()
            if pd.isnull(stddev):
                return None
            return stddev
        logger.debug(
            f"{self.col.name} has type {self.col.type}, which is not listed as quantifiable."
            + " We won't compute STDDEV for it."
        )
        return 0
        return None

View File

@ -256,7 +256,7 @@ def _(
    query_text = row_insert.query_text
    operation = next(
        (
            token.value
            token.value.upper()
            for token in query_text.tokens
            if token.ttype is sqlparse.tokens.DML
            and token.value.upper()
@ -277,7 +277,7 @@ def _(
    query_text = row_deleted.query_text
    operation = next(
        (
            token.value
            token.value.upper()
            for token in query_text.tokens
            if token.ttype is sqlparse.tokens.DML and token.value != "UPDATE"
        ),

View File

@ -0,0 +1,85 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
First Quartile definition
"""
# pylint: disable=duplicate-code
from typing import List, cast
from sqlalchemy import column
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.median import MedianFn
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class FirstQuartile(StaticMetric):
    """
    First Quartile Metric

    Given a column, return the first quartile value.

    - For a quantifiable value, return the first quartile value
    """

    @classmethod
    def name(cls):
        return "firstQuartile"

    @classmethod
    def is_window_metric(cls):
        return True

    @property
    def metric_type(self):
        return float

    @_label
    def fn(self):
        """sqlalchemy function"""
        if is_quantifiable(self.col.type):
            return MedianFn(column(self.col.name), self.col.table.name, 0.25)

        logger.debug(
            f"Don't know how to process type {self.col.type} when computing First Quartile"
        )
        return None

    def df_fn(self, dfs=None):
        """Dataframe function"""
        # pylint: disable=import-outside-toplevel
        import numpy as np
        import pandas as pd

        dfs = cast(List[pd.DataFrame], dfs)

        if is_quantifiable(self.col.type):
            # we can't compute the first quartile unless we have
            # the entire set. Median of Medians could be used,
            # though it would require the set to be sorted beforehand
            try:
                df = pd.concat(dfs)
            except MemoryError:
                logger.error(
                    f"Unable to compute the First Quartile for {self.col.name} due to memory constraints."
                    " We recommend using a smaller sample size or partitioning."
                )
                return None
            return np.percentile(df[self.col.name], 25)

        logger.debug(
            f"Don't know how to process type {self.col.type} when computing First Quartile"
        )
        return None

View File

@ -14,7 +14,7 @@ Median Metric definition
"""
# pylint: disable=duplicate-code
from typing import cast
from typing import List, cast
from sqlalchemy import column
@ -51,21 +51,34 @@ class Median(StaticMetric):
    def fn(self):
        """sqlalchemy function"""
        if is_quantifiable(self.col.type):
            return MedianFn(column(self.col.name), self.col.table.name)
            return MedianFn(column(self.col.name), self.col.table.name, 0.5)

        logger.debug(
            f"Don't know how to process type {self.col.type} when computing Median"
        )
        return None

    def df_fn(self, df=None):
    def df_fn(self, dfs=None):
        """Dataframe function"""
        from pandas import DataFrame  # pylint: disable=import-outside-toplevel
        import pandas as pd  # pylint: disable=import-outside-toplevel

        df = cast(DataFrame, df)
        dfs = cast(List[pd.DataFrame], dfs)

        if is_quantifiable(self.col.type):
            return df[self.col.name].median().tolist()
            # we can't compute the median unless we have
            # the entire set. Median of Medians could be used,
            # though it would require the set to be sorted beforehand
            try:
                df = (
                    pd.concat(dfs) if isinstance(dfs, list) else dfs
                )  # workaround should be removed once #10351 is fixed
            except MemoryError:
                logger.error(
                    f"Unable to compute the Median for {self.col.name} due to memory constraints."
                    " We recommend using a smaller sample size or partitioning."
                )
                return None
            return df[self.col.name].median()

        logger.debug(
            f"Don't know how to process type {self.col.type} when computing Median"
        )

View File

@ -0,0 +1,85 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Median Metric definition
"""
# pylint: disable=duplicate-code
from typing import List, cast
from sqlalchemy import column
from metadata.profiler.metrics.core import StaticMetric, _label
from metadata.profiler.orm.functions.median import MedianFn
from metadata.profiler.orm.registry import is_quantifiable
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
class ThirdQuartile(StaticMetric):
"""
Third Quartile Metric
Given a column, return the third quartile value.
- For a quantifiable value, return third quartile value
"""
@classmethod
def name(cls):
return "thirdQuartile"
@classmethod
def is_window_metric(cls):
return True
@property
def metric_type(self):
return float
@_label
def fn(self):
"""sqlalchemy function"""
if is_quantifiable(self.col.type):
return MedianFn(column(self.col.name), self.col.table.name, 0.75)
logger.debug(
f"Don't know how to process type {self.col.type} when computing Third Quartile"
)
return None
def df_fn(self, dfs=None):
"""Dataframe function"""
# pylint: disable=import-outside-toplevel
import numpy as np
import pandas as pd
df = cast(List[pd.DataFrame], dfs)
if is_quantifiable(self.col.type):
# we can't compute the median unless we have
# the entire set. Median of Medians could be used
# though it would required set to be sorted before hand
try:
df = pd.concat(dfs)
except MemoryError:
logger.error(
f"Unable to compute Median for {self.col.name} due to memory constraints."
f"We recommend using a smaller sample size or partitionning."
)
return None
return np.percentile(df[self.col.name], 75)
logger.debug(
f"Don't know how to process type {self.col.type} when computing Third Quartile"
)
return None

View File

@ -31,19 +31,24 @@ class MedianFn(FunctionElement):
@compiles(MedianFn)
def _(elements, compiler, **kwargs):  # pylint: disable=unused-argument
    col = compiler.process(elements.clauses.clauses[0])
    return "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s ASC)" % col
    percentile = elements.clauses.clauses[2].value
    return "percentile_cont(%.2f) WITHIN GROUP (ORDER BY %s ASC)" % (percentile, col)


@compiles(MedianFn, Dialects.BigQuery)
def _(elements, compiler, **kwargs):
    col, _ = [compiler.process(element, **kwargs) for element in elements.clauses]
    return "percentile_cont(%s , 0.5) OVER()" % col
    col, _, percentile = [
        compiler.process(element, **kwargs) for element in elements.clauses
    ]
    return "percentile_cont(%s , %.2f) OVER()" % (col, percentile)


@compiles(MedianFn, Dialects.ClickHouse)
def _(elements, compiler, **kwargs):
    col, _ = [compiler.process(element, **kwargs) for element in elements.clauses]
    return "median(%s)" % col
    col, _, percentile = [
        compiler.process(element, **kwargs) for element in elements.clauses
    ]
    return "quantile(%.2f)(%s)" % (percentile, col)


# pylint: disable=unused-argument
@ -52,21 +57,28 @@ def _(elements, compiler, **kwargs):
@compiles(MedianFn, Dialects.Presto)
def _(elements, compiler, **kwargs):
    col = elements.clauses.clauses[0].name
    return 'approx_percentile("%s", 0.5)' % col
    percentile = elements.clauses.clauses[2].value
    return 'approx_percentile("%s", %.2f)' % (col, percentile)


@compiles(MedianFn, Dialects.MSSQL)
def _(elements, compiler, **kwargs):
    """Median computation for MSSQL"""
    col = elements.clauses.clauses[0].name
    return "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s ASC) OVER()" % col
    percentile = elements.clauses.clauses[2].value
    return "percentile_cont(%.2f) WITHIN GROUP (ORDER BY %s ASC) OVER()" % (
        percentile,
        col,
    )


@compiles(MedianFn, Dialects.Hive)
def _(elements, compiler, **kwargs):
    """Median computation for Hive"""
    col, _ = [compiler.process(element, **kwargs) for element in elements.clauses]
    return "percentile(cast(%s as BIGINT), 0.5)" % col
    col, _, percentile = [
        compiler.process(element, **kwargs) for element in elements.clauses
    ]
    return "percentile(cast(%s as BIGINT), %.2f)" % (col, percentile)


@compiles(MedianFn, Dialects.MySQL)
@ -79,25 +91,23 @@ def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
@compiles(MedianFn, Dialects.SQLite)
def _(elements, compiler, **kwargs):  # pylint: disable=unused-argument
    col, table = list(elements.clauses)
    return """
    (SELECT
        AVG({col})
    FROM (
        SELECT {col}
        FROM {table}
        ORDER BY {col}
        LIMIT 2 - (SELECT COUNT(*) FROM {table}) % 2
        OFFSET (SELECT (COUNT(*) - 1) / 2
                FROM {table})))
    """.format(
        col=col, table=table.value
    )
    col = compiler.process(elements.clauses.clauses[0])
    table = elements.clauses.clauses[1].value
    percentile = elements.clauses.clauses[2].value
    return """
    (SELECT
        {col}
    FROM {table}
    WHERE {col} IS NOT NULL
    ORDER BY {col}
    LIMIT 1
    OFFSET (
            SELECT ROUND(COUNT(*) * {percentile} - 1)
            FROM {table}
            WHERE {col} IS NOT NULL
        )
    )
    """.format(
        col=col, table=table, percentile=percentile
    )


@compiles(MedianFn, Dialects.Vertica)
def _(elements, compiler, **kwargs):  # pylint: disable=unused-argument
    col, table = list(elements.clauses)
    return "(SELECT MEDIAN({col}) OVER() FROM {table} LIMIT 1)".format(
        col=col, table=table.value
    )
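
As a sanity check, a minimal sketch (not part of the PR) of how the extended three-argument signature compiles on the default dialect, where the table argument is unused:

from sqlalchemy import column

from metadata.profiler.orm.functions.median import MedianFn

expr = MedianFn(column("age"), "users", 0.25)
print(expr)  # renders roughly as: percentile_cont(0.25) WITHIN GROUP (ORDER BY age ASC)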

View File

@ -119,7 +119,7 @@ def is_date_time(_type) -> bool:
    Check if sqlalchemy _type is derived from Date, Time or DateTime Type
    """
    if isinstance(_type, Type):
        return _type.__class__.__name__ in DATATIME_SET
        return _type.name in DATATIME_SET
    return (
        issubclass(_type.__class__, Date)
        or issubclass(_type.__class__, Time)
@ -132,7 +132,7 @@ def is_quantifiable(_type) -> bool:
    Check if sqlalchemy _type is either integer or numeric
    """
    if isinstance(_type, Type):
        return _type.__class__.__name__ in QUANTIFIABLE_SET
        return _type.name in QUANTIFIABLE_SET
    return is_numeric(_type) or is_integer(_type)
@ -142,5 +142,5 @@ def is_concatenable(_type) -> bool:
    e.g., strings or text
    """
    if isinstance(_type, Type):
        return _type.__class__.__name__ in CONCATENABLE_SET
        return _type.name in CONCATENABLE_SET
    return issubclass(_type.__class__, Concatenable)

View File

@ -37,6 +37,7 @@ from metadata.profiler.api.models import ProfilerResponse
from metadata.profiler.metrics.core import (
    ComposedMetric,
    CustomMetric,
    HybridMetric,
    MetricTypes,
    QueryMetric,
    StaticMetric,
@ -59,6 +60,8 @@ class MissingMetricException(Exception):
"""
# pylint: disable=too-many-public-methods
# Pylint error above indicates that this class needs to be refactored
class Profiler(Generic[TMetric]):
"""
Core Profiler.
@ -221,6 +224,10 @@ class Profiler(Generic[TMetric]):
    def system_metrics(self) -> List[Type[SystemMetric]]:
        return self._filter_metrics(SystemMetric)

    @property
    def hybrid_metric(self) -> List[Type[HybridMetric]]:
        return self._filter_metrics(HybridMetric)

    def get_col_metrics(
        self, metrics: List[Type[TMetric]], column: Optional[Column] = None
    ) -> List[Type[TMetric]]:
@ -299,6 +306,30 @@ class Profiler(Generic[TMetric]):
                current_col_results,
            )

    def run_hybrid_metrics(self, col: Column):
        """Run hybrid metrics

        Args:
            col (Column): column to run the hybrid metrics on
        """
        logger.debug("Running hybrid metrics...")
        current_col_results: Dict[str, Any] = self._column_results.get(col.name)
        if not current_col_results:
            logger.error(
                "We do not have any results to base our Hybrid Metrics on. Stopping!"
            )
            return

        for metric in self.get_col_metrics(self.hybrid_metric):
            logger.debug(f"Running hybrid metric {metric.name()} for {col.name}")
            self._column_results[col.name][
                metric.name()
            ] = self.profiler_interface.get_hybrid_metrics(
                col,
                metric,
                current_col_results,
                table=self.table,
            )

    def _prepare_table_metrics(self) -> List:
        """prepare table metrics"""
        table_metrics = [
@ -346,7 +377,11 @@ class Profiler(Generic[TMetric]):
        column_metrics_for_thread_pool = [
            *[
                (
                    self.get_col_metrics(self.static_metrics, column),
                    [
                        metric
                        for metric in self.get_col_metrics(self.static_metrics, column)
                        if not metric.is_window_metric()
                    ],
                    MetricTypes.Static,
                    column,
                    self.table,
@ -365,17 +400,16 @@ class Profiler(Generic[TMetric]):
            ],
            *[
                (
                    metric,
                    [
                        metric
                        for metric in self.get_col_metrics(self.static_metrics, column)
                        if metric.is_window_metric()
                    ],
                    MetricTypes.Window,
                    column,
                    self.table,
                )
                for column in self.columns
                for metric in [
                    metric
                    for metric in self.get_col_metrics(self.static_metrics, column)
                    if metric.is_window_metric()
                ]
            ],
        ]
@ -404,6 +438,7 @@ class Profiler(Generic[TMetric]):
        self.profile_entity()
        for column in self.columns:
            self.run_composed_metrics(column)
            self.run_hybrid_metrics(column)

        return self
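
Hybrid metrics deliberately run after the composed metrics: the histogram needs the inter quartile range (itself composed from the quartiles) plus min, max and count before it can derive its bins. A sketch of the resulting per-column flow (illustrative only; `profiler` stands for a configured Profiler instance):

profiler.profile_entity()                  # static, query and window metrics
for column in profiler.columns:
    profiler.run_composed_metrics(column)  # e.g. interQuartileRange, nonParametricSkew
    profiler.run_hybrid_metrics(column)    # e.g. histogram, consuming the results above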

View File

@ -32,6 +32,8 @@ def get_default_metrics(table: DeclarativeMeta) -> List[Metric]:
        add_props(table=table)(Metrics.SYSTEM.value),
        # Column Metrics
        Metrics.MEDIAN.value,
        Metrics.FIRST_QUARTILE.value,
        Metrics.THIRD_QUARTILE.value,
        Metrics.MEAN.value,
        Metrics.COUNT.value,
        Metrics.DISTINCT_COUNT.value,
@ -46,7 +48,9 @@ def get_default_metrics(table: DeclarativeMeta) -> List[Metric]:
        Metrics.SUM.value,
        Metrics.UNIQUE_COUNT.value,
        Metrics.UNIQUE_RATIO.value,
        # Metrics.HISTOGRAM.value, # TODO: enable it back after #4368
        Metrics.IQR.value,
        Metrics.HISTOGRAM.value,
        Metrics.NON_PARAMETRIC_SKEW.value,
    ]

View File

@ -209,6 +209,9 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
    @_get_metrics.register(MetricTypes.Window.value)
    def _(
        self,
        metric_type: str,
        metrics: Metrics,
        column,
        *args,
        **kwargs,
    ):
@ -216,7 +219,15 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
        Given a list of metrics, compute the results
        and return the values
        """
        return None  # to be implemented
        try:
            metric_values = {}
            for metric in metrics:
                metric_values[metric.name()] = metric(column).df_fn(self.dfs)
            return metric_values if metric_values else None
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unexpected exception computing metrics: {exc}")
            return None
    @_get_metrics.register(MetricTypes.System.value)
    def _(
@ -296,6 +307,26 @@ class PandasProfilerInterface(ProfilerProtocol, PandasInterfaceMixin):
            logger.warning(f"Unexpected exception computing metrics: {exc}")
            return None

    def get_hybrid_metrics(
        self, column: Column, metric: Metrics, column_results: Dict, **kwargs
    ):
        """Given a hybrid metric, compute its result against the dataframes
        and return the value

        Args:
            column: the column to compute the metric against
            metric: the hybrid metric to compute
            column_results: computed values for the column

        Returns:
            dictionary of results
        """
        try:
            return metric(column).df_fn(column_results, self.dfs)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unexpected exception computing metrics: {exc}")
            return None

    def get_all_metrics(
        self,
        metric_funcs: list,

View File

@ -217,6 +217,13 @@ class ProfilerProtocol(ABC):
"""run profiler metrics"""
raise NotImplementedError
@abstractmethod
def get_hybrid_metrics(
self, column: Column, metric: Metrics, column_results: Dict
) -> dict:
"""run profiler metrics"""
raise NotImplementedError
@abstractmethod
def fetch_sample_data(self, table) -> dict:
"""run profiler metrics"""

View File

@ -22,7 +22,6 @@ from datetime import datetime, timezone
from typing import Dict, List
from sqlalchemy import Column
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import scoped_session
from metadata.generated.schema.entity.data.table import TableData
@ -31,7 +30,7 @@ from metadata.ingestion.api.processor import ProfilerProcessorStatus
from metadata.ingestion.connections.session import create_and_bind_thread_safe_session
from metadata.ingestion.source.connections import get_connection
from metadata.interfaces.sqalchemy.mixins.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.metrics.core import Metric, MetricTypes
from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.profiler.interface.profiler_protocol import ProfilerProtocol
from metadata.profiler.profiler.runner import QueryRunner
@ -232,7 +231,7 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
    def _(
        self,
        metric_type: str,
        metric: Metrics,
        metrics: List[Metrics],
        runner: QueryRunner,
        session,
        column: Column,
@ -248,10 +247,12 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
        Returns:
            dictionary of results
        """
        if not metrics:
            return None

        try:
            row = runner.select_first_from_sample(metric(column).fn())
            if not isinstance(row, Row):
                return {metric.name(): row}
            row = runner.select_first_from_sample(
                *[metric(column).fn() for metric in metrics]
            )
            return dict(row)
        except Exception as exc:
            logger.debug(traceback.format_exc())
@ -459,3 +460,31 @@ class SQAProfilerInterface(ProfilerProtocol, SQAInterfaceMixin):
            logger.warning(f"Unexpected exception computing metrics: {exc}")
            self.session.rollback()
            return None

    def get_hybrid_metrics(
        self, column: Column, metric: Metrics, column_results: Dict, table, **kwargs
    ):
        """Given a hybrid metric, build a sampled query and compute the result

        Args:
            column: the column to compute the metric against
            metric: the hybrid metric to compute
            column_results: computed values for the column

        Returns:
            dictionary of results
        """
        sampler = Sampler(
            session=self.session,
            table=table,
            profile_sample_config=self.profile_sample_config,
            partition_details=self.partition_details,
            profile_sample_query=self.profile_query,
        )
        sample = sampler.random_sample()
        try:
            return metric(column).fn(sample, column_results, self.session)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Unexpected exception computing metrics: {exc}")
            self.session.rollback()
            return None

View File

@ -335,21 +335,36 @@ class MetricsTest(TestCase):
        Check histogram computation
        """
        hist = add_props(bins=5)(Metrics.HISTOGRAM.value)
        hist = Metrics.HISTOGRAM.value
        count = Metrics.COUNT.value
        min = Metrics.MIN.value
        max = Metrics.MAX.value
        first_quartile = Metrics.FIRST_QUARTILE.value
        third_quartile = Metrics.THIRD_QUARTILE.value
        iqr = Metrics.IQR.value

        res = (
            Profiler(
                hist,
                count,
                min,
                max,
                first_quartile,
                third_quartile,
                iqr,
                profiler_interface=self.sqa_profiler_interface,
            )
            .compute_metrics()
            ._column_results
        )

        assert res.get(User.age.name)[Metrics.HISTOGRAM.name]
        assert (
            len(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"])
            == 3  # Too little values. Counts nulls
        )
        age_histogram = res.get(User.age.name)[Metrics.HISTOGRAM.name]
        id_histogram = res.get(User.id.name)[Metrics.HISTOGRAM.name]
        assert age_histogram
        assert len(age_histogram["frequencies"]) == 1
        assert id_histogram
        assert len(id_histogram["frequencies"]) == 2
    def test_like_count(self):
        """
@ -716,7 +731,7 @@ class MetricsTest(TestCase):
            None,
        )

        hist = add_props(bins=5)(Metrics.HISTOGRAM.value)
        hist = Metrics.HISTOGRAM.value
        res = (
            Profiler(
                hist,
@ -775,6 +790,58 @@ class MetricsTest(TestCase):
        assert res.get(User.age.name)[Metrics.MEDIAN.name] == 30

    def test_first_quartile(self):
        """
        Check first quartile
        """
        first_quartile = Metrics.FIRST_QUARTILE.value

        res = (
            Profiler(
                first_quartile,
                profiler_interface=self.sqa_profiler_interface,
            )
            .compute_metrics()
            ._column_results
        )
        assert res.get(User.age.name)[Metrics.FIRST_QUARTILE.name] == 30

    def test_third_quartile(self):
        """
        Check third quartile
        """
        third_quartile = Metrics.THIRD_QUARTILE.value

        res = (
            Profiler(
                third_quartile,
                profiler_interface=self.sqa_profiler_interface,
            )
            .compute_metrics()
            ._column_results
        )
        assert res.get(User.age.name)[Metrics.THIRD_QUARTILE.name] == 31

    def test_iqr(self):
        """Check IQR metric"""
        iqr = Metrics.IQR.value
        first_quartile = Metrics.FIRST_QUARTILE.value
        third_quartile = Metrics.THIRD_QUARTILE.value

        res = (
            Profiler(
                first_quartile,
                third_quartile,
                iqr,
                profiler_interface=self.sqa_profiler_interface,
            )
            .compute_metrics()
            ._column_results
        )
        assert res.get(User.age.name)[Metrics.IQR.name] == 1

    def test_sum_function(self):
        """Check overwritten sum function"""
        session = self.sqa_profiler_interface.session

View File

@ -24,6 +24,7 @@ import sqlalchemy.types
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base
from ingestion.src.metadata.generated.schema.entity.data.table import Histogram
from metadata.generated.schema.api.data.createTableProfile import (
CreateTableProfileRequest,
)
@ -153,11 +154,13 @@ class ProfilerTest(TestCase):
            variance=None,
            distinctCount=2.0,
            distinctProportion=1.0,
            median=30.5,
            timestamp=datetime.now(tz=timezone.utc).timestamp()
            # histogram=Histogram(
            #     boundaries=["30.0 to 30.25", "31.0 to 31.25"], frequencies=[1, 1]
            # ),
            median=30.0,
            timestamp=datetime.now(tz=timezone.utc).timestamp(),
            firstQuartile=30.0,
            thirdQuartile=31.0,
            interQuartileRange=1.0,
            nonParametricSkew=2.0,
            histogram=Histogram(boundaries=["30.00 and up"], frequencies=[2]),
        )

    def test_required_metrics(self):

View File

@ -207,11 +207,24 @@ class SampleTest(TestCase):
        Histogram should run correctly
        """
        hist = Metrics.HISTOGRAM.value
        count = Metrics.COUNT.value
        min = Metrics.MIN.value
        max = Metrics.MAX.value
        first_quartile = Metrics.FIRST_QUARTILE.value
        third_quartile = Metrics.THIRD_QUARTILE.value
        iqr = Metrics.IQR.value

        with patch.object(
            SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
        ):
            profiler = Profiler(
                hist,
                count,
                min,
                max,
                first_quartile,
                third_quartile,
                iqr,
                profiler_interface=SQAProfilerInterface(
                    self.sqlite_conn,
                    None,
@ -229,6 +242,12 @@ class SampleTest(TestCase):
        profiler = Profiler(
            hist,
            count,
            min,
            max,
            first_quartile,
            third_quartile,
            iqr,
            profiler_interface=self.sqa_profiler_interface,
        )
        res = profiler.compute_metrics()._column_results

View File

@ -204,15 +204,18 @@ class SQAInterfaceTestMultiThread(TestCase):
                        self.table,
                    )
                )
            for window_metric in self.window_metrics:
                window_metrics.append(
                    (
                        window_metric,
                        MetricTypes.Window,
                        col,
                        self.table,
                    )
                )
            window_metrics.append(
                (
                    [
                        metric
                        for metric in self.window_metrics
                        if metric.is_window_metric()
                    ],
                    MetricTypes.Window,
                    col,
                    self.table,
                )
            )

        all_metrics = [*table_metrics, *column_metrics, *query_metrics, *window_metrics]
@ -247,7 +250,7 @@ class SQAInterfaceTestMultiThread(TestCase):
            profile for profile in profile_request.columnProfile if profile.name == "id"
        ][0]

        assert name_column_profile.nullCount == 0
        assert id_column_profile.median == 1.5
        assert id_column_profile.median == 1.0

    @classmethod
    def tearDownClass(cls) -> None:

View File

@ -74,7 +74,7 @@ EXECUTION_DATE = datetime.strptime("2021-07-03", "%Y-%m-%d")
"test_case_column_value_median_to_be_between",
"columnValueMedianToBeBetween",
"COLUMN",
(TestCaseResult, "30.0", None, TestCaseStatus.Failed),
(TestCaseResult, "30", None, TestCaseStatus.Failed),
),
(
"test_case_column_value_min_to_be_between",

View File

@ -523,6 +523,22 @@
"description": "Median of a column.",
"type": "number"
},
"firstQuartile": {
"description": "First quartile of a column.",
"type": "number"
},
"thirdQuartile": {
"description": "First quartile of a column.",
"type": "number"
},
"interQuartileRange": {
"description": "Inter quartile range of a column.",
"type": "number"
},
"nonParametricSkew": {
"description": "Non parametric skew of a column.",
"type": "number"
},
"histogram": {
"description": "Histogram of a column.",
"properties": {