diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py index 583590928d9..4f5080d59eb 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py @@ -16,6 +16,7 @@ Validator for column values to be unique test case from typing import Optional from sqlalchemy import Column, inspect +from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm.util import AliasedClass from metadata.data_quality.validations.column.base.columnValuesToBeUnique import ( @@ -50,24 +51,34 @@ class ColumnValuesToBeUniqueValidator( metric: metric column: column """ - return self.run_query_results(self.runner, metric, column) + count = Metrics.COUNT.value(column).fn() + unique_count = Metrics.UNIQUE_COUNT.value(column).query( + sample=self.runner._sample # pylint: disable=protected-access + if isinstance( + self.runner._sample, # pylint: disable=protected-access + AliasedClass, + ) + else self.runner.table, + session=self.runner._session, # pylint: disable=protected-access + ) # type: ignore + + try: + self.value = dict(self.runner.dispatch_query_select_first(count, unique_count.subquery("uniqueCount"))) # type: ignore + res = self.value.get(Metrics.COUNT.name) + except Exception as exc: + raise SQLAlchemyError(exc) + + if res is None: + raise ValueError( + f"\nQuery on table/column {column.name if column is not None else ''} returned None. Your table might be empty. " + "If you confirmed your table is not empty and are still seeing this message you can:\n" + "\t1. check the documentation: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/data-quality/tests\n" + "\t2. reach out to the Collate team for support" + ) + + return res def _get_unique_count(self, metric: Metrics, column: Column) -> Optional[int]: """Get unique count of values""" - unique_count = dict( - self.runner.select_all_from_query( - metric.value(column).query( - sample=self.runner._sample # pylint: disable=protected-access - if isinstance( - self.runner._sample, # pylint: disable=protected-access - AliasedClass, - ) - else self.runner.table, - session=self.runner._session, # pylint: disable=protected-access - ) # type: ignore - )[ - 0 - ] # query result is a list of tuples - ) - return unique_count.get(metric.name) + return self.value.get(metric.name) diff --git a/ingestion/src/metadata/profiler/metrics/static/unique_count.py b/ingestion/src/metadata/profiler/metrics/static/unique_count.py index 00853b1b25f..ade1e743f27 100644 --- a/ingestion/src/metadata/profiler/metrics/static/unique_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/unique_count.py @@ -61,8 +61,8 @@ class UniqueCount(QueryMetric): unique_count_query = _unique_count_query_mapper[session.bind.dialect.name]( col, session, sample ) - only_once_cte = unique_count_query.cte("only_once") - return session.query(func.count().label(self.name())).select_from(only_once_cte) + only_once_sub = unique_count_query.subquery("only_once") + return session.query(func.count().label(self.name())).select_from(only_once_sub) def df_fn(self, dfs=None): """