From 246bf15476e81f37aed6e4409dad2d6e7f306c00 Mon Sep 17 00:00:00 2001 From: Ayush Shah Date: Fri, 21 Jul 2023 10:19:56 +0530 Subject: [PATCH] Add Clickhouse profiler fix (#12531) --- .../source/database/clickhouse/metadata.py | 32 ++++++++++--------- .../source/database/column_type_parser.py | 6 ++-- .../source/database/sql_column_handler.py | 3 ++ .../profiler/metrics/hybrid/histogram.py | 2 +- .../profiler/metrics/static/count_in_set.py | 6 +++- .../profiler/metrics/static/ilike_count.py | 7 +++- .../profiler/metrics/static/like_count.py | 7 +++- .../metadata/profiler/metrics/static/max.py | 2 +- .../metadata/profiler/metrics/static/mean.py | 2 +- .../metadata/profiler/metrics/static/min.py | 2 +- .../profiler/metrics/static/not_like_count.py | 5 ++- .../metrics/static/not_regexp_match_count.py | 11 ++++++- .../profiler/metrics/static/null_count.py | 4 ++- .../metrics/static/regexp_match_count.py | 12 ++++++- .../profiler/metrics/static/stddev.py | 2 +- .../metadata/profiler/metrics/static/sum.py | 2 +- .../profiler/metrics/window/first_quartile.py | 2 +- .../profiler/metrics/window/median.py | 2 +- .../profiler/metrics/window/third_quartile.py | 2 +- .../metadata/profiler/orm/converter/common.py | 2 ++ .../metadata/profiler/orm/functions/length.py | 2 ++ .../metadata/profiler/orm/functions/median.py | 7 +++- .../src/metadata/profiler/orm/registry.py | 2 ++ .../metadata/profiler/orm/types/custom_ip.py | 30 +++++++++++++++++ .../json/schema/entity/data/table.json | 4 ++- 25 files changed, 123 insertions(+), 35 deletions(-) create mode 100644 ingestion/src/metadata/profiler/orm/types/custom_ip.py diff --git a/ingestion/src/metadata/ingestion/source/database/clickhouse/metadata.py b/ingestion/src/metadata/ingestion/source/database/clickhouse/metadata.py index 5fc99cd98d7..4a66e38e5d4 100644 --- a/ingestion/src/metadata/ingestion/source/database/clickhouse/metadata.py +++ b/ingestion/src/metadata/ingestion/source/database/clickhouse/metadata.py @@ -48,6 +48,9 @@ Map = create_sqlalchemy_type("Map") Array = create_sqlalchemy_type("Array") Enum = create_sqlalchemy_type("Enum") Tuple = create_sqlalchemy_type("Tuple") +BIGINT = create_sqlalchemy_type("BIGINT") +SMALLINT = create_sqlalchemy_type("SMALLINT") +INTEGER = create_sqlalchemy_type("INTEGER") ischema_names.update( { @@ -58,18 +61,20 @@ ischema_names.update( "Enum": Enum, "Date32": Date, "SimpleAggregateFunction": create_sqlalchemy_type("SimpleAggregateFunction"), - "Int256": create_sqlalchemy_type("BIGINT"), - "Int128": create_sqlalchemy_type("BIGINT"), - "Int64": create_sqlalchemy_type("BIGINT"), - "Int32": create_sqlalchemy_type("INTEGER"), - "Int16": create_sqlalchemy_type("SMALLINT"), - "Int8": create_sqlalchemy_type("SMALLINT"), - "UInt256": create_sqlalchemy_type("BIGINT"), - "UInt128": create_sqlalchemy_type("BIGINT"), - "UInt64": create_sqlalchemy_type("BIGINT"), - "UInt32": create_sqlalchemy_type("INTEGER"), - "UInt16": create_sqlalchemy_type("SMALLINT"), - "UInt8": create_sqlalchemy_type("SMALLINT"), + "Int256": BIGINT, + "Int128": BIGINT, + "Int64": BIGINT, + "Int32": INTEGER, + "Int16": SMALLINT, + "Int8": SMALLINT, + "UInt256": BIGINT, + "UInt128": BIGINT, + "UInt64": BIGINT, + "UInt32": INTEGER, + "UInt16": SMALLINT, + "UInt8": SMALLINT, + "IPv4": create_sqlalchemy_type("IPv4"), + "IPv6": create_sqlalchemy_type("IPv6"), } ) @@ -109,9 +114,6 @@ def _get_column_type( if spec.startswith("DateTime"): return self.ischema_names["DateTime"] - if spec.startswith("IP"): - return self.ischema_names["String"] - if spec.lower().startswith("decimal"): coltype = self.ischema_names["Decimal"] return coltype(*self._parse_decimal_params(spec)) diff --git a/ingestion/src/metadata/ingestion/source/database/column_type_parser.py b/ingestion/src/metadata/ingestion/source/database/column_type_parser.py index a0a3473f048..7ab833f36cf 100644 --- a/ingestion/src/metadata/ingestion/source/database/column_type_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/column_type_parser.py @@ -228,6 +228,8 @@ class ColumnTypeParser: "LOWCARDINALITY": "LOWCARDINALITY", "DATETIME64": "DATETIME", "SimpleAggregateFunction()": "AGGREGATEFUNCTION", + "IPV4": "IPV4", + "IPV6": "IPV6", # Databricks "VOID": "NULL", # mysql @@ -293,7 +295,7 @@ class ColumnTypeParser: for func in [ ColumnTypeParser.get_column_type_mapping, ColumnTypeParser.get_source_type_mapping, - ColumnTypeParser.get_source_type_containes_brackets, + ColumnTypeParser.get_source_type_contains_brackets, ]: column_type_result = func(column_type) if column_type_result: @@ -309,7 +311,7 @@ class ColumnTypeParser: return ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.get(str(column_type), None) @staticmethod - def get_source_type_containes_brackets(column_type: Any) -> str: + def get_source_type_contains_brackets(column_type: Any) -> str: return ColumnTypeParser._SOURCE_TYPE_TO_OM_TYPE.get( str(column_type).split("(", maxsplit=1)[0].split("<")[0].upper(), None ) diff --git a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py index 5f4cb0745b9..a93140b3d5d 100644 --- a/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py +++ b/ingestion/src/metadata/ingestion/source/database/sql_column_handler.py @@ -254,6 +254,9 @@ class SqlColumnHandlerMixin: precision = ColumnTypeParser.check_col_precision( col_type, column["type"] ) + # Clickhouse nullable if true, data type as Null + if column.get("nullable"): + col_type = DataType.NULL.name if col_type is None: col_type = DataType.UNKNOWN.name data_type_display = col_type.lower() diff --git a/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py b/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py index 799c55df9b7..c8a647c47d8 100644 --- a/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py +++ b/ingestion/src/metadata/profiler/metrics/hybrid/histogram.py @@ -167,7 +167,7 @@ class Histogram(HybridMetric): if is_concatenable(self.col.type): col = LenFn(column(self.col.name, self.col.type)) else: - col = column(self.col.name) # type: ignore + col = column(self.col.name, self.col.type) # type: ignore case_stmts = [] for bin_num in range(num_bins): diff --git a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py index c2a4ce019df..648c5e9b398 100644 --- a/ingestion/src/metadata/profiler/metrics/static/count_in_set.py +++ b/ingestion/src/metadata/profiler/metrics/static/count_in_set.py @@ -56,7 +56,11 @@ class CountInSet(StaticMetric): try: set_values = set(self.values) - return SumFn(case([(column(self.col.name).in_(set_values), 1)], else_=0)) + return SumFn( + case( + [(column(self.col.name, self.col.type).in_(set_values), 1)], else_=0 + ) + ) except Exception as exc: # pylint: disable=broad-except logger.debug(traceback.format_exc()) diff --git a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py index aa6492c3198..8d88eab1802 100644 --- a/ingestion/src/metadata/profiler/metrics/static/ilike_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/ilike_count.py @@ -47,4 +47,9 @@ class ILikeCount(StaticMetric): raise AttributeError( "ILike Count requires an expression to be set: add_props(expression=...)(Metrics.ILIKE_COUNT)" ) - return SumFn(case([(column(self.col.name).ilike(self.expression), 1)], else_=0)) + return SumFn( + case( + [(column(self.col.name, self.col.type).ilike(self.expression), 1)], + else_=0, + ) + ) diff --git a/ingestion/src/metadata/profiler/metrics/static/like_count.py b/ingestion/src/metadata/profiler/metrics/static/like_count.py index 5aa812a9ad2..4b8121cbe21 100644 --- a/ingestion/src/metadata/profiler/metrics/static/like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/like_count.py @@ -47,4 +47,9 @@ class LikeCount(StaticMetric): raise AttributeError( "Like Count requires an expression to be set: add_props(expression=...)(Metrics.LIKE_COUNT)" ) - return SumFn(case([(column(self.col.name).like(self.expression), 1)], else_=0)) + return SumFn( + case( + [(column(self.col.name, self.col.type).like(self.expression), 1)], + else_=0, + ) + ) diff --git a/ingestion/src/metadata/profiler/metrics/static/max.py b/ingestion/src/metadata/profiler/metrics/static/max.py index b98d0c3c749..1cf63b77de1 100644 --- a/ingestion/src/metadata/profiler/metrics/static/max.py +++ b/ingestion/src/metadata/profiler/metrics/static/max.py @@ -57,7 +57,7 @@ class Max(StaticMetric): return MaxFn(LenFn(column(self.col.name, self.col.type))) if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)): return None - return MaxFn(column(self.col.name)) + return MaxFn(column(self.col.name, self.col.type)) def df_fn(self, dfs=None): """pandas function""" diff --git a/ingestion/src/metadata/profiler/metrics/static/mean.py b/ingestion/src/metadata/profiler/metrics/static/mean.py index 47e9ecb62f7..e5097d16951 100644 --- a/ingestion/src/metadata/profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/profiler/metrics/static/mean.py @@ -74,7 +74,7 @@ class Mean(StaticMetric): def fn(self): """sqlalchemy function""" if is_quantifiable(self.col.type): - return func.avg(column(self.col.name)) + return func.avg(column(self.col.name, self.col.type)) if is_concatenable(self.col.type): return func.avg(LenFn(column(self.col.name, self.col.type))) diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index d871ba62e86..0bbfed19f26 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -57,7 +57,7 @@ class Min(StaticMetric): if (not is_quantifiable(self.col.type)) and (not is_date_time(self.col.type)): return None - return MinFn(column(self.col.name)) + return MinFn(column(self.col.name, self.col.type)) def df_fn(self, dfs=None): """pandas function""" diff --git a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py index 9967de76063..bc1d2c3089f 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_like_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_like_count.py @@ -48,5 +48,8 @@ class NotLikeCount(StaticMetric): "Not Like Count requires an expression to be set: add_props(expression=...)(Metrics.NOT_LIKE_COUNT)" ) return SumFn( - case([(column(self.col.name).not_like(self.expression), 0)], else_=1) + case( + [(column(self.col.name, self.col.type).not_like(self.expression), 0)], + else_=1, + ) ) diff --git a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py index 3be07c1f42c..122f1c98654 100644 --- a/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/not_regexp_match_count.py @@ -54,7 +54,16 @@ class NotRegexCount(StaticMetric): ) return SumFn( case( - [(not_(column(self.col.name).regexp_match(self.expression)), 0)], + [ + ( + not_( + column(self.col.name, self.col.type).regexp_match( + self.expression + ) + ), + 0, + ) + ], else_=1, ) ) diff --git a/ingestion/src/metadata/profiler/metrics/static/null_count.py b/ingestion/src/metadata/profiler/metrics/static/null_count.py index 253d0473502..3c1af4da9d3 100644 --- a/ingestion/src/metadata/profiler/metrics/static/null_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/null_count.py @@ -47,7 +47,9 @@ class NullCount(StaticMetric): @_label def fn(self): """sqlalchemy function""" - return SumFn(case([(column(self.col.name).is_(None), 1)], else_=0)) + return SumFn( + case([(column(self.col.name, self.col.type).is_(None), 1)], else_=0) + ) def df_fn(self, dfs=None): """pandas function""" diff --git a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py index c13b1d38054..2fb465d80af 100644 --- a/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/regexp_match_count.py @@ -53,7 +53,17 @@ class RegexCount(StaticMetric): "Regex Count requires an expression to be set: add_props(expression=...)(Metrics.REGEX_COUNT)" ) return SumFn( - case([(column(self.col.name).regexp_match(self.expression), 1)], else_=0) + case( + [ + ( + column(self.col.name, self.col.type).regexp_match( + self.expression + ), + 1, + ) + ], + else_=0, + ) ) def df_fn(self, dfs): diff --git a/ingestion/src/metadata/profiler/metrics/static/stddev.py b/ingestion/src/metadata/profiler/metrics/static/stddev.py index 7a1487b7eaa..9946de5905c 100644 --- a/ingestion/src/metadata/profiler/metrics/static/stddev.py +++ b/ingestion/src/metadata/profiler/metrics/static/stddev.py @@ -83,7 +83,7 @@ class StdDev(StaticMetric): def fn(self): """sqlalchemy function""" if is_quantifiable(self.col.type): - return StdDevFn(column(self.col.name)) + return StdDevFn(column(self.col.name, self.col.type)) if is_concatenable(self.col.type): return StdDevFn(LenFn(column(self.col.name, self.col.type))) diff --git a/ingestion/src/metadata/profiler/metrics/static/sum.py b/ingestion/src/metadata/profiler/metrics/static/sum.py index ab3bda5d74a..64469f5dd1b 100644 --- a/ingestion/src/metadata/profiler/metrics/static/sum.py +++ b/ingestion/src/metadata/profiler/metrics/static/sum.py @@ -39,7 +39,7 @@ class Sum(StaticMetric): def fn(self): """sqlalchemy function""" if is_quantifiable(self.col.type): - return SumFn(column(self.col.name)) + return SumFn(column(self.col.name, self.col.type)) if is_concatenable(self.col.type): return SumFn(LenFn(column(self.col.name, self.col.type))) diff --git a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py index 5214d0be0aa..c508f276ec0 100644 --- a/ingestion/src/metadata/profiler/metrics/window/first_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/first_quartile.py @@ -54,7 +54,7 @@ class FirstQuartile(StaticMetric): if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite return MedianFn( - column(self.col.name), + column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.25, ) diff --git a/ingestion/src/metadata/profiler/metrics/window/median.py b/ingestion/src/metadata/profiler/metrics/window/median.py index d7137f3a608..1a4c623e581 100644 --- a/ingestion/src/metadata/profiler/metrics/window/median.py +++ b/ingestion/src/metadata/profiler/metrics/window/median.py @@ -54,7 +54,7 @@ class Median(StaticMetric): if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite return MedianFn( - column(self.col.name), + column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.5, ) diff --git a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py index 8e17275a30a..d571739419d 100644 --- a/ingestion/src/metadata/profiler/metrics/window/third_quartile.py +++ b/ingestion/src/metadata/profiler/metrics/window/third_quartile.py @@ -54,7 +54,7 @@ class ThirdQuartile(StaticMetric): if is_quantifiable(self.col.type): # col fullname is only needed for MySQL and SQLite return MedianFn( - column(self.col.name), + column(self.col.name, self.col.type), self.col.table.fullname if self.col.table is not None else None, 0.75, ) diff --git a/ingestion/src/metadata/profiler/orm/converter/common.py b/ingestion/src/metadata/profiler/orm/converter/common.py index c0d99e1580e..0fb2ba00a2f 100644 --- a/ingestion/src/metadata/profiler/orm/converter/common.py +++ b/ingestion/src/metadata/profiler/orm/converter/common.py @@ -65,6 +65,8 @@ class CommonMapTypes: DataType.BYTEA: CustomTypes.BYTEA.value, DataType.NTEXT: sqlalchemy.NVARCHAR, DataType.IMAGE: CustomTypes.IMAGE.value, + DataType.IPV4: CustomTypes.IP.value, + DataType.IPV6: CustomTypes.IP.value, } def map_types(self, col: Column, table_service_type): diff --git a/ingestion/src/metadata/profiler/orm/functions/length.py b/ingestion/src/metadata/profiler/orm/functions/length.py index 978c81d0431..cbe7181cbbe 100644 --- a/ingestion/src/metadata/profiler/orm/functions/length.py +++ b/ingestion/src/metadata/profiler/orm/functions/length.py @@ -61,6 +61,8 @@ def _(element, compiler, **kw): @compiles(LenFn, Dialects.ClickHouse) def _(element, compiler, **kw): """Handles lenght function for ClickHouse""" + if isinstance(element.clauses.clauses[0].type, sqltypes.Enum): + return "length(cast(%s, 'String'))" % compiler.process(element.clauses, **kw) return "length(%s)" % compiler.process(element.clauses, **kw) diff --git a/ingestion/src/metadata/profiler/orm/functions/median.py b/ingestion/src/metadata/profiler/orm/functions/median.py index dbd36fbe2e4..c210f8848f8 100644 --- a/ingestion/src/metadata/profiler/orm/functions/median.py +++ b/ingestion/src/metadata/profiler/orm/functions/median.py @@ -16,6 +16,7 @@ Define Median function # pylint: disable=consider-using-f-string,duplicate-code from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import FunctionElement +from sqlalchemy.sql.sqltypes import DECIMAL from metadata.profiler.metrics.core import CACHE from metadata.profiler.orm.registry import Dialects @@ -48,7 +49,11 @@ def _(elements, compiler, **kwargs): col, _, percentile = [ compiler.process(element, **kwargs) for element in elements.clauses ] - return "if(isNaN(quantile(%s)(%s)),null,quantile(%s)(%s))" % ((percentile, col) * 2) + quantile_str = f"quantile({percentile})({col})" + null_check = ( + "isNull" if isinstance(elements.clauses.clauses[0].type, DECIMAL) else "isNaN" + ) + return f"if({null_check}({quantile_str}), null, {quantile_str})" # pylint: disable=unused-argument diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index fcdb8375d52..3678a54c393 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -22,6 +22,7 @@ from metadata.ingestion.source import sqa_types from metadata.profiler.orm.types.bytea_to_string import ByteaToHex from metadata.profiler.orm.types.custom_array import CustomArray from metadata.profiler.orm.types.custom_image import CustomImage +from metadata.profiler.orm.types.custom_ip import CustomIP from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp from metadata.profiler.orm.types.hex_byte_string import HexByteString from metadata.profiler.orm.types.uuid import UUIDString @@ -36,6 +37,7 @@ class CustomTypes(TypeRegistry): ARRAY = CustomArray TIMESTAMP = CustomTimestamp IMAGE = CustomImage + IP = CustomIP class Dialects(Enum): diff --git a/ingestion/src/metadata/profiler/orm/types/custom_ip.py b/ingestion/src/metadata/profiler/orm/types/custom_ip.py new file mode 100644 index 00000000000..a9e719df325 --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/types/custom_ip.py @@ -0,0 +1,30 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=abstract-method + +""" +Expand sqlalchemy types to map them to OpenMetadata DataType +""" +from sqlalchemy.sql.sqltypes import String, TypeDecorator + +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class CustomIP(TypeDecorator): + """ + Convert RowVersion + """ + + impl = String + cache_ok = True diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json index ec3a65ad598..e31178bb58a 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/data/table.json @@ -148,7 +148,9 @@ "SPATIAL", "TABLE", "NTEXT", - "IMAGE" + "IMAGE", + "IPV4", + "IPV6" ] }, "constraint": {