Fixes 13688: Trino Profiler Bugs (#13853)

This commit is contained in:
Ayush Shah 2023-11-06 17:14:04 +05:30 committed by GitHub
parent c7834e74cc
commit 29b7f3aa33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 70 additions and 8 deletions

View File

@ -20,6 +20,7 @@ from sqlalchemy import exc, inspect, sql, util
from sqlalchemy.engine.base import Connection
from sqlalchemy.sql import sqltypes
from trino.sqlalchemy import datatype, error
from trino.sqlalchemy.datatype import JSON
from trino.sqlalchemy.dialect import TrinoDialect
from metadata.generated.schema.entity.data.database import Database
@ -32,6 +33,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.ingestion.source.database.common_db_source import CommonDbSourceService
from metadata.ingestion.source.database.trino.queries import TRINO_TABLE_COMMENTS
from metadata.utils import fqn
@ -176,6 +178,10 @@ class TrinoSource(CommonDbSourceService):
Trino does not support querying by table type: Getting views is not supported.
"""
ColumnTypeParser._COLUMN_TYPE_MAPPING[ # pylint: disable=protected-access
JSON
] = "JSON"
@classmethod
def create(cls, config_dict, metadata: OpenMetadata):
config = WorkflowSource.parse_obj(config_dict)

View File

@ -24,6 +24,7 @@ from metadata.profiler.interface.sqlalchemy.profiler_interface import (
handle_query_exception,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.orm.registry import FLOAT_SET
from metadata.profiler.processor.runner import QueryRunner
from metadata.utils.logger import profiler_interface_registry_logger
@ -58,15 +59,17 @@ class TrinoProfilerInterface(SQAProfilerInterface):
if not metrics:
return None
try:
runner_kwargs = {}
if column.type in FLOAT_SET:
runner_kwargs = {
"query_filter_": {"filters": [(func.is_nan(column), "eq", False)]}
}
row = runner.select_first_from_sample(
*[metric(column).fn() for metric in metrics],
query_filter_={
"filters": [(func.is_nan(column), "eq", "False")],
},
*[metric(column).fn() for metric in metrics], **runner_kwargs
)
except ProgrammingError:
except ProgrammingError as err:
logger.info(
f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to overflow"
f"Skipping window metrics for {runner.table.__tablename__}.{column.name} due to {err}"
)
return None

View File

@ -22,6 +22,7 @@ from sqlalchemy.sql.functions import GenericFunction
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import (
FLOAT_SET,
Dialects,
is_concatenable,
is_date_time,
@ -43,6 +44,15 @@ def _(element, compiler, **kw):
@compiles(MaxFn, Dialects.Trino)
def _(element, compiler, **kw):
col = compiler.process(element.clauses, **kw)
first_clause = element.clauses.clauses[0]
# Check if the first clause is an instance of LenFn and its type is not in FLOAT_SET
# or if the type of the first clause is date time
if (
isinstance(first_clause, LenFn)
and type(first_clause.clauses.clauses[0].type) not in FLOAT_SET
) or is_date_time(first_clause.type):
# If the condition is true, return the maximum value of the column
return f"MAX({col})"
return f"IF(is_nan(MAX({col})), NULL, MAX({col}))"

View File

@ -23,7 +23,13 @@ from sqlalchemy.sql.functions import GenericFunction
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import Dialects, is_concatenable, is_quantifiable
from metadata.profiler.orm.registry import (
FLOAT_SET,
Dialects,
is_concatenable,
is_date_time,
is_quantifiable,
)
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -55,6 +61,15 @@ def _(element, compiler, **kw):
@compiles(avg, Dialects.Trino)
def _(element, compiler, **kw):
proc = compiler.process(element.clauses, **kw)
first_clause = element.clauses.clauses[0]
# Check if the first clause is an instance of LenFn and its type is not in FLOAT_SET
# or if the type of the first clause is date time
if (
isinstance(first_clause, LenFn)
and type(first_clause.clauses.clauses[0].type) not in FLOAT_SET
) or is_date_time(first_clause.type):
# If the condition is true, return the mean value of the column
return f"avg({proc})"
return f"IF(is_nan(avg({proc})), NULL, avg({proc}))"

View File

@ -21,6 +21,7 @@ from sqlalchemy.sql.functions import GenericFunction
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import (
FLOAT_SET,
Dialects,
is_concatenable,
is_date_time,
@ -42,6 +43,15 @@ def _(element, compiler, **kw):
@compiles(MinFn, Dialects.Trino)
def _(element, compiler, **kw):
col = compiler.process(element.clauses, **kw)
first_clause = element.clauses.clauses[0]
# Check if the first clause is an instance of LenFn and its type is not in FLOAT_SET
# or if the type of the first clause is date time
if (
isinstance(first_clause, LenFn)
and type(first_clause.clauses.clauses[0].type) not in FLOAT_SET
) or is_date_time(first_clause.type):
# If the condition is true, return the minimum value of the column
return f"MIN({col})"
return f"IF(is_nan(MIN({col})), NULL, MIN({col}))"

View File

@ -23,7 +23,13 @@ from sqlalchemy.sql.functions import FunctionElement
from metadata.profiler.metrics.core import CACHE, StaticMetric, _label
from metadata.profiler.orm.functions.length import LenFn
from metadata.profiler.orm.registry import Dialects, is_concatenable, is_quantifiable
from metadata.profiler.orm.registry import (
FLOAT_SET,
Dialects,
is_concatenable,
is_date_time,
is_quantifiable,
)
from metadata.utils.logger import profiler_logger
logger = profiler_logger()
@ -58,6 +64,15 @@ def _(element, compiler, **kw):
@compiles(StdDevFn, Dialects.Trino)
def _(element, compiler, **kw):
proc = compiler.process(element.clauses, **kw)
first_clause = element.clauses.clauses[0]
# Check if the first clause is an instance of LenFn and its type is not in FLOAT_SET
# or if the type of the first clause is date time
if (
isinstance(first_clause, LenFn)
and type(first_clause.clauses.clauses[0].type) not in FLOAT_SET
) or is_date_time(first_clause.type):
# If the condition is true, return the stddev value of the column
return f"STDDEV_POP({proc})"
return f"IF(is_nan(STDDEV_POP({proc})), NULL, STDDEV_POP({proc}))"

View File

@ -13,6 +13,7 @@ Helper module to handle data sampling
for the profiler
"""
from sqlalchemy import inspect, or_, text
from trino.sqlalchemy.dialect import TrinoDialect
from metadata.profiler.orm.registry import FLOAT_SET
from metadata.profiler.processor.handle_partition import RANDOM_LABEL
@ -25,6 +26,8 @@ class TrinoSampler(SQASampler):
run the query in the whole table.
"""
TrinoDialect._json_deserializer = None # pylint: disable=protected-access
def _base_sample_query(self, label=None):
sqa_columns = [col for col in inspect(self.table).c if col.name != RANDOM_LABEL]
return self.client.query(self.table, label).where(