diff --git a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py index 67baa22335d..76d8d640e30 100644 --- a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py @@ -104,6 +104,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface): QueryRunner( session=self.session, dataset=self.dataset, + raw_dataset=self.sampler.raw_dataset, partition_details=self.table_partition_config, profile_sample_query=self.table_sample_query, ) diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py index 2473436cd2b..33df13ff209 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueLengthsToBeBetween.py @@ -40,7 +40,7 @@ class ColumnValueLengthsToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMaxToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMaxToBeBetween.py index 235ca42985e..13d860c35f6 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMaxToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMaxToBeBetween.py @@ -38,7 +38,7 @@ class ColumnValueMaxToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMeanToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMeanToBeBetween.py index 80aca69912f..5e45344b3ff 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMeanToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMeanToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValueMeanToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMedianToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMedianToBeBetween.py index a4104213470..b473c4bda91 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMedianToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMedianToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValueMedianToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMinToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMinToBeBetween.py index dd867dab6ec..512b3bee68e 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMinToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueMinToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValueMinToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueStdDevToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueStdDevToBeBetween.py index 8be659d211e..7d08f3ab8fd 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueStdDevToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValueStdDevToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValueStdDevToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py index ebbd620dd61..a3b06e648b6 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesMissingCount.py @@ -42,7 +42,7 @@ class ColumnValuesMissingCountValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column, **kwargs) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesSumToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesSumToBeBetween.py index 16b7f939cb1..96ba1d14d8d 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesSumToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesSumToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValuesSumToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeAtExpectedLocation.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeAtExpectedLocation.py index 138c5d0c2f8..a4e6c1ef9a3 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeAtExpectedLocation.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeAtExpectedLocation.py @@ -37,7 +37,7 @@ class ColumnValuesToBeAtExpectedLocationValidator( def _fetch_data(self, columns: List[str]) -> Iterator: """Fetch data from the runner object""" self.runner = cast(QueryRunner, self.runner) - inspection = inspect(self.runner.table) + inspection = inspect(self.runner.dataset) table_columns: List[Column] = inspection.c if inspection is not None else [] cols = [col for col in table_columns if col.name in columns] for col in cols: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py index af8fcc9b5fe..c1cb8ad1acc 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeBetween.py @@ -39,7 +39,7 @@ class ColumnValuesToBeBetweenValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py index 4bccac6445a..f920ae25305 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeInSet.py @@ -39,7 +39,7 @@ class ColumnValuesToBeInSetValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column, **kwargs) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotInSet.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotInSet.py index d50e98efa9b..012059bd72f 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotInSet.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotInSet.py @@ -39,7 +39,7 @@ class ColumnValuesToBeNotInSetValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column, **kwargs) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py index da11812ad8c..e425cbcb89f 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeNotNull.py @@ -42,7 +42,7 @@ class ColumnValuesToBeNotNullValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py index 89a93f09f90..daf1afef4bc 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToBeUnique.py @@ -17,7 +17,6 @@ from typing import Optional from sqlalchemy import Column, inspect from sqlalchemy.exc import SQLAlchemyError -from sqlalchemy.orm.util import AliasedClass from metadata.data_quality.validations.column.base.columnValuesToBeUnique import ( BaseColumnValuesToBeUniqueValidator, @@ -41,7 +40,7 @@ class ColumnValuesToBeUniqueValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column) -> Optional[int]: @@ -53,12 +52,7 @@ class ColumnValuesToBeUniqueValidator( """ count = Metrics.COUNT.value(column).fn() unique_count = Metrics.UNIQUE_COUNT.value(column).query( - sample=self.runner._sample # pylint: disable=protected-access - if isinstance( - self.runner._sample, # pylint: disable=protected-access - AliasedClass, - ) - else self.runner.table, + sample=self.runner.dataset, session=self.runner._session, # pylint: disable=protected-access ) # type: ignore diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToMatchRegex.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToMatchRegex.py index be28e57963b..0f1c0537303 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToMatchRegex.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToMatchRegex.py @@ -43,7 +43,7 @@ class ColumnValuesToMatchRegexValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results( diff --git a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToNotMatchRegex.py b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToNotMatchRegex.py index f5a8c2656dc..fda43496900 100644 --- a/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToNotMatchRegex.py +++ b/ingestion/src/metadata/data_quality/validations/column/sqlalchemy/columnValuesToNotMatchRegex.py @@ -43,7 +43,7 @@ class ColumnValuesToNotMatchRegexValidator( """ return self.get_column_name( self.test_case.entityLink.root, - inspect(self.runner.table).c, + inspect(self.runner.dataset).c, ) def _run_results(self, metric: Metrics, column: Column, **kwargs) -> Optional[int]: diff --git a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableRowInsertedCountToBeBetween.py b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableRowInsertedCountToBeBetween.py index 425f4960652..30894951c3f 100644 --- a/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableRowInsertedCountToBeBetween.py +++ b/ingestion/src/metadata/data_quality/validations/table/sqlalchemy/tableRowInsertedCountToBeBetween.py @@ -13,7 +13,7 @@ Validator for table row inserted count to be between test case """ -from sqlalchemy import Column, text +from sqlalchemy import Column, inspect, text from metadata.data_quality.validations.mixins.sqa_validator_mixin import ( SQAValidatorMixin, @@ -52,7 +52,7 @@ class TableRowInsertedCountToBeBetweenValidator( date_or_datetime_fn = dispatch_to_date_or_datetime( range_interval, text(range_type), - get_partition_col_type(column_name.name, self.runner.table.c), # type: ignore + get_partition_col_type(column_name.name, inspect(self.runner.dataset).c), # type: ignore ) return dict( diff --git a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py index f85810bfd13..2cd0f225b31 100644 --- a/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py +++ b/ingestion/src/metadata/ingestion/source/database/bigquery/profiler/profiler.py @@ -22,7 +22,7 @@ class BigQueryProfiler(BigQueryProfilerInterface): **kwargs, ) -> List[SystemProfile]: return self.system_metrics_computer.get_system_metrics( - table=runner.table, + table=runner.dataset, usage_location=self.service_connection_config.usageLocation, ) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/db2/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/db2/profiler_interface.py index 4c92aa1c8a0..55aeca6d1ef 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/db2/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/db2/profiler_interface.py @@ -32,7 +32,7 @@ class DB2ProfilerInterface(SQAProfilerInterface): # pylint: disable=protected-access if exc.orig and "overflow" in exc.orig._message: logger.info( - f"Computing metrics without sum for {runner.table.name}.{column.name}" + f"Computing metrics without sum for {runner.table_name}.{column.name}" ) return self._compute_static_metrics_wo_sum(metrics, runner, session, column) return None diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/mariadb/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/mariadb/profiler_interface.py index edca4361ef7..cb2f24f5014 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/mariadb/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/mariadb/profiler_interface.py @@ -77,11 +77,11 @@ class MariaDBProfilerInterface(SQAProfilerInterface): return dict(row) except ProgrammingError: logger.info( - f"Skipping window metrics for {runner.table.name}.{column.name} due to overflow" + f"Skipping window metrics for {runner.table_name}.{column.name} due to overflow" ) return None except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index cfe76f34be9..571cd247a9d 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -156,7 +156,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): ) return dict(row) except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None @@ -194,7 +194,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): except Exception as exc: logger.debug(traceback.format_exc()) logger.warning( - f"Error trying to compute profile for {runner.table.name}: {exc}" # type: ignore + f"Error trying to compute profile for {runner.table_name}: {exc}" # type: ignore ) session.rollback() raise RuntimeError(exc) @@ -231,7 +231,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): runner, column, exc, session, metrics ) except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None @@ -274,10 +274,10 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): runner._session.get_bind().dialect.name != Dialects.Druid ): - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None @@ -310,10 +310,10 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): return dict(row) except ProgrammingError as exc: logger.info( - f"Skipping metrics for {runner.table.name}.{column.name} due to {exc}" + f"Skipping metrics for {runner.table_name}.{column.name} due to {exc}" ) except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None @@ -347,7 +347,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): ) except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{metric.columnName}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{metric.columnName}: {exc}" logger.debug(traceback.format_exc()) logger.warning(msg) if custom_metrics: @@ -371,8 +371,8 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): Returns: dictionnary of results """ - logger.debug(f"Computing system metrics for {runner.table.name}") - return self.system_metrics_computer.get_system_metrics(table=runner.table) + logger.debug(f"Computing system metrics for {runner.table_name}") + return self.system_metrics_computer.get_system_metrics(table=runner.dataset) def _create_thread_safe_runner(self, session, column=None): """Create thread safe runner""" @@ -380,6 +380,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): thread_local.runner = QueryRunner( session=session, dataset=self.sampler.get_dataset(column=column), + raw_dataset=self.sampler.raw_dataset, partition_details=self.sampler.partition_details, profile_sample_query=self.sampler.sample_query, ) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py index 7c032e651b6..26e4b317456 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/single_store/profiler_interface.py @@ -76,11 +76,11 @@ class SingleStoreProfilerInterface(SQAProfilerInterface): return dict(row) except ProgrammingError: logger.info( - f"Skipping window metrics for {runner.table.name}.{column.name} due to overflow" + f"Skipping window metrics for {runner.table_name}.{column.name} due to overflow" ) return None except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py index 464fbdd88a4..61df2a3c7a6 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/snowflake/profiler_interface.py @@ -41,7 +41,7 @@ class SnowflakeProfilerInterface(SQAProfilerInterface): session.bind.dialect.name ): logger.info( - f"Computing metrics without sum for {runner.table.name}.{column.name}" + f"Computing metrics without sum for {runner.table_name}.{column.name}" ) return self._compute_static_metrics_wo_sum(metrics, runner, session, column) return None diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/stored_statistics_profiler.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/stored_statistics_profiler.py index c949098d974..c0f570eb0e3 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/stored_statistics_profiler.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/stored_statistics_profiler.py @@ -79,8 +79,8 @@ class ProfilerWithStatistics(SQAProfilerInterface, StoredStatisticsSource): list, partition(self.is_statistic_metric, metrics), ) - schema = runner.table.schema - table_name = runner.table.name + schema = runner.schema_name + table_name = runner.table_name logger.debug( "Getting statistics for column: %s.%s.%s", schema, @@ -118,8 +118,8 @@ class ProfilerWithStatistics(SQAProfilerInterface, StoredStatisticsSource): list, partition(self.is_statistic_metric, metrics), ) - schema = runner.table.schema - table_name = runner.table.name + schema = runner.schema_name + table_name = runner.table_name logger.debug("Geting statistics for table: %s.%s", schema, table_name) result.update( super().get_table_statistics(stat_metrics, schema, table_name) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/trino/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/trino/profiler_interface.py index b7c387a748f..a45b1bf512b 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/trino/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/trino/profiler_interface.py @@ -76,11 +76,11 @@ class TrinoProfilerInterface(ProfilerWithStatistics, TrinoStoredStatisticsSource return dict(row) except ProgrammingError as err: logger.info( - f"Skipping window metrics for {runner.table.name}.{column.name} due to {err}" + f"Skipping window metrics for {runner.table_name}.{column.name} due to {err}" ) return None except Exception as exc: - msg = f"Error trying to compute profile for {runner.table.name}.{column.name}: {exc}" + msg = f"Error trying to compute profile for {runner.table_name}.{column.name}: {exc}" handle_query_exception(msg, exc, session) return None diff --git a/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py b/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py index 5f8a69cf290..6a6a0218822 100644 --- a/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py +++ b/ingestion/src/metadata/profiler/orm/functions/table_metric_computer.py @@ -53,7 +53,7 @@ class AbstractTableMetricComputer(ABC): self._metrics = metrics self._conn_config = conn_config self._database = self._runner._session.get_bind().url.database - self._table = self._runner.table + self._table = self._runner.dataset self._entity = entity @property diff --git a/ingestion/src/metadata/profiler/processor/runner.py b/ingestion/src/metadata/profiler/processor/runner.py index 3098a26083e..8f80108de62 100644 --- a/ingestion/src/metadata/profiler/processor/runner.py +++ b/ingestion/src/metadata/profiler/processor/runner.py @@ -44,6 +44,7 @@ class QueryRunner: self, session: Session, dataset: Union[DeclarativeMeta, AliasedClass], + raw_dataset: Table, partition_details: Optional[Dict] = None, profile_sample_query: Optional[str] = None, ): @@ -51,11 +52,12 @@ class QueryRunner: self._dataset = dataset self.partition_details = partition_details self.profile_sample_query = profile_sample_query + self.raw_dataset = raw_dataset @property def table(self) -> Table: """Backward compatibility table attribute access""" - return self._dataset.__table__ + return self.raw_dataset @property def _sample(self): @@ -71,6 +73,16 @@ class QueryRunner: def dataset(self, dataset): self._dataset = dataset + @property + def table_name(self): + """Table name attribute access""" + return self.raw_dataset.__table__.name + + @property + def schema_name(self): + """Table name attribute access""" + return self.raw_dataset.__table__.schema + def _build_query(self, *entities, **kwargs) -> Query: """Build query object diff --git a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py index 3cb7fd62944..0fa6a3d68bd 100644 --- a/ingestion/src/metadata/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/sampler/sqlalchemy/sampler.py @@ -16,7 +16,7 @@ import traceback from typing import List, Optional, Union, cast from sqlalchemy import Column, inspect, text -from sqlalchemy.orm import DeclarativeMeta, Query, aliased +from sqlalchemy.orm import DeclarativeMeta, Query from sqlalchemy.orm.util import AliasedClass from sqlalchemy.schema import Table from sqlalchemy.sql.sqltypes import Enum @@ -145,13 +145,12 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin): and self.sample_config.profile_sample_type == ProfileSampleType.PERCENTAGE ): if self.partition_details: - return self._partitioned_table() + partitioned = self._partitioned_table() + return partitioned.cte(f"{self.raw_dataset.__tablename__}_partitioned") return self.raw_dataset - sampled = self.get_sample_query(column=column) - - return aliased(self.raw_dataset, sampled) + return self.get_sample_query(column=column) def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData: """ @@ -230,7 +229,7 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin): def _partitioned_table(self) -> Query: """Return the Query object for partitioned tables""" - return aliased(self.raw_dataset, self.get_partitioned_query().subquery()) + return self.get_partitioned_query() def get_partitioned_query(self, query=None) -> Query: """Return the partitioned query""" diff --git a/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py b/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py index 738d4985fe4..880cc6ed94f 100644 --- a/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py +++ b/ingestion/tests/unit/profiler/sqlalchemy/test_runner.py @@ -94,7 +94,9 @@ class RunnerTest(TestCase): ) cls.dataset = sampler.get_dataset() - cls.raw_runner = QueryRunner(session=cls.session, dataset=cls.dataset) + cls.raw_runner = QueryRunner( + session=cls.session, dataset=cls.dataset, raw_dataset=sampler.raw_dataset + ) cls.timeout_runner: Timer = cls_timeout(1)(Timer()) # Insert 30 rows