Minor fix row computation for views (#15872)

* fix: row computation for views

* style: ran python linting

* fix: MySQL innodb row count

* style: ran python linting
This commit is contained in:
Teddy 2024-04-11 16:55:39 +02:00 committed by GitHub
parent 6ed319c2dc
commit 4c4b7c67a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 39 additions and 20 deletions

View File

@ -192,7 +192,6 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
row_dict = {}
try:
for metric in metrics:
metric_resp = metric(column).df_fn(runner)
row_dict[metric.name()] = (

View File

@ -189,6 +189,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
runner=runner,
metrics=metrics,
conn_config=self.service_connection_config,
entity=self.table_entity,
)
row = table_metric_computer.compute()
if row:

View File

@ -22,6 +22,8 @@ from sqlalchemy import Column, MetaData, Table, func, inspect, literal, select
from sqlalchemy.sql.expression import ColumnOperators, and_, cte
from sqlalchemy.types import String
from metadata.generated.schema.entity.data.table import Table as OMTable
from metadata.generated.schema.entity.data.table import TableType
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.orm.registry import Dialects
from metadata.profiler.processor.runner import QueryRunner
@ -31,7 +33,7 @@ logger = profiler_interface_registry_logger()
COLUMN_COUNT = "columnCount"
COLUMN_NAMES = "columnNames"
ROW_COUNT = "rowCount"
ROW_COUNT = Metrics.ROW_COUNT().name()
SIZE_IN_BYTES = "sizeInBytes"
CREATE_DATETIME = "createDateTime"
@ -43,13 +45,16 @@ ERROR_MSG = (
class AbstractTableMetricComputer(ABC):
"""Base table computer"""
def __init__(self, runner: QueryRunner, metrics: List[Metrics], conn_config):
def __init__(
self, runner: QueryRunner, metrics: List[Metrics], conn_config, entity: OMTable
):
"""Instantiate base table computer"""
self._runner = runner
self._metrics = metrics
self._conn_config = conn_config
self._database = self._runner._session.get_bind().url.database
self._table = self._runner.table
self._entity = entity
@property
def database(self):
@ -141,16 +146,6 @@ class AbstractTableMetricComputer(ABC):
class BaseTableMetricComputer(AbstractTableMetricComputer):
"""Base table computer"""
def _check_and_return(self, res):
"""Check if the result is None and return the result or fallback
Args:
res (object): result
"""
if res.rowCount is None:
return super().compute()
return res
def compute(self):
"""Default compute behavior for table metrics"""
return self.runner.select_first_from_table(
@ -236,7 +231,9 @@ class OracleTableMetricComputer(BaseTableMetricComputer):
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
return res
@ -263,7 +260,9 @@ class ClickHouseTableMetricComputer(BaseTableMetricComputer):
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
return res
@ -307,7 +306,9 @@ class BigQueryTableMetricComputer(BaseTableMetricComputer):
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
return res
@ -336,7 +337,9 @@ class BigQueryTableMetricComputer(BaseTableMetricComputer):
where_clause,
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
return res
@ -363,9 +366,16 @@ class MySQLTableMetricComputer(BaseTableMetricComputer):
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
res = res._asdict()
# innodb row count is an estimate we need to patch the row count with COUNT(*)
# https://dev.mysql.com/doc/refman/8.3/en/information-schema-innodb-tablestats-table.html
row_count = self.runner.select_first_from_table(Metrics.ROW_COUNT().fn())
res.update({ROW_COUNT: row_count.rowCount})
return res
@ -390,7 +400,9 @@ class RedshiftTableMetricComputer(BaseTableMetricComputer):
columns, self._build_table("svv_table_info", "pg_catalog"), where_clause
)
res = self.runner._session.execute(query).first()
if res.rowCount is None:
if res.rowCount is None or (
res.rowCount == 0 and self._entity.tableType == TableType.View
):
# if we don't have any row count, fallback to the base logic
return super().compute()
return res
@ -400,9 +412,15 @@ class TableMetricComputer:
"""Table Metric Construct"""
def __init__(
self, dialect: str, runner: QueryRunner, metrics: List[Metrics], conn_config
self,
dialect: str,
runner: QueryRunner,
metrics: List[Metrics],
conn_config,
entity: OMTable,
):
"""Instantiate table metric computer with a dialect computer"""
self._entity = entity
self._dialect = dialect
self._runner = runner
self._metrics = metrics
@ -413,6 +431,7 @@ class TableMetricComputer:
runner=self._runner,
metrics=self._metrics,
conn_config=self._conn_config,
entity=self._entity,
)
)