MINOR: fix table/schema name (#18899)

* fix: fix table/schema name

* fix: minor profiler and dbt issues

* style: ran python linting

---------

Co-authored-by: Imri Paran <imri.paran@gmail.com>
Teddy 2024-12-04 10:12:38 +01:00 committed by GitHub
parent 613fd331e0
commit a615f49ab2
9 changed files with 35 additions and 22 deletions

View File

@@ -13,6 +13,7 @@ from metadata.profiler.metrics.system.system import (
     SQASessionProvider,
     SystemMetricsComputer,
 )
+from metadata.profiler.processor.runner import QueryRunner
 from metadata.utils.logger import profiler_logger
 from metadata.utils.time_utils import datetime_to_timestamp
@@ -25,11 +26,11 @@ class BigQuerySystemMetricsSource(
     """BigQuery system metrics source class"""
     def get_kwargs(self, **kwargs):
-        table = kwargs.get("table")
+        runner: QueryRunner = kwargs.get("runner")
         return {
-            "table": table.__table__.name,
-            "dataset_id": table.__table_args__["schema"],
-            "project_id": super().get_session().get_bind().url.host,
+            "table": runner.table_name,
+            "database": runner.session.get_bind().url.database,
+            "schema": runner.schema_name,
             "usage_location": kwargs.get("usage_location"),
         }

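All three warehouse connectors touched by this commit (BigQuery above, Redshift and Snowflake below) switch `get_kwargs` from the SQLAlchemy table object to the `QueryRunner`. A minimal, self-contained sketch of that pattern follows; the `MiniRunner` class and the `Sales` model are illustrative stand-ins, not the real `QueryRunner` (the BigQuery source additionally forwards `usage_location`):

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Sales(Base):
    """Toy mapped class standing in for the profiled table."""

    __tablename__ = "sales"
    __table_args__ = {"schema": "store"}
    id = Column(Integer, primary_key=True)


class MiniRunner:
    """Bare-bones stand-in exposing only the attributes get_kwargs now reads."""

    def __init__(self, session, raw_dataset):
        self._session = session
        self.raw_dataset = raw_dataset

    @property
    def table_name(self):
        return self.raw_dataset.__table__.name

    @property
    def schema_name(self):
        return self.raw_dataset.__table__.schema

    @property
    def session(self):
        return self._session


# No connection is opened; create_engine and get_bind() only build metadata.
runner = MiniRunner(sessionmaker(bind=create_engine("sqlite:///analytics.db"))(), Sales)

kwargs = {
    "table": runner.table_name,                          # "sales"
    "database": runner.session.get_bind().url.database,  # "analytics.db"
    "schema": runner.schema_name,                         # "store"
}
print(kwargs)
```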
View File

@@ -18,6 +18,7 @@ from metadata.profiler.metrics.system.system import (
     SQASessionProvider,
     SystemMetricsComputer,
 )
+from metadata.profiler.processor.runner import QueryRunner
 from metadata.utils.logger import profiler_logger
 from metadata.utils.profiler_utils import QueryResult
 from metadata.utils.time_utils import datetime_to_timestamp
@@ -45,11 +46,11 @@ class RedshiftSystemMetricsSource(
         return get_metric_result(queries, table)
     def get_kwargs(self, **kwargs):
-        table = kwargs.get("table")
+        runner: QueryRunner = kwargs.get("runner")
         return {
-            "table": table.__table__.name,
-            "database": self.get_session().get_bind().url.database,
-            "schema": table.__table__.schema,
+            "table": runner.table_name,
+            "database": runner.session.get_bind().url.database,
+            "schema": runner.schema_name,
         }
     def get_deletes(self, **kwargs) -> List[SystemProfile]:

View File

@@ -20,6 +20,7 @@ from metadata.profiler.metrics.system.system import (
     SQASessionProvider,
     SystemMetricsComputer,
 )
+from metadata.profiler.processor.runner import QueryRunner
 from metadata.utils.collections import CaseInsensitiveString
 from metadata.utils.logger import profiler_logger
 from metadata.utils.lru_cache import LRU_CACHE_SIZE, LRUCache
@@ -294,11 +295,11 @@ class SnowflakeSystemMetricsSource(
         )
     def get_kwargs(self, **kwargs):
-        table = kwargs.get("table")
+        runner: QueryRunner = kwargs.get("runner")
         return {
-            "table": table.__table__.name,
-            "database": self.get_session().get_bind().url.database,
-            "schema": table.__table__.schema,
+            "table": runner.table_name,
+            "database": runner.session.get_bind().url.database,
+            "schema": runner.schema_name,
         }
     def get_inserts(self, **kwargs) -> List[SystemProfile]:

View File

@@ -372,7 +372,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
             dictionary of results
         """
         logger.debug(f"Computing system metrics for {runner.table_name}")
-        return self.system_metrics_computer.get_system_metrics(table=runner.dataset)
+        return self.system_metrics_computer.get_system_metrics(runner=runner)
     def _create_thread_safe_runner(self, session, column=None):
         """Create thread safe runner"""

View File

@@ -91,8 +91,8 @@ class AbstractTableMetricComputer(ABC):
             table (DeclarativeMeta): _description_
         """
         try:
-            self._schema_name = self.table.schema
-            self._table_name = self.table.name
+            self._schema_name = self.runner.schema_name
+            self._table_name = self.runner.table_name
         except AttributeError:
             raise AttributeError(ERROR_MSG)
@@ -119,10 +119,10 @@
         Returns:
             Tuple[str, int]
         """
-        col_names = literal(",".join(inspect(self.table).c.keys()), type_=String).label(
-            COLUMN_NAMES
-        )
-        col_count = literal(len(inspect(self.table).c)).label(COLUMN_COUNT)
+        col_names = literal(
+            ",".join(inspect(self.runner.raw_dataset).c.keys()), type_=String
+        ).label(COLUMN_NAMES)
+        col_count = literal(len(inspect(self.runner.raw_dataset).c)).label(COLUMN_COUNT)
         return col_names, col_count
     def _build_query(

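For reference, the SQLAlchemy calls used here can be exercised on their own. In this sketch the toy `Users` model and the plain label strings stand in for the profiled class and the `COLUMN_NAMES`/`COLUMN_COUNT` constants:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect, literal, select
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Users(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)


# Column metadata comes from the mapped class itself, so no table (and no rows)
# is needed to build the two labeled scalar expressions.
col_names = literal(",".join(inspect(Users).c.keys()), type_=String).label("columnNames")
col_count = literal(len(inspect(Users).c)).label("columnCount")

with create_engine("sqlite://").connect() as conn:
    print(conn.execute(select(col_names, col_count)).one())  # ('id,name', 2)
```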
View File

@@ -83,6 +83,11 @@ class QueryRunner:
         """Schema name attribute access"""
         return self.raw_dataset.__table__.schema
+    @property
+    def session(self):
+        """Session attribute access"""
+        return self._session
     def _build_query(self, *entities, **kwargs) -> Query:
         """Build query object

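The new session property is what the system metrics sources above use to read connection details via `runner.session.get_bind().url`. A short illustration of the URL fields involved; the connection string is made up, and `make_url` only parses it, so no driver or connection is required:

```python
from sqlalchemy.engine import make_url

# In the profiler this URL comes from runner.session.get_bind().url;
# the connection string here is purely illustrative.
url = make_url("postgresql://profiler:secret@warehouse.internal:5432/analytics")
print(url.database)  # 'analytics'          -> the "database" kwarg above
print(url.host)      # 'warehouse.internal' -> what BigQuery previously read as project_id
print(url.username)  # 'profiler'
```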
View File

@@ -223,8 +223,11 @@ class SQASampler(SamplerInterface, SQAInterfaceMixin):
         if not is_safe_sql_query(self.sample_query):
             raise RuntimeError(f"SQL expression is not safe\n\n{self.sample_query}")
-        return self.client.query(self.raw_dataset).from_statement(
-            text(f"{self.sample_query}")
+        stmt = text(f"{self.sample_query}")
+        stmt = stmt.columns(*list(inspect(self.raw_dataset).c))
+        return self.client.query(stmt.subquery()).cte(
+            f"{self.raw_dataset.__tablename__}_user_sampled"
         )
     def _partitioned_table(self) -> Query:

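The sampler change replaces `from_statement()` with a composable subquery/CTE so later profiler queries can select from the sampled rows. A self-contained sketch of the same SQLAlchemy pattern with a toy model and query; the real code feeds `self.sample_query` and `self.raw_dataset` through these calls:

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect, text
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Orders(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    status = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([Orders(status="open"), Orders(status="closed")])
session.commit()

# Map the raw SQL string onto the model's columns, then wrap it so further
# queries can keep composing on top of the user-sampled rows.
stmt = text("SELECT id, status FROM orders WHERE status = 'open'")
stmt = stmt.columns(*list(inspect(Orders).c))
sampled = session.query(stmt.subquery()).cte("orders_user_sampled")

print(session.query(sampled.c.id, sampled.c.status).all())  # [(1, 'open')]
```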
View File

@@ -98,7 +98,7 @@ $$section
 **This parameter is effective for Snowflake only**
-The sampling method type can be set to **BERNOULLI** or **SYSTEM**. You can find the difference of two values in the document of the Snowflake. When you choice **BERNOULLI**, it will scan full rows in the table even though small value is set at the **Profile Sample**. However, it has less restlictions than **SYSTEM**.
+The sampling method type can be set to **BERNOULLI** or **SYSTEM**. Only databases supporting these two sampling methods will take this setting into account. When you choose **BERNOULLI**, full rows in the table are scanned even if a small value is set for **Profile Sample**. However, it has fewer restrictions than **SYSTEM**. For more information, refer to your service's documentation.
 If no option is chosen, the default is **BERNOULLI**.
 $$

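For readers unfamiliar with the two methods: BERNOULLI samples individual rows (so the full table is still scanned) while SYSTEM samples whole storage blocks. The SQL difference is only the sampling clause, shown here with SQLAlchemy's generic tablesample construct; this is an illustration, not necessarily how the profiler builds its query:

```python
from sqlalchemy import column, func, select, table, tablesample

users = table("users", column("id"))

for method in ("bernoulli", "system"):
    sampled = tablesample(users, getattr(func, method)(10), name="sampled_users")
    print(select(sampled.c.id).compile(compile_kwargs={"literal_binds": True}))
    # SELECT sampled_users.id
    # FROM users AS sampled_users TABLESAMPLE bernoulli(10)   -- then system(10)
```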
View File

@@ -465,7 +465,9 @@ export const ADVANCED_PROPERTIES = [
   'computeTableMetrics',
   'computeColumnMetrics',
   'includeViews',
+  'useStatistics',
   'confidence',
+  'samplingMethodType',
   'sampleDataCount',
   'threadCount',
   'timeoutSeconds',