fix(ingest/profiling): compute sample row count correctly (#10319)

parent 4e2cec86b3
commit 77f1a0c60e
@@ -689,9 +689,28 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
         logger.debug(f"profiling {self.dataset_name}: flushing stage 1 queries")
         self.query_combiner.flush()
 
+        assert profile.rowCount is not None
+        full_row_count = profile.rowCount
+
         if self.config.use_sampling and not self.config.limit:
             self.update_dataset_batch_use_sampling(profile)
 
+        # Note that this row count may be different from the full_row_count if we are using sampling.
+        row_count: int = profile.rowCount
+        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
+            # Querying exact row count of sample using `_get_dataset_rows`.
+            # We are not using `self.config.sample_size` directly as the actual row count
+            # in the sample may be different than configured `sample_size`. For BigQuery,
+            # we've even seen 160k rows returned for a sample size of 10k.
+            logger.debug("Recomputing row count for the sample")
+
+            # Note that we can't just call `self._get_dataset_rows(profile)` here because
+            # there's some sort of caching happening that will return the full table row count
+            # instead of the sample row count.
+            row_count = self.dataset.get_row_count(str(self.dataset._table))
+
+            profile.partitionSpec.partition += f" (sample rows {row_count})"
+
         columns_profiling_queue: List[_SingleColumnSpec] = []
         if columns_to_profile:
             for column in all_columns:
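Why the exact sample size matters: the row count captured here feeds the null-count fractions computed in stage 2 (the old code even annotated it "used for null counts calculation"). If the full-table count is divided into null counts observed within the sample, the fractions come out wildly understated. A minimal worked example with hypothetical numbers, not taken from the patch:

    # Hypothetical numbers illustrating the skew this hunk guards against.
    sample_rows = 160_000       # rows BigQuery actually materialized for the sample
    full_rows = 10_000_000      # true table row count
    null_count = 16_000         # nulls observed within the sample

    correct_null_fraction = null_count / sample_rows  # 0.10
    buggy_null_fraction = null_count / full_rows      # 0.0016, understated ~60x

    assert abs(correct_null_fraction - 0.10) < 1e-9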
@@ -708,16 +727,6 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
         logger.debug(f"profiling {self.dataset_name}: flushing stage 2 queries")
         self.query_combiner.flush()
 
-        assert profile.rowCount is not None
-        row_count: int  # used for null counts calculation
-        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
-            # Querying exact row count of sample using `_get_dataset_rows`.
-            # We are not using `self.config.sample_size` directly as actual row count
-            # in sample may be slightly different (more or less) than configured `sample_size`.
-            self._get_dataset_rows(profile)
-
-            row_count = profile.rowCount
-
         for column_spec in columns_profiling_queue:
             column = column_spec.column
             column_profile = column_spec.column_profile
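This hunk drops the stage-2 recount, which stage 1 now performs, and avoids `self._get_dataset_rows(profile)` there for the reason noted in the new stage-1 comment: a caching layer returns the full-table count instead of the sample count. A hedged sketch of that failure mode, using a hypothetical cache rather than Great Expectations' actual internals:

    # A row count memoized before sampling keeps returning the stale
    # full-table number after the batch is switched to a sample.
    _row_count_cache: dict = {}

    def cached_row_count(table: str, live_counts: dict) -> int:
        if table not in _row_count_cache:
            _row_count_cache[table] = live_counts[table]
        return _row_count_cache[table]

    live_counts = {"my_table": 1_000_000}
    assert cached_row_count("my_table", live_counts) == 1_000_000  # cache primed

    live_counts["my_table"] = 160_000  # table now points at a 160k-row sample
    assert cached_row_count("my_table", live_counts) == 1_000_000  # stale hit

This is why the patch calls `self.dataset.get_row_count(str(self.dataset._table))` directly instead.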
@@ -825,6 +834,10 @@ class _SingleDatasetProfiler(BasicDatasetProfilerBase):
 
         logger.debug(f"profiling {self.dataset_name}: flushing stage 3 queries")
         self.query_combiner.flush()
+
+        # Reset the row count to the original value.
+        profile.rowCount = full_row_count
+
         return profile
 
     def init_profile(self):
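Taken together, the three hunks implement a save/recompute/restore pattern: remember the true table size, compute column statistics against the sample's actual row count, then restore the true size so the emitted profile reports it. A condensed, runnable sketch of that flow; `switch_batch_to_sample` and the SimpleNamespace objects are illustrative stand-ins, not the real DataHub/GE types:

    from types import SimpleNamespace

    def switch_batch_to_sample(profile):      # hypothetical stand-in
        profile.partitionSpec.partition += " SAMPLE"
        profile.rowCount = 160_000            # pretend the sample holds 160k rows

    def profile_dataset(profile, get_sample_count, config):
        full_row_count = profile.rowCount     # 1. save the true size

        if config.use_sampling and not config.limit:
            switch_batch_to_sample(profile)

        row_count = profile.rowCount
        if profile.partitionSpec and "SAMPLE" in profile.partitionSpec.partition:
            # 2. recount: the sample's real size may differ from sample_size
            row_count = get_sample_count()

        # ... column profiling would divide null counts by row_count here ...

        profile.rowCount = full_row_count     # 3. restore the true size
        return profile

    profile = SimpleNamespace(
        rowCount=10_000_000,
        partitionSpec=SimpleNamespace(partition=""),
    )
    config = SimpleNamespace(use_sampling=True, limit=None)
    result = profile_dataset(profile, lambda: 160_000, config)
    assert result.rowCount == 10_000_000      # profile still reports the true size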
@@ -1274,6 +1287,7 @@ def create_bigquery_temp_table(
     try:
         cursor: "BigQueryCursor" = cast("BigQueryCursor", raw_connection.cursor())
         try:
+            logger.debug(f"Creating temporary table for {table_pretty_name}: {bq_sql}")
             cursor.execute(bq_sql)
         except Exception as e:
             if not instance.config.catch_exceptions:
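The new debug line records the temp-table SQL before execution, which helps when chasing the sampling drift mentioned in stage 1 (160k rows materialized for a 10k-row sample). On BigQuery, block-based `TABLESAMPLE SYSTEM` only approximates the requested fraction; a hypothetical way to observe the drift directly, assuming google-cloud-bigquery is installed and using placeholder project/table names:

    from google.cloud import bigquery

    client = bigquery.Client()
    sql = """
        SELECT COUNT(*) AS n
        FROM `my-project.my_dataset.my_table` TABLESAMPLE SYSTEM (1 PERCENT)
    """
    n = list(client.query(sql).result())[0].n
    print(f"sampled rows: {n}")  # often drifts noticeably from 1% of the table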
@@ -159,7 +159,7 @@ class GenericProfiler:
                 rows_count=table.rows_count,
             ):
                 logger.debug(
-                    f"Dataset {dataset_name} was not eliagable for profiling due to last_altered, size in bytes or count of rows limit"
+                    f"Dataset {dataset_name} was not eligible for profiling due to last_altered, size in bytes or count of rows limit"
                 )
                 # Profile only table level if dataset is filtered from profiling
                 # due to size limits alone