diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py index 7a146ee365..681a67f9ee 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py @@ -733,6 +733,9 @@ class DatahubGEProfiler: connection.execute( f"drop view if exists `{ge_config.get('bigquery_temp_table')}`" ) + logger.debug( + f"Temp table {ge_config.get('bigquery_temp_table')} was dropped." + ) except Exception: logger.warning( f"Unable to delete bigquery temporary table: {ge_config.get('bigquery_temp_table')}" @@ -745,10 +748,11 @@ class DatahubGEProfiler: schema: str = None, table: str = None, partition: Optional[str] = None, - custom_sql: str = None, + custom_sql: Optional[str] = None, **kwargs: Any, ) -> Optional[DatasetProfileClass]: bigquery_temp_table: Optional[str] = None + ge_config = { "schema": schema, "table": table, @@ -757,17 +761,14 @@ class DatahubGEProfiler: **kwargs, } - if custom_sql: - if self.config.bigquery_temp_table_schema: - bigquery_temp_table = ( - f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" - ) + if self.config.bigquery_temp_table_schema is not None: + bigquery_temp_table = ( + f"{self.config.bigquery_temp_table_schema}.ge-temp-{uuid.uuid4()}" + ) + ge_config["bigquery_temp_table"] = bigquery_temp_table - ge_config = { - "query": custom_sql, - "bigquery_temp_table": bigquery_temp_table, - **kwargs, - } + if custom_sql is not None: + ge_config["query"] = custom_sql with self._ge_context() as ge_context, PerfTimer() as timer: try: @@ -792,16 +793,15 @@ class DatahubGEProfiler: f"Finished profiling {pretty_name}; took {(timer.elapsed_seconds()):.3f} seconds" ) - self._drop_bigquery_temp_table(ge_config) - return profile except Exception as e: if not self.config.catch_exceptions: raise e logger.exception(f"Encountered exception while profiling {pretty_name}") self.report.report_failure(pretty_name, f"Profiling exception {e}") - self._drop_bigquery_temp_table(ge_config) return None + finally: + self._drop_bigquery_temp_table(ge_config) def _get_ge_dataset( self,