Add Clickhouse profiler fix (#12531)

Authored by Ayush Shah on 2023-07-21 10:19:56 +05:30, committed by GitHub
parent cd347299d7
commit 246bf15476
25 changed files with 123 additions and 35 deletions


@@ -48,6 +48,9 @@ Map = create_sqlalchemy_type("Map")
 Array = create_sqlalchemy_type("Array")
 Enum = create_sqlalchemy_type("Enum")
 Tuple = create_sqlalchemy_type("Tuple")
+BIGINT = create_sqlalchemy_type("BIGINT")
+SMALLINT = create_sqlalchemy_type("SMALLINT")
+INTEGER = create_sqlalchemy_type("INTEGER")
 
 ischema_names.update(
     {
@@ -58,18 +61,20 @@ ischema_names.update(
         "Enum": Enum,
         "Date32": Date,
         "SimpleAggregateFunction": create_sqlalchemy_type("SimpleAggregateFunction"),
-        "Int256": create_sqlalchemy_type("BIGINT"),
-        "Int128": create_sqlalchemy_type("BIGINT"),
-        "Int64": create_sqlalchemy_type("BIGINT"),
-        "Int32": create_sqlalchemy_type("INTEGER"),
-        "Int16": create_sqlalchemy_type("SMALLINT"),
-        "Int8": create_sqlalchemy_type("SMALLINT"),
-        "UInt256": create_sqlalchemy_type("BIGINT"),
-        "UInt128": create_sqlalchemy_type("BIGINT"),
-        "UInt64": create_sqlalchemy_type("BIGINT"),
-        "UInt32": create_sqlalchemy_type("INTEGER"),
-        "UInt16": create_sqlalchemy_type("SMALLINT"),
-        "UInt8": create_sqlalchemy_type("SMALLINT"),
+        "Int256": BIGINT,
+        "Int128": BIGINT,
+        "Int64": BIGINT,
+        "Int32": INTEGER,
+        "Int16": SMALLINT,
+        "Int8": SMALLINT,
+        "UInt256": BIGINT,
+        "UInt128": BIGINT,
+        "UInt64": BIGINT,
+        "UInt32": INTEGER,
+        "UInt16": SMALLINT,
+        "UInt8": SMALLINT,
+        "IPv4": create_sqlalchemy_type("IPv4"),
+        "IPv6": create_sqlalchemy_type("IPv6"),
     }
 )
@@ -109,9 +114,6 @@ def _get_column_type(
     if spec.startswith("DateTime"):
         return self.ischema_names["DateTime"]
-    if spec.startswith("IP"):
-        return self.ischema_names["String"]
     if spec.lower().startswith("decimal"):
         coltype = self.ischema_names["Decimal"]
         return coltype(*self._parse_decimal_params(spec))
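The hunk above defines BIGINT, SMALLINT, and INTEGER once at module level and reuses them across the ischema_names entries, and it registers IPv4/IPv6 as their own reflected types instead of collapsing them to String in _get_column_type. As a rough sketch of what the create_sqlalchemy_type helper appears to do here (an assumption based on how it is used, not the project's exact implementation):

from sqlalchemy.types import TypeEngine


def create_sqlalchemy_type(name: str):
    # Assumed sketch: a throwaway TypeEngine subclass whose visit name is
    # `name`, enough for the dialect to register it in ischema_names.
    return type(name, (TypeEngine,), {"__visit_name__": name})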


@@ -228,6 +228,8 @@ class ColumnTypeParser:
         "LOWCARDINALITY": "LOWCARDINALITY",
         "DATETIME64": "DATETIME",
         "SimpleAggregateFunction()": "AGGREGATEFUNCTION",
+        "IPV4": "IPV4",
+        "IPV6": "IPV6",
         # Databricks
         "VOID": "NULL",
         # mysql
@@ -293,7 +295,7 @@ class ColumnTypeParser:
         for func in [
             ColumnTypeParser.get_column_type_mapping,
             ColumnTypeParser.get_source_type_mapping,
-            ColumnTypeParser.get_source_type_containes_brackets,
+            ColumnTypeParser.get_source_type_contains_brackets,
         ]:
             column_type_result = func(column_type)
             if column_type_result:
@@ -309,7 +311,7 @@ class ColumnTypeParser:
         return ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.get(str(column_type), None)
 
     @staticmethod
-    def get_source_type_containes_brackets(column_type: Any) -> str:
+    def get_source_type_contains_brackets(column_type: Any) -> str:
         return ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.get(
             str(column_type).split("(", maxsplit=1)[0].split("<")[0].upper(), None
         )
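For context, the loop above tries the three lookups in order, so a ClickHouse type string only needs to match one of them; the renamed contains-brackets helper strips everything after "(" or "<" and uppercases the remainder before the lookup. A hedged usage sketch (the get_column_type entry point and the return values assume the mapping keys shown in this diff):

# Bracketed source types are stripped before the uppercase lookup.
ColumnTypeParser.get_source_type_contains_brackets("Decimal(10, 2)")  # -> "DECIMAL"

# "IPv4" misses the exact-match lookups but resolves through the same helper,
# so ClickHouse IP columns no longer fall through to UNKNOWN.
ColumnTypeParser.get_column_type("IPv4")  # -> "IPV4"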


@@ -254,6 +254,9 @@ class SqlColumnHandlerMixin:
             precision = ColumnTypeParser.check_col_precision(
                 col_type, column["type"]
             )
+            # Clickhouse: if the column is nullable, mark the data type as Null
+            if column.get("nullable"):
+                col_type = DataType.NULL.name
             if col_type is None:
                 col_type = DataType.UNKNOWN.name
             data_type_display = col_type.lower()


@@ -167,7 +167,7 @@ class Histogram(HybridMetric):
         if is_concatenable(self.col.type):
             col = LenFn(column(self.col.name, self.col.type))
         else:
-            col = column(self.col.name)  # type: ignore
+            col = column(self.col.name, self.col.type)  # type: ignore
 
         case_stmts = []
         for bin_num in range(num_bins):
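The recurring change in this and the following metric hunks is passing self.col.type as the second argument to column(). A minimal standalone illustration of why that matters (column name invented): without an explicit type SQLAlchemy assigns NullType, so the ClickHouse-specific compile hooks later in this commit cannot branch on the column's real type.

from sqlalchemy import column
from sqlalchemy.sql.sqltypes import DECIMAL, NullType

untyped = column("amount")                # type information is lost
typed = column("amount", DECIMAL(10, 2))  # type travels with the expression

assert isinstance(untyped.type, NullType)
assert isinstance(typed.type, DECIMAL)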


@@ -56,7 +56,11 @@ class CountInSet(StaticMetric):
         try:
             set_values = set(self.values)
-            return SumFn(case([(column(self.col.name).in_(set_values), 1)], else_=0))
+            return SumFn(
+                case(
+                    [(column(self.col.name, self.col.type).in_(set_values), 1)], else_=0
+                )
+            )
 
         except Exception as exc:  # pylint: disable=broad-except
             logger.debug(traceback.format_exc())


@@ -47,4 +47,9 @@ class ILikeCount(StaticMetric):
             raise AttributeError(
                 "ILike Count requires an expression to be set: add_props(expression=...)(Metrics.ILIKE_COUNT)"
             )
-        return SumFn(case([(column(self.col.name).ilike(self.expression), 1)], else_=0))
+        return SumFn(
+            case(
+                [(column(self.col.name, self.col.type).ilike(self.expression), 1)],
+                else_=0,
+            )
+        )


@@ -47,4 +47,9 @@ class LikeCount(StaticMetric):
             raise AttributeError(
                 "Like Count requires an expression to be set: add_props(expression=...)(Metrics.LIKE_COUNT)"
             )
-        return SumFn(case([(column(self.col.name).like(self.expression), 1)], else_=0))
+        return SumFn(
+            case(
+                [(column(self.col.name, self.col.type).like(self.expression), 1)],
+                else_=0,
+            )
+        )


@@ -57,7 +57,7 @@ class Max(StaticMetric):
             return MaxFn(LenFn(column(self.col.name, self.col.type)))
         if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
             return None
-        return MaxFn(column(self.col.name))
+        return MaxFn(column(self.col.name, self.col.type))
 
     def df_fn(self, dfs=None):
         """pandas function"""


@@ -74,7 +74,7 @@ class Mean(StaticMetric):
     def fn(self):
         """sqlalchemy function"""
         if is_quantifiable(self.col.type):
-            return func.avg(column(self.col.name))
+            return func.avg(column(self.col.name, self.col.type))
         if is_concatenable(self.col.type):
             return func.avg(LenFn(column(self.col.name, self.col.type)))


@@ -57,7 +57,7 @@ class Min(StaticMetric):
         if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)):
             return None
-        return MinFn(column(self.col.name))
+        return MinFn(column(self.col.name, self.col.type))
 
     def df_fn(self, dfs=None):
         """pandas function"""


@@ -48,5 +48,8 @@ class NotLikeCount(StaticMetric):
                 "Not Like Count requires an expression to be set: add_props(expression=...)(Metrics.NOT_LIKE_COUNT)"
             )
         return SumFn(
-            case([(column(self.col.name).not_like(self.expression), 0)], else_=1)
+            case(
+                [(column(self.col.name, self.col.type).not_like(self.expression), 0)],
+                else_=1,
+            )
         )


@@ -54,7 +54,16 @@ class NotRegexCount(StaticMetric):
             )
         return SumFn(
             case(
-                [(not_(column(self.col.name).regexp_match(self.expression)), 0)],
+                [
+                    (
+                        not_(
+                            column(self.col.name, self.col.type).regexp_match(
+                                self.expression
+                            )
+                        ),
+                        0,
+                    )
+                ],
                 else_=1,
             )
         )


@@ -47,7 +47,9 @@ class NullCount(StaticMetric):
     @_label
     def fn(self):
         """sqlalchemy function"""
-        return SumFn(case([(column(self.col.name).is_(None), 1)], else_=0))
+        return SumFn(
+            case([(column(self.col.name, self.col.type).is_(None), 1)], else_=0)
+        )
 
     def df_fn(self, dfs=None):
         """pandas function"""


@@ -53,7 +53,17 @@ class RegexCount(StaticMetric):
                 "Regex Count requires an expression to be set: add_props(expression=...)(Metrics.REGEX_COUNT)"
             )
         return SumFn(
-            case([(column(self.col.name).regexp_match(self.expression), 1)], else_=0)
+            case(
+                [
+                    (
+                        column(self.col.name, self.col.type).regexp_match(
+                            self.expression
+                        ),
+                        1,
+                    )
+                ],
+                else_=0,
+            )
         )
 
     def df_fn(self, dfs):


@@ -83,7 +83,7 @@ class StdDev(StaticMetric):
     def fn(self):
         """sqlalchemy function"""
         if is_quantifiable(self.col.type):
-            return StdDevFn(column(self.col.name))
+            return StdDevFn(column(self.col.name, self.col.type))
         if is_concatenable(self.col.type):
             return StdDevFn(LenFn(column(self.col.name, self.col.type)))


@@ -39,7 +39,7 @@ class Sum(StaticMetric):
     def fn(self):
         """sqlalchemy function"""
         if is_quantifiable(self.col.type):
-            return SumFn(column(self.col.name))
+            return SumFn(column(self.col.name, self.col.type))
         if is_concatenable(self.col.type):
             return SumFn(LenFn(column(self.col.name, self.col.type)))


@@ -54,7 +54,7 @@ class FirstQuartile(StaticMetric):
         if is_quantifiable(self.col.type):
             # col fullname is only needed for MySQL and SQLite
             return MedianFn(
-                column(self.col.name),
+                column(self.col.name, self.col.type),
                 self.col.table.fullname if self.col.table is not None else None,
                 0.25,
             )


@@ -54,7 +54,7 @@ class Median(StaticMetric):
         if is_quantifiable(self.col.type):
             # col fullname is only needed for MySQL and SQLite
             return MedianFn(
-                column(self.col.name),
+                column(self.col.name, self.col.type),
                 self.col.table.fullname if self.col.table is not None else None,
                 0.5,
             )


@@ -54,7 +54,7 @@ class ThirdQuartile(StaticMetric):
         if is_quantifiable(self.col.type):
             # col fullname is only needed for MySQL and SQLite
             return MedianFn(
-                column(self.col.name),
+                column(self.col.name, self.col.type),
                 self.col.table.fullname if self.col.table is not None else None,
                 0.75,
             )


@@ -65,6 +65,8 @@ class CommonMapTypes:
         DataType.BYTEA: CustomTypes.BYTEA.value,
         DataType.NTEXT: sqlalchemy.NVARCHAR,
         DataType.IMAGE: CustomTypes.IMAGE.value,
+        DataType.IPV4: CustomTypes.IP.value,
+        DataType.IPV6: CustomTypes.IP.value,
     }
 
     def map_types(self, col: Column, table_service_type):


@@ -61,6 +61,8 @@ def _(element, compiler, **kw):
 
 @compiles(LenFn, Dialects.ClickHouse)
 def _(element, compiler, **kw):
     """Handles length function for ClickHouse"""
+    if isinstance(element.clauses.clauses[0].type, sqltypes.Enum):
+        return "length(cast(%s, 'String'))" % compiler.process(element.clauses, **kw)
     return "length(%s)" % compiler.process(element.clauses, **kw)
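A standalone sketch of the branch added above, outside the compiler plumbing: ClickHouse cannot apply length() directly to Enum columns, so they are cast to String first. Column names are invented for illustration.

from sqlalchemy import column
from sqlalchemy.sql import sqltypes


def clickhouse_len_sql(col) -> str:
    # Mirrors the dispatch above: Enum columns get a cast, everything else
    # keeps the plain length() call.
    if isinstance(col.type, sqltypes.Enum):
        return f"length(cast({col.name}, 'String'))"
    return f"length({col.name})"


clickhouse_len_sql(column("status", sqltypes.Enum("a", "b")))  # length(cast(status, 'String'))
clickhouse_len_sql(column("note", sqltypes.String()))          # length(note)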


@@ -16,6 +16,7 @@ Define Median function
 # pylint: disable=consider-using-f-string,duplicate-code
 from sqlalchemy.ext.compiler import compiles
 from sqlalchemy.sql.functions import FunctionElement
+from sqlalchemy.sql.sqltypes import DECIMAL
 
 from metadata.profiler.metrics.core import CACHE
 from metadata.profiler.orm.registry import Dialects
@@ -48,7 +49,11 @@ def _(elements, compiler, **kwargs):
     col, _, percentile = [
         compiler.process(element, **kwargs) for element in elements.clauses
     ]
-    return "if(isNaN(quantile(%s)(%s)),null,quantile(%s)(%s))" % ((percentile, col) * 2)
+    quantile_str = f"quantile({percentile})({col})"
+    null_check = (
+        "isNull" if isinstance(elements.clauses.clauses[0].type, DECIMAL) else "isNaN"
+    )
+    return f"if({null_check}({quantile_str}), null, {quantile_str})"
 
 
 # pylint: disable=unused-argument
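A runnable sketch of the string the reworked hook builds (the helper and column names are invented): ClickHouse Decimal columns cannot be NaN-checked, so the guard switches to isNull for them while keeping isNaN for the other numeric types.

def clickhouse_median_sql(col: str, percentile: float, is_decimal: bool) -> str:
    # Same construction as the compile hook above, detached from SQLAlchemy.
    quantile_str = f"quantile({percentile})({col})"
    null_check = "isNull" if is_decimal else "isNaN"
    return f"if({null_check}({quantile_str}), null, {quantile_str})"


clickhouse_median_sql("amount", 0.5, True)
# -> if(isNull(quantile(0.5)(amount)), null, quantile(0.5)(amount))
clickhouse_median_sql("latency", 0.5, False)
# -> if(isNaN(quantile(0.5)(latency)), null, quantile(0.5)(latency))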


@@ -22,6 +22,7 @@ from metadata.ingestion.source import sqa_types
 from metadata.profiler.orm.types.bytea_to_string import ByteaToHex
 from metadata.profiler.orm.types.custom_array import CustomArray
 from metadata.profiler.orm.types.custom_image import CustomImage
+from metadata.profiler.orm.types.custom_ip import CustomIP
 from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp
 from metadata.profiler.orm.types.hex_byte_string import HexByteString
 from metadata.profiler.orm.types.uuid import UUIDString
@@ -36,6 +37,7 @@ class CustomTypes(TypeRegistry):
     ARRAY = CustomArray
     TIMESTAMP = CustomTimestamp
     IMAGE = CustomImage
+    IP = CustomIP
 
 
 class Dialects(Enum):


@@ -0,0 +1,30 @@
+#  Copyright 2021 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+# pylint: disable=abstract-method
+
+"""
+Expand sqlalchemy types to map them to OpenMetadata DataType
+"""
+
+from sqlalchemy.sql.sqltypes import String, TypeDecorator
+
+from metadata.utils.logger import profiler_logger
+
+logger = profiler_logger()
+
+
+class CustomIP(TypeDecorator):
+    """
+    Convert IP types to String
+    """
+
+    impl = String
+    cache_ok = True
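A hedged sketch of how the new decorator is consumed: CommonMapTypes (patched earlier in this commit) points DataType.IPV4 and DataType.IPV6 at CustomTypes.IP.value, so the column the profiler builds for an IP field behaves like a String and the length/like/regex metrics above can run against it. Table and column names here are hypothetical.

from sqlalchemy import Column, MetaData, Table

from metadata.profiler.orm.types.custom_ip import CustomIP

profiler_metadata = MetaData()
network_events = Table(
    "network_events",
    profiler_metadata,
    Column("source_ip", CustomIP),  # profiled as a plain string
)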


@@ -148,7 +148,9 @@
         "SPATIAL",
         "TABLE",
         "NTEXT",
-        "IMAGE"
+        "IMAGE",
+        "IPV4",
+        "IPV6"
       ]
     },
     "constraint": {