mirror of
https://github.com/datahub-project/datahub.git
synced 2025-09-27 01:55:17 +00:00
fix(ingest): bigquery - profiling works with limit and offset using temp tables (#4161)
This commit is contained in:
parent
33de579c66
commit
c7ea6f0db2
@@ -733,6 +733,9 @@ class DatahubGEProfiler:
|
|||||||
connection.execute(
|
connection.execute(
|
||||||
f"drop view if exists `{ge_config.get('bigquery_temp_table')}`"
|
f"drop view if exists `{ge_config.get('bigquery_temp_table')}`"
|
||||||
)
|
)
|
||||||
|
logger.debug(
|
||||||
|
f"Temp table {ge_config.get('bigquery_temp_table')} was dropped."
|
||||||
|
)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"Unable to delete bigquery temporary table: {ge_config.get('bigquery_temp_table')}"
|
f"Unable to delete bigquery temporary table: {ge_config.get('bigquery_temp_table')}"
|
||||||
@@ -745,10 +748,11 @@ class DatahubGEProfiler:
|
|||||||
schema: str = None,
|
schema: str = None,
|
||||||
table: str = None,
|
table: str = None,
|
||||||
partition: Optional[str] = None,
|
partition: Optional[str] = None,
|
||||||
custom_sql: str = None,
|
custom_sql: Optional[str] = None,
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> Optional[DatasetProfileClass]:
|
) -> Optional[DatasetProfileClass]:
|
||||||
bigquery_temp_table: Optional[str] = None
|
bigquery_temp_table: Optional[str] = None
|
||||||
|
|
||||||
ge_config = {
|
ge_config = {
|
||||||
"schema": schema,
|
"schema": schema,
|
||||||
"table": table,
|
"table": table,
|
||||||
@@ -757,17 +761,14 @@ class DatahubGEProfiler:
|
|||||||
**kwargs,
|
**kwargs,
|
||||||
}
|
}
|
||||||
|
|
||||||
if custom_sql:
|
if self.config.bigquery_temp_table_schema is not None:
|
||||||
if self.config.bigquery_temp_table_schema:
|
|
||||||
bigquery_temp_table = (
|
bigquery_temp_table = (
|
||||||
f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
|
f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}"
|
||||||
)
|
)
|
||||||
|
ge_config["bigquery_temp_table"] = bigquery_temp_table
|
||||||
|
|
||||||
ge_config = {
|
if custom_sql is not None:
|
||||||
"query": custom_sql,
|
ge_config["query"] = custom_sql
|
||||||
"bigquery_temp_table": bigquery_temp_table,
|
|
||||||
**kwargs,
|
|
||||||
}
|
|
||||||
|
|
||||||
with self._ge_context() as ge_context, PerfTimer() as timer:
|
with self._ge_context() as ge_context, PerfTimer() as timer:
|
||||||
try:
|
try:
|
||||||
@@ -792,16 +793,15 @@ class DatahubGEProfiler:
|
|||||||
f"Finished profiling {pretty_name}; took {(timer.elapsed_seconds()):.3f} seconds"
|
f"Finished profiling {pretty_name}; took {(timer.elapsed_seconds()):.3f} seconds"
|
||||||
)
|
)
|
||||||
|
|
||||||
self._drop_bigquery_temp_table(ge_config)
|
|
||||||
|
|
||||||
return profile
|
return profile
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if not self.config.catch_exceptions:
|
if not self.config.catch_exceptions:
|
||||||
raise e
|
raise e
|
||||||
logger.exception(f"Encountered exception while profiling {pretty_name}")
|
logger.exception(f"Encountered exception while profiling {pretty_name}")
|
||||||
self.report.report_failure(pretty_name, f"Profiling exception {e}")
|
self.report.report_failure(pretty_name, f"Profiling exception {e}")
|
||||||
self._drop_bigquery_temp_table(ge_config)
|
|
||||||
return None
|
return None
|
||||||
|
finally:
|
||||||
|
self._drop_bigquery_temp_table(ge_config)
|
||||||
|
|
||||||
def _get_ge_dataset(
|
def _get_ge_dataset(
|
||||||
self,
|
self,
|
||||||
|
Loading…
x
Reference in New Issue
Block a user