diff --git a/ingestion/src/metadata/profiler/metrics/static/unique_count.py b/ingestion/src/metadata/profiler/metrics/static/unique_count.py index a664439053b..9adf9196fb0 100644 --- a/ingestion/src/metadata/profiler/metrics/static/unique_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/unique_count.py @@ -14,14 +14,12 @@ Unique Count Metric definition """ from typing import Optional -from sqlalchemy import NVARCHAR, TEXT, column, func, literal_column +from sqlalchemy import column, func from sqlalchemy.orm import DeclarativeMeta, Session from metadata.profiler.metrics.core import QueryMetric -from metadata.profiler.orm.converter.mssql.converter import cast_dict -from metadata.profiler.orm.functions.count import CountFn +from metadata.profiler.orm.functions.unique_count import _unique_count_query_mapper from metadata.profiler.orm.registry import NOT_COMPUTE -from metadata.profiler.orm.types.custom_image import CustomImage from metadata.utils.logger import profiler_logger logger = profiler_logger() @@ -58,28 +56,10 @@ class UniqueCount(QueryMetric): # Run all queries on top of the sampled data col = column(self.col.name, self.col.type) - - is_mssql = ( - hasattr(session.bind, "dialect") and session.bind.dialect.name == "mssql" + unique_count_query = _unique_count_query_mapper[session.bind.dialect.name]( + col, session, sample ) - is_mssql_deprecated_datatype = isinstance( - self.col.type, (CustomImage, TEXT, NVARCHAR) - ) - - count_fn = CountFn(col) if is_mssql and is_mssql_deprecated_datatype else col - group_by_col = ( - func.convert(literal_column(cast_dict.get(type(self.col.type))), col) - if is_mssql and is_mssql_deprecated_datatype - else col - ) - - only_once = ( - session.query(func.count(count_fn)) - .select_from(sample) - .group_by(group_by_col) - .having(func.count(count_fn) == 1) - ) - only_once_cte = only_once.cte("only_once") + only_once_cte = unique_count_query.cte("only_once") return session.query(func.count().label(self.name())).select_from(only_once_cte) def df_fn(self, dfs=None): diff --git a/ingestion/src/metadata/profiler/orm/converter/base.py b/ingestion/src/metadata/profiler/orm/converter/base.py index 60baf8bfcea..6fce328af85 100644 --- a/ingestion/src/metadata/profiler/orm/converter/base.py +++ b/ingestion/src/metadata/profiler/orm/converter/base.py @@ -15,13 +15,15 @@ to an SQLAlchemy ORM class. """ from typing import Optional, cast +import sqlalchemy from sqlalchemy import MetaData from sqlalchemy.orm import DeclarativeMeta, declarative_base from metadata.generated.schema.entity.data.database import Database, databaseService from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.orm.converter.converter_registry import converter_registry Base = declarative_base() @@ -62,6 +64,32 @@ def check_if_should_quote_column_name(table_service_type) -> Optional[bool]: return None +def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column: + """ + Cook the ORM column from our metadata instance + information. + + The first parsed column will be used arbitrarily + as the PK, as SQLAlchemy forces us to specify + at least one PK. + + As this is only used for INSERT/UPDATE/DELETE, + there is no impact for our read-only purposes. + """ + return sqlalchemy.Column( + name=str(col.name.__root__), + type_=converter_registry[table_service_type]().map_types( + col, table_service_type + ), + primary_key=not bool(idx), # The first col seen is used as PK + quote=check_if_should_quote_column_name(table_service_type) + or check_snowflake_case_sensitive(table_service_type, col.name.__root__), + key=str( + col.name.__root__ + ).lower(), # Add lowercase column name as key for snowflake case sensitive columns + ) + + def ometa_to_sqa_orm( table: Table, metadata: OpenMetadata, sqa_metadata_obj: Optional[MetaData] = None ) -> DeclarativeMeta: @@ -74,9 +102,6 @@ def ometa_to_sqa_orm( `type` and passing SQLAlchemy `Base` class as the bases tuple for inheritance. """ - # pylint: disable=import-outside-toplevel,cyclic-import - from metadata.profiler.orm.converter.dispatch_converter import build_orm_col - table.serviceType = cast( databaseService.DatabaseServiceType, table.serviceType ) # satisfy mypy diff --git a/ingestion/src/metadata/profiler/orm/converter/converter_registry.py b/ingestion/src/metadata/profiler/orm/converter/converter_registry.py new file mode 100644 index 00000000000..9f918121ab1 --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/converter/converter_registry.py @@ -0,0 +1,26 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Dispatch logic to map an Converter base based on dialect +""" +from collections import defaultdict + +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) +from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes +from metadata.profiler.orm.converter.common import CommonMapTypes +from metadata.profiler.orm.converter.snowflake.converter import SnowflakeMapTypes + +converter_registry = defaultdict(lambda: CommonMapTypes) +converter_registry[DatabaseServiceType.BigQuery] = BigqueryMapTypes +converter_registry[DatabaseServiceType.Snowflake] = SnowflakeMapTypes diff --git a/ingestion/src/metadata/profiler/orm/converter/dispatch_converter.py b/ingestion/src/metadata/profiler/orm/converter/dispatch_converter.py deleted file mode 100644 index dc1f5b770ce..00000000000 --- a/ingestion/src/metadata/profiler/orm/converter/dispatch_converter.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Dispatch logic to map an Converter base based on dialect -""" -import sqlalchemy - -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseServiceType, -) -from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes -from metadata.profiler.orm.converter.snowflake.converter import SnowflakeMapTypes - -converter_registry = { - DatabaseServiceType.BigQuery: BigqueryMapTypes, - DatabaseServiceType.Snowflake: SnowflakeMapTypes, -} - - -def build_orm_col(idx: int, col: Column, table_service_type) -> sqlalchemy.Column: - """ - Cook the ORM column from our metadata instance - information. - - The first parsed column will be used arbitrarily - as the PK, as SQLAlchemy forces us to specify - at least one PK. - - As this is only used for INSERT/UPDATE/DELETE, - there is no impact for our read-only purposes. - """ - # pylint: disable=import-outside-toplevel - from metadata.profiler.orm.converter.base import ( - check_if_should_quote_column_name, - check_snowflake_case_sensitive, - ) - from metadata.profiler.orm.converter.common import CommonMapTypes - - return sqlalchemy.Column( - name=str(col.name.__root__), - type_=converter_registry.get(table_service_type, CommonMapTypes)().map_types( - col, table_service_type - ), - primary_key=not bool(idx), # The first col seen is used as PK - quote=check_if_should_quote_column_name(table_service_type) - or check_snowflake_case_sensitive(table_service_type, col.name.__root__), - key=str( - col.name.__root__ - ).lower(), # Add lowercase column name as key for snowflake case sensitive columns - ) diff --git a/ingestion/src/metadata/profiler/orm/converter/registry.py b/ingestion/src/metadata/profiler/orm/converter/registry.py deleted file mode 100644 index 35e774d09a6..00000000000 --- a/ingestion/src/metadata/profiler/orm/converter/registry.py +++ /dev/null @@ -1,14 +0,0 @@ -# Copyright 2021 Collate -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Registry logic to map an Converter base based on dialect -""" diff --git a/ingestion/src/metadata/profiler/orm/functions/sum.py b/ingestion/src/metadata/profiler/orm/functions/sum.py index fb1af8f7d60..1a7cfad49ff 100644 --- a/ingestion/src/metadata/profiler/orm/functions/sum.py +++ b/ingestion/src/metadata/profiler/orm/functions/sum.py @@ -10,11 +10,7 @@ # limitations under the License. """ -Define Random Number function - -Returns a column with random values -between 0 and 100 to help us draw sample -data. +Define Sum function """ from sqlalchemy.ext.compiler import compiles diff --git a/ingestion/src/metadata/profiler/orm/functions/unique_count.py b/ingestion/src/metadata/profiler/orm/functions/unique_count.py new file mode 100644 index 00000000000..9acb809df81 --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/functions/unique_count.py @@ -0,0 +1,57 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Unique Count Metric functions +""" + +from collections import defaultdict + +from sqlalchemy import NVARCHAR, TEXT, func, literal_column + +from metadata.profiler.orm.converter.mssql.converter import cast_dict +from metadata.profiler.orm.functions.count import CountFn +from metadata.profiler.orm.registry import Dialects +from metadata.profiler.orm.types.custom_image import CustomImage + + +def _unique_count_query(col, session, sample): + return ( + session.query(func.count(col)) + .select_from(sample) + .group_by(col) + .having(func.count(col) == 1) + ) + + +def _unique_count_query_mssql(col, session, sample): + # The ntext, text, and image data types will be removed in a future version of SQL Server. + # Avoid using these data types in new development work, and plan to modify applications that currently use them. + # Use nvarchar(max), varchar(max), and varbinary(max) instead. + # ref:https://learn.microsoft.com/en-us/sql/t-sql/data-types/ntext-text-and-image-transact-sql?view=sql-server-ver16 + is_mssql_deprecated_datatype = isinstance(col.type, (CustomImage, TEXT, NVARCHAR)) + + if is_mssql_deprecated_datatype: + count_fn = CountFn(col) + group_by_col = func.convert(literal_column(cast_dict.get(type(col.type))), col) + else: + count_fn = col + group_by_col = col + return ( + session.query(func.count(count_fn)) + .select_from(sample) + .group_by(group_by_col) + .having(func.count(count_fn) == 1) + ) + + +_unique_count_query_mapper = defaultdict(lambda: _unique_count_query) +_unique_count_query_mapper[Dialects.MSSQL] = _unique_count_query_mssql