fix: compute unique and count at the same time (#15996)

This commit is contained in:
Teddy 2024-04-23 12:38:49 +02:00 committed by GitHub
parent e996f15a7d
commit a4b856956d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 19 deletions

View File

@ -16,6 +16,7 @@ Validator for column values to be unique test case
from typing import Optional from typing import Optional
from sqlalchemy import Column, inspect from sqlalchemy import Column, inspect
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm.util import AliasedClass from sqlalchemy.orm.util import AliasedClass
from metadata.data_quality.validations.column.base.columnValuesToBeUnique import ( from metadata.data_quality.validations.column.base.columnValuesToBeUnique import (
@ -50,13 +51,8 @@ class ColumnValuesToBeUniqueValidator(
metric: metric metric: metric
column: column column: column
""" """
return self.run_query_results(self.runner, metric, column) count = Metrics.COUNT.value(column).fn()
unique_count = Metrics.UNIQUE_COUNT.value(column).query(
def _get_unique_count(self, metric: Metrics, column: Column) -> Optional[int]:
"""Get unique count of values"""
unique_count = dict(
self.runner.select_all_from_query(
metric.value(column).query(
sample=self.runner._sample # pylint: disable=protected-access sample=self.runner._sample # pylint: disable=protected-access
if isinstance( if isinstance(
self.runner._sample, # pylint: disable=protected-access self.runner._sample, # pylint: disable=protected-access
@ -65,9 +61,24 @@ class ColumnValuesToBeUniqueValidator(
else self.runner.table, else self.runner.table,
session=self.runner._session, # pylint: disable=protected-access session=self.runner._session, # pylint: disable=protected-access
) # type: ignore ) # type: ignore
)[
0 try:
] # query result is a list of tuples self.value = dict(self.runner.dispatch_query_select_first(count, unique_count.subquery("uniqueCount"))) # type: ignore
res = self.value.get(Metrics.COUNT.name)
except Exception as exc:
raise SQLAlchemyError(exc)
if res is None:
raise ValueError(
f"\nQuery on table/column {column.name if column is not None else ''} returned None. Your table might be empty. "
"If you confirmed your table is not empty and are still seeing this message you can:\n"
"\t1. check the documentation: https://docs.open-metadata.org/v1.3.x/connectors/ingestion/workflows/data-quality/tests\n"
"\t2. reach out to the Collate team for support"
) )
return unique_count.get(metric.name) return res
def _get_unique_count(self, metric: Metrics, column: Column) -> Optional[int]:
"""Get unique count of values

Looks up ``metric.name`` in ``self.value`` and returns the entry stored
under that key. No query is issued here.

NOTE(review): ``self.value`` is not assigned in this method. It appears to
be filled by the combined count / unique-count query that runs earlier in
this class, so that query presumably has to run first -- confirm against
the caller.

Args:
metric: metric whose ``name`` is used as the lookup key
column: column under test; not read by this method
"""
return self.value.get(metric.name)

View File

@ -61,8 +61,8 @@ class UniqueCount(QueryMetric):
unique_count_query = _unique_count_query_mapper[session.bind.dialect.name]( unique_count_query = _unique_count_query_mapper[session.bind.dialect.name](
col, session, sample col, session, sample
) )
only_once_cte = unique_count_query.cte("only_once") only_once_sub = unique_count_query.subquery("only_once")
return session.query(func.count().label(self.name())).select_from(only_once_cte) return session.query(func.count().label(self.name())).select_from(only_once_sub)
def df_fn(self, dfs=None): def df_fn(self, dfs=None):
""" """