mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-06 04:26:57 +00:00
Impalaconnection 0.2.1 + string datatypes enabled in profile (#11364)
* updated metadata to work with the impala query engine. Uses the describe function to grab column names, data types, and comments. * added the ordinalPosition data point into the Column constructor. * renamed variable to better describe its usage. * updated profile errors. Hive connections now comment columns by default. * removed print statements * Cleaned up code by pulling check into its own function * Updated median function to return null when it is being used for first and third quartiles. * updated metadata to work with the impala query engine. Uses the describe function to grab column names, data types, and comments. * added the ordinalPosition data point into the Column constructor. * renamed variable to better describe its usage. * updated profile errors. Hive connections now comment columns by default. * removed print statements * Cleaned up code by pulling check into its own function * Updated median function to return null when it is being used for first and third quartiles. * removed print statements and ran make py_format * updated to fix some pylint errors. imported Dialects to remove string compare to "impala" engine * moved huge comment into function docstring. This comment shows us the sql to get quartiles in Impala * added cast to decimal for column when running average in mean.py * fixed lint error * fixed ui ordering of precision and scale. Precision should be ordred in front of scale since the precision is set first in decimal data types * Fixed overflow error when converting large numbers to bigint Fixed error for CHAR datatype missing. * Fixed NaN issues with Impala Profile * py formatting * Fixed warnings from SqlAlchemy The GenericFunction 'max' is already registered and is going to be overridden. The GenericFunction 'min' is already registered and is going to be overridden. Updated Min/Max to handle strings by getting they length. * Updated profiler to handle strings by using the string length as the parameter to compute the profile * py_format updates * fix: ran linting * fix: Mysql hardcoded table alias --------- Co-authored-by: Chirag Madlani <12962843+chirag-madlani@users.noreply.github.com> Co-authored-by: Teddy Crepineau <teddy.crepineau@gmail.com>
This commit is contained in:
parent
01c928a11e
commit
ad9b5a0cb5
@ -33,6 +33,13 @@ from metadata.ingestion.source.database.impala.queries import IMPALA_GET_COMMENT
|
||||
|
||||
complex_data_types = ["struct", "map", "array", "union"]
|
||||
|
||||
_impala_type_to_sqlalchemy_type.update(
|
||||
{
|
||||
"CHAR": types.CHAR,
|
||||
"REAL": types.REAL,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def get_impala_table_or_view_names(connection, schema=None, target_type="table"):
|
||||
"""
|
||||
|
||||
@ -49,9 +49,9 @@ class DistinctRatio(ComposedMetric):
|
||||
results of other Metrics
|
||||
"""
|
||||
res_count = res.get(Count.name())
|
||||
res_unique = res.get(DistinctCount.name())
|
||||
res_distinct = res.get(DistinctCount.name())
|
||||
|
||||
if res_count and res_unique is not None:
|
||||
return res_unique / res_count
|
||||
if res_count and res_distinct is not None:
|
||||
return res_distinct / res_count
|
||||
|
||||
return None
|
||||
|
||||
@ -15,10 +15,35 @@ Max Metric definition
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
|
||||
from sqlalchemy import column, func
|
||||
from sqlalchemy import column
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import GenericFunction
|
||||
|
||||
from metadata.profiler.metrics.core import StaticMetric, _label
|
||||
from metadata.profiler.orm.registry import is_date_time, is_quantifiable
|
||||
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
||||
from metadata.profiler.orm.functions.length import LenFn
|
||||
from metadata.profiler.orm.registry import (
|
||||
Dialects,
|
||||
is_concatenable,
|
||||
is_date_time,
|
||||
is_quantifiable,
|
||||
)
|
||||
|
||||
|
||||
class MaxFn(GenericFunction):
|
||||
name = __qualname__
|
||||
inherit_cache = CACHE
|
||||
|
||||
|
||||
@compiles(MaxFn)
|
||||
def _(element, compiler, **kw):
|
||||
col = compiler.process(element.clauses, **kw)
|
||||
return f"MAX({col})"
|
||||
|
||||
|
||||
@compiles(MaxFn, Dialects.Impala)
|
||||
def _(element, compiler, **kw):
|
||||
col = compiler.process(element.clauses, **kw)
|
||||
return f"MAX(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
||||
|
||||
|
||||
class Max(StaticMetric):
|
||||
@ -35,9 +60,11 @@ class Max(StaticMetric):
|
||||
@_label
|
||||
def fn(self):
|
||||
"""sqlalchemy function"""
|
||||
if is_concatenable(self.col.type):
|
||||
return MaxFn(LenFn(column(self.col.name)))
|
||||
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
|
||||
return None
|
||||
return func.max(column(self.col.name))
|
||||
return MaxFn(column(self.col.name))
|
||||
|
||||
def df_fn(self, dfs=None):
|
||||
"""pandas function"""
|
||||
|
||||
@ -52,6 +52,15 @@ def _(element, compiler, **kw):
|
||||
return f"avg(cast({proc} as decimal))"
|
||||
|
||||
|
||||
@compiles(avg, Dialects.Impala)
|
||||
def _(element, compiler, **kw):
|
||||
"""
|
||||
Convert NaN and Inf values to null before computing average.
|
||||
"""
|
||||
proc = compiler.process(element.clauses, **kw)
|
||||
return f"avg(if(is_nan({proc}) or is_inf({proc}), null, {proc}))"
|
||||
|
||||
|
||||
class Mean(StaticMetric):
|
||||
"""
|
||||
AVG Metric
|
||||
|
||||
@ -15,10 +15,35 @@ Min Metric definition
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
|
||||
from sqlalchemy import column, func
|
||||
from sqlalchemy import column
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import GenericFunction
|
||||
|
||||
from metadata.profiler.metrics.core import StaticMetric, _label
|
||||
from metadata.profiler.orm.registry import is_date_time, is_quantifiable
|
||||
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
||||
from metadata.profiler.orm.functions.length import LenFn
|
||||
from metadata.profiler.orm.registry import (
|
||||
Dialects,
|
||||
is_concatenable,
|
||||
is_date_time,
|
||||
is_quantifiable,
|
||||
)
|
||||
|
||||
|
||||
class MinFn(GenericFunction):
|
||||
name = __qualname__
|
||||
inherit_cache = CACHE
|
||||
|
||||
|
||||
@compiles(MinFn)
|
||||
def _(element, compiler, **kw):
|
||||
col = compiler.process(element.clauses, **kw)
|
||||
return f"MIN({col})"
|
||||
|
||||
|
||||
@compiles(MinFn, Dialects.Impala)
|
||||
def _(element, compiler, **kw):
|
||||
col = compiler.process(element.clauses, **kw)
|
||||
return f"MIN(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
||||
|
||||
|
||||
class Min(StaticMetric):
|
||||
@ -35,9 +60,12 @@ class Min(StaticMetric):
|
||||
@_label
|
||||
def fn(self):
|
||||
"""sqlalchemy function"""
|
||||
if is_concatenable(self.col.type):
|
||||
return MinFn(LenFn(column(self.col.name)))
|
||||
|
||||
if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
|
||||
return None
|
||||
return func.min(column(self.col.name))
|
||||
return MinFn(column(self.col.name))
|
||||
|
||||
def df_fn(self, dfs=None):
|
||||
"""pandas function"""
|
||||
|
||||
@ -22,7 +22,8 @@ from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import FunctionElement
|
||||
|
||||
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
|
||||
from metadata.profiler.orm.registry import Dialects, is_quantifiable
|
||||
from metadata.profiler.orm.functions.length import LenFn
|
||||
from metadata.profiler.orm.registry import Dialects, is_concatenable, is_quantifiable
|
||||
from metadata.utils.logger import profiler_logger
|
||||
|
||||
logger = profiler_logger()
|
||||
@ -43,6 +44,12 @@ def _(element, compiler, **kw):
|
||||
return "STDEVP(%s)" % compiler.process(element.clauses, **kw)
|
||||
|
||||
|
||||
@compiles(StdDevFn, Dialects.Impala)
|
||||
def _(element, compiler, **kw):
|
||||
col = compiler.process(element.clauses, **kw)
|
||||
return f"STDDEV_POP(if(is_nan({col}) or is_inf({col}), null, {col}))"
|
||||
|
||||
|
||||
@compiles(StdDevFn, Dialects.SQLite) # Needed for unit tests
|
||||
def _(element, compiler, **kw):
|
||||
"""
|
||||
@ -84,6 +91,9 @@ class StdDev(StaticMetric):
|
||||
if is_quantifiable(self.col.type):
|
||||
return StdDevFn(column(self.col.name))
|
||||
|
||||
if is_concatenable(self.col.type):
|
||||
return StdDevFn(LenFn(column(self.col.name)))
|
||||
|
||||
logger.debug(
|
||||
f"{self.col} has type {self.col.type}, which is not listed as quantifiable."
|
||||
+ " We won't compute STDDEV for it."
|
||||
|
||||
@ -17,8 +17,9 @@ SUM Metric definition
|
||||
from sqlalchemy import column
|
||||
|
||||
from metadata.profiler.metrics.core import StaticMetric, _label
|
||||
from metadata.profiler.orm.functions.length import LenFn
|
||||
from metadata.profiler.orm.functions.sum import SumFn
|
||||
from metadata.profiler.orm.registry import is_quantifiable
|
||||
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
|
||||
|
||||
|
||||
class Sum(StaticMetric):
|
||||
@ -40,6 +41,9 @@ class Sum(StaticMetric):
|
||||
if is_quantifiable(self.col.type):
|
||||
return SumFn(column(self.col.name))
|
||||
|
||||
if is_concatenable(self.col.type):
|
||||
return SumFn(LenFn(column(self.col.name)))
|
||||
|
||||
return None
|
||||
|
||||
def df_fn(self, dfs=None):
|
||||
|
||||
@ -19,8 +19,9 @@ from typing import List, cast
|
||||
from sqlalchemy import column
|
||||
|
||||
from metadata.profiler.metrics.core import StaticMetric, _label
|
||||
from metadata.profiler.orm.functions.length import LenFn
|
||||
from metadata.profiler.orm.functions.median import MedianFn
|
||||
from metadata.profiler.orm.registry import is_quantifiable
|
||||
from metadata.profiler.orm.registry import is_concatenable, is_quantifiable
|
||||
from metadata.utils.logger import profiler_logger
|
||||
|
||||
logger = profiler_logger()
|
||||
@ -53,6 +54,9 @@ class Median(StaticMetric):
|
||||
if is_quantifiable(self.col.type):
|
||||
return MedianFn(column(self.col.name), self.col.table.fullname, 0.5)
|
||||
|
||||
if is_concatenable(self.col.type):
|
||||
return MedianFn(LenFn(column(self.col.name)), self.col.table.fullname, 0.5)
|
||||
|
||||
logger.debug(
|
||||
f"Don't know how to process type {self.col.type} when computing Median"
|
||||
)
|
||||
|
||||
@ -111,7 +111,7 @@ def _(elements, compiler, **kwargs):
|
||||
col, _, percentile = [
|
||||
compiler.process(element, **kwargs) for element in elements.clauses
|
||||
]
|
||||
return "if(%s = .5, appx_median(%s), null)" % (percentile, col)
|
||||
return f"if({percentile} = .5, appx_median(if(is_nan({col}) or is_inf({col}), null, {col})), null)"
|
||||
|
||||
|
||||
@compiles(MedianFn, Dialects.MySQL)
|
||||
@ -126,10 +126,10 @@ def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
|
||||
{col}
|
||||
FROM (
|
||||
SELECT
|
||||
t.{col},
|
||||
{col},
|
||||
ROW_NUMBER() OVER () AS row_num
|
||||
FROM
|
||||
{table} t,
|
||||
{table},
|
||||
(SELECT @counter := COUNT(*) FROM {table}) t_count
|
||||
ORDER BY {col}
|
||||
) temp
|
||||
|
||||
@ -74,6 +74,13 @@ def _(element, compiler, **kw):
|
||||
return f"SUM(CAST({proc} AS NUMBER))"
|
||||
|
||||
|
||||
@compiles(SumFn, Dialects.Impala)
|
||||
def _(element, compiler, **kw):
|
||||
"""Impala casting. Default BIGINT isn't big enough for some sums"""
|
||||
proc = compiler.process(element.clauses, **kw)
|
||||
return f"SUM(CAST(if(is_nan({proc}) or is_inf({proc}), null, {proc}) AS DOUBLE))"
|
||||
|
||||
|
||||
@compiles(SumFn, Dialects.IbmDbSa)
|
||||
@compiles(SumFn, Dialects.Db2)
|
||||
def _(element, compiler, **kw):
|
||||
|
||||
@ -594,7 +594,7 @@ class MetricsTest(TestCase):
|
||||
._column_results
|
||||
)
|
||||
|
||||
assert res.get(User.name.name).get(Metrics.SUM.name) is None
|
||||
assert res.get(User.name.name).get(Metrics.SUM.name) == 12
|
||||
|
||||
def test_unique_count(self):
|
||||
"""
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user