mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-26 09:35:23 +00:00
feat(ingest/profiling): allow unique count queries to be combined (#10322)
This commit is contained in:
parent
d3fb698d8d
commit
f99f73841a
@ -142,11 +142,17 @@ class GEProfilerRequest:
|
|||||||
batch_kwargs: dict
|
batch_kwargs: dict
|
||||||
|
|
||||||
|
|
||||||
def get_column_unique_count_patch(self: SqlAlchemyDataset, column: str) -> int:
|
def get_column_unique_count_dh_patch(self: SqlAlchemyDataset, column: str) -> int:
|
||||||
if self.engine.dialect.name.lower() == REDSHIFT:
|
if self.engine.dialect.name.lower() == REDSHIFT:
|
||||||
element_values = self.engine.execute(
|
element_values = self.engine.execute(
|
||||||
sa.select(
|
sa.select(
|
||||||
[sa.text(f'APPROXIMATE count(distinct "{column}")')] # type:ignore
|
[
|
||||||
|
# We use coalesce here to force SQL Alchemy to see this
|
||||||
|
# as a column expression.
|
||||||
|
sa.func.coalesce(
|
||||||
|
sa.text(f'APPROXIMATE count(distinct "{column}")')
|
||||||
|
),
|
||||||
|
]
|
||||||
).select_from(self._table)
|
).select_from(self._table)
|
||||||
)
|
)
|
||||||
return convert_to_json_serializable(element_values.fetchone()[0])
|
return convert_to_json_serializable(element_values.fetchone()[0])
|
||||||
@ -154,9 +160,7 @@ def get_column_unique_count_patch(self: SqlAlchemyDataset, column: str) -> int:
|
|||||||
element_values = self.engine.execute(
|
element_values = self.engine.execute(
|
||||||
sa.select(
|
sa.select(
|
||||||
[
|
[
|
||||||
sa.text( # type:ignore
|
sa.func.coalesce(sa.text(f"APPROX_COUNT_DISTINCT(`{column}`)")),
|
||||||
f"APPROX_COUNT_DISTINCT(`{column}`)"
|
|
||||||
)
|
|
||||||
]
|
]
|
||||||
).select_from(self._table)
|
).select_from(self._table)
|
||||||
)
|
)
|
||||||
@ -233,9 +237,16 @@ def _is_single_row_query_method(query: Any) -> bool:
|
|||||||
"unexpected_count",
|
"unexpected_count",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
FIRST_PARTY_SINGLE_ROW_QUERY_METHODS = {
|
||||||
|
"get_column_unique_count_dh_patch",
|
||||||
|
}
|
||||||
|
|
||||||
# We'll do this the inefficient way since the arrays are pretty small.
|
# We'll do this the inefficient way since the arrays are pretty small.
|
||||||
stack = traceback.extract_stack()
|
stack = traceback.extract_stack()
|
||||||
for frame in reversed(stack):
|
for frame in reversed(stack):
|
||||||
|
if frame.name in FIRST_PARTY_SINGLE_ROW_QUERY_METHODS:
|
||||||
|
return True
|
||||||
|
|
||||||
if not any(frame.filename.endswith(file) for file in SINGLE_ROW_QUERY_FILES):
|
if not any(frame.filename.endswith(file) for file in SINGLE_ROW_QUERY_FILES):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@ -1023,7 +1034,7 @@ class DatahubGEProfiler:
|
|||||||
|
|
||||||
with PerfTimer() as timer, unittest.mock.patch(
|
with PerfTimer() as timer, unittest.mock.patch(
|
||||||
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
|
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
|
||||||
get_column_unique_count_patch,
|
get_column_unique_count_dh_patch,
|
||||||
), unittest.mock.patch(
|
), unittest.mock.patch(
|
||||||
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
|
"great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
|
||||||
_get_column_quantiles_bigquery_patch,
|
_get_column_quantiles_bigquery_patch,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user