fix(ingest/profiling): only apply monkeypatches once when profiling (#8160)

Harshal Sheth 2023-06-08 00:29:17 -07:00 committed by GitHub
parent 45e592b7c6
commit 2e1d31b6ae
2 changed files with 57 additions and 58 deletions


@@ -780,7 +780,18 @@ class DatahubGEProfiler:
         platform: Optional[str] = None,
         profiler_args: Optional[Dict] = None,
     ) -> Iterable[Tuple[GEProfilerRequest, Optional[DatasetProfileClass]]]:
-        with PerfTimer() as timer, concurrent.futures.ThreadPoolExecutor(
+        max_workers = min(max_workers, len(requests))
+        logger.info(
+            f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
+        )
+
+        with PerfTimer() as timer, unittest.mock.patch(
+            "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
+            get_column_unique_count_patch,
+        ), unittest.mock.patch(
+            "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
+            _get_column_quantiles_bigquery_patch,
+        ), concurrent.futures.ThreadPoolExecutor(
             max_workers=max_workers
         ) as async_executor, SQLAlchemyQueryCombiner(
             enabled=self.config.query_combiner_enabled,
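The first hunk is the core of the fix: the two unittest.mock.patch(...) context managers are folded into the single outer `with` statement, so the Great Expectations methods are patched exactly once for the whole profiling run instead of sitting in their own nested blocks inside the generator body. A minimal sketch of the same stacking pattern, assuming hypothetical Dataset.count_rows / Dataset.quantiles methods and replacement functions (not the DataHub code):

import unittest.mock


class Dataset:
    # Stand-ins for the patched Great Expectations methods.
    def count_rows(self) -> int:
        return 0

    def quantiles(self) -> list:
        return []


def fast_count_rows(self: "Dataset") -> int:  # hypothetical replacement
    return 42


def fast_quantiles(self: "Dataset") -> list:  # hypothetical replacement
    return [1, 2, 3]


def profile_all(datasets):
    # Both patches are entered exactly once by stacking them in a single
    # `with` statement; they stay active for the whole loop and are undone
    # together when the block exits.
    with unittest.mock.patch(
        "__main__.Dataset.count_rows", fast_count_rows
    ), unittest.mock.patch("__main__.Dataset.quantiles", fast_quantiles):
        return [(ds.count_rows(), ds.quantiles()) for ds in datasets]


if __name__ == "__main__":
    print(profile_all([Dataset(), Dataset()]))
    # [(42, [1, 2, 3]), (42, [1, 2, 3])]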
@@ -788,70 +799,58 @@ class DatahubGEProfiler:
             is_single_row_query_method=_is_single_row_query_method,
             serial_execution_fallback_enabled=True,
         ).activate() as query_combiner:
-            max_workers = min(max_workers, len(requests))
-            logger.info(
-                f"Will profile {len(requests)} table(s) with {max_workers} worker(s) - this may take a while"
-            )
-
-            with unittest.mock.patch(
-                "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset.get_column_unique_count",
-                get_column_unique_count_patch,
-            ):
-                with unittest.mock.patch(
-                    "great_expectations.dataset.sqlalchemy_dataset.SqlAlchemyDataset._get_column_quantiles_bigquery",
-                    _get_column_quantiles_bigquery_patch,
-                ):
-                    async_profiles = collections.deque(
-                        async_executor.submit(
-                            self._generate_profile_from_request,
-                            query_combiner,
-                            request,
-                            platform=platform,
-                            profiler_args=profiler_args,
-                        )
-                        for request in requests
-                    )
-
-                    # Avoid using as_completed so that the results are yielded in the
-                    # same order as the requests.
-                    # for async_profile in concurrent.futures.as_completed(async_profiles):
-                    while len(async_profiles) > 0:
-                        async_profile = async_profiles.popleft()
-                        yield async_profile.result()
-
-                    total_time_taken = timer.elapsed_seconds()
-                    logger.info(
-                        f"Profiling {len(requests)} table(s) finished in {total_time_taken:.3f} seconds"
-                    )
-
-                    time_percentiles: Dict[str, float] = {}
-
-                    if len(self.times_taken) > 0:
-                        percentiles = [50, 75, 95, 99]
-                        percentile_values = stats.calculate_percentiles(
-                            self.times_taken, percentiles
-                        )
-
-                        time_percentiles = {
-                            f"table_time_taken_p{percentile}": stats.discretize(
-                                percentile_values[percentile]
-                            )
-                            for percentile in percentiles
-                        }
-
-                    telemetry.telemetry_instance.ping(
-                        "sql_profiling_summary",
-                        # bucket by taking floor of log of time taken
-                        {
-                            "total_time_taken": stats.discretize(total_time_taken),
-                            "count": stats.discretize(len(self.times_taken)),
-                            "total_row_count": stats.discretize(self.total_row_count),
-                            "platform": self.platform,
-                            **time_percentiles,
-                        },
-                    )
-
-                    self.report.report_from_query_combiner(query_combiner.report)
+            # Submit the profiling requests to the thread pool executor.
+            async_profiles = collections.deque(
+                async_executor.submit(
+                    self._generate_profile_from_request,
+                    query_combiner,
+                    request,
+                    platform=platform,
+                    profiler_args=profiler_args,
+                )
+                for request in requests
+            )
+
+            # Avoid using as_completed so that the results are yielded in the
+            # same order as the requests.
+            # for async_profile in concurrent.futures.as_completed(async_profiles):
+            while len(async_profiles) > 0:
+                async_profile = async_profiles.popleft()
+                yield async_profile.result()
+
+            total_time_taken = timer.elapsed_seconds()
+            logger.info(
+                f"Profiling {len(requests)} table(s) finished in {total_time_taken:.3f} seconds"
+            )
+
+            time_percentiles: Dict[str, float] = {}
+
+            if len(self.times_taken) > 0:
+                percentiles = [50, 75, 95, 99]
+                percentile_values = stats.calculate_percentiles(
+                    self.times_taken, percentiles
+                )
+
+                time_percentiles = {
+                    f"table_time_taken_p{percentile}": stats.discretize(
+                        percentile_values[percentile]
+                    )
+                    for percentile in percentiles
+                }
+
+            telemetry.telemetry_instance.ping(
+                "sql_profiling_summary",
+                # bucket by taking floor of log of time taken
+                {
+                    "total_time_taken": stats.discretize(total_time_taken),
+                    "count": stats.discretize(len(self.times_taken)),
+                    "total_row_count": stats.discretize(self.total_row_count),
+                    "platform": self.platform,
+                    **time_percentiles,
+                },
+            )
+
+            self.report.report_from_query_combiner(query_combiner.report)
 
     def _generate_profile_from_request(
         self,
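Within that single context, the code keeps the existing pattern of submitting every request to the executor up front and then draining a deque, so results are yielded in request order rather than completion order (the reason the as_completed alternative stays commented out). A self-contained sketch of that ordering behaviour, with a hypothetical work function standing in for _generate_profile_from_request:

import collections
import concurrent.futures
import time


def work(i: int) -> int:
    # Hypothetical task: later requests finish sooner, to show that results
    # are still yielded in submission order.
    time.sleep(0.1 * (3 - i))
    return i


def generate_results(n: int):
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        # Submit every request first, keeping the futures in submission order.
        futures = collections.deque(executor.submit(work, i) for i in range(n))

        # Popping from the left preserves request order; as_completed() would
        # instead yield whichever future finishes first.
        while futures:
            yield futures.popleft().result()


if __name__ == "__main__":
    print(list(generate_results(3)))  # [0, 1, 2] even though 2 finishes first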


@@ -191,7 +191,7 @@ class PlatformDetail(ConfigModel):
     platform_instance: Optional[str] = pydantic.Field(
         default=None,
         description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
-        "with platform instance name used in ingestion"
+        "with platform instance name used in ingestion "
         "recipe of other datahub sources.",
     )
     env: str = pydantic.Field(
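The second file's change only appends a trailing space to one description fragment. Adjacent string literals in Python are concatenated with no separator, so without that space the rendered help text would read "...used in ingestionrecipe of other...". A quick illustration of why the space matters (a standalone snippet, not the source module):

without_space = (
    "with platform instance name used in ingestion"
    "recipe of other datahub sources."
)
with_space = (
    "with platform instance name used in ingestion "
    "recipe of other datahub sources."
)

assert "ingestionrecipe" in without_space  # the bug: words run together
assert "ingestion recipe" in with_space    # the fix: proper spacing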