diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index c04f2f8e5a..b16287dcfc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -142,11 +142,17 @@ class GEProfilerRequest: batch_kwargs: dict -def get_column_unique_count_patch(self: SqlAlchemyDataset, column: str) -> int: +def get_column_unique_count_dh_patch(self: SqlAlchemyDataset, column: str) -> int: if self.engine.dialect.name.lower() == REDSHIFT: element_values = self.engine.execute( sa.select( - [sa.text(f'APPROXIMATE count(distinct "{column}")')] # type:ignore + [ + # We use coalesce here to force SQL Alchemy to see this + # as a column expression. + sa.func.coalesce( + sa.text(f'APPROXIMATE count(distinct "{column}")') + ), + ] ).select_from(self._table) ) return convert_to_json_serializable(element_values.fetchone()[0]) @@ -154,9 +160,7 @@ def get_column_unique_count_patch(self: SqlAlchemyDataset, column: str) -> int: element_values = self.engine.execute( sa.select( [ - sa.text( # type:ignore - f"APPROX_COUNT_DISTINCT(`{column}`)" - ) + sa.func.coalesce(sa.text(f"APPROX_COUNT_DISTINCT(`{column}`)")), ] ).select_from(self._table) ) @@ -233,9 +237,16 @@ def _is_single_row_query_method(query: Any) -> bool: "unexpected_count", ] + FIRST_PARTY_SINGLE_ROW_QUERY_METHODS = { + "get_column_unique_count_dh_patch", + } + # We'll do this the inefficient way since the arrays are pretty small. stack = traceback.extract_stack() for frame in reversed(stack): + if frame.name in FIRST_PARTY_SINGLE_ROW_QUERY_METHODS: + return True + if not any(frame.filename.endswith(file) for file in SINGLE_ROW_QUERY_FILES): continue @@ -1023,7 +1034,7 @@ class DatahubGEProfiler: with PerfTimer() as timer, unittest.mock.patch( "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count", - get_column_unique_count_patch, + get_column_unique_count_dh_patch, ), unittest.mock.patch( "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery", _get_column_quantiles_bigquery_patch,