mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-06-27 04:22:05 +00:00
Fixed linting for Profiler (#7922)
* - Fixed linting - Added logic to skip partition check when engine is not BQ - Added ingestion partition logic to testSuite * Fixed python formating * Fixed test for BQ partition
This commit is contained in:
parent
22ac150d7b
commit
3b7f576d04
@ -22,6 +22,7 @@ extension-pkg-allow-list=pydantic
|
||||
|
||||
[MESSAGES CONTROL]
|
||||
disable=no-name-in-module
|
||||
enable=useless-suppression
|
||||
|
||||
[FORMAT]
|
||||
# We all have big monitors now
|
||||
|
@ -82,6 +82,7 @@ class ProfilerWorkflow:
|
||||
metadata: OpenMetadata
|
||||
|
||||
def __init__(self, config: OpenMetadataWorkflowConfig):
|
||||
self.profiler_obj = None # defined in `create_profiler_obj()``
|
||||
self.config = config
|
||||
self.metadata_config: OpenMetadataConnection = (
|
||||
self.config.workflowConfig.openMetadataServerConfig
|
||||
@ -211,24 +212,29 @@ class ProfilerWorkflow:
|
||||
Args:
|
||||
entity: table entity
|
||||
"""
|
||||
if (
|
||||
not hasattr(entity, "serviceType")
|
||||
or entity.serviceType != DatabaseServiceType.BigQuery
|
||||
):
|
||||
return None
|
||||
|
||||
entity_config: TableConfig = self.get_config_for_entity(entity)
|
||||
if entity_config:
|
||||
return entity_config.partitionConfig
|
||||
|
||||
if hasattr(entity, "tablePartition") and entity.tablePartition:
|
||||
try:
|
||||
if entity.tablePartition.intervalType == IntervalType.TIME_UNIT:
|
||||
return TablePartitionConfig(
|
||||
partitionField=entity.tablePartition.columns[0]
|
||||
)
|
||||
elif entity.tablePartition.intervalType == IntervalType.INGESTION_TIME:
|
||||
if entity.tablePartition.interval == "DAY":
|
||||
return TablePartitionConfig(partitionField="_PARTITIONDATE")
|
||||
return TablePartitionConfig(partitionField="_PARTITIONTIME")
|
||||
except Exception:
|
||||
raise TypeError(
|
||||
f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table"
|
||||
if entity.tablePartition.intervalType == IntervalType.TIME_UNIT:
|
||||
return TablePartitionConfig(
|
||||
partitionField=entity.tablePartition.columns[0]
|
||||
)
|
||||
if entity.tablePartition.intervalType == IntervalType.INGESTION_TIME:
|
||||
if entity.tablePartition.interval == "DAY":
|
||||
return TablePartitionConfig(partitionField="_PARTITIONDATE")
|
||||
return TablePartitionConfig(partitionField="_PARTITIONTIME")
|
||||
raise TypeError(
|
||||
f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table"
|
||||
)
|
||||
return None
|
||||
|
||||
def create_profiler_interface(
|
||||
self,
|
||||
@ -288,8 +294,7 @@ class ProfilerWorkflow:
|
||||
database.name.__root__, "Database pattern not allowed"
|
||||
)
|
||||
return None
|
||||
else:
|
||||
return database
|
||||
return database
|
||||
|
||||
def filter_entities(self, tables: List[Table]) -> Iterable[Table]:
|
||||
"""
|
||||
@ -400,7 +405,8 @@ class ProfilerWorkflow:
|
||||
if not databases:
|
||||
raise ValueError(
|
||||
"databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern."
|
||||
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t- excludes: {self.source_config.databaseFilterPattern.excludes}"
|
||||
f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t"
|
||||
f"- excludes: {self.source_config.databaseFilterPattern.excludes}"
|
||||
)
|
||||
|
||||
for database in databases:
|
||||
@ -457,8 +463,7 @@ class ProfilerWorkflow:
|
||||
or (hasattr(self, "sink") and self.sink.get_status().failures)
|
||||
):
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
return 0
|
||||
|
||||
def raise_from_status(self, raise_warnings=False):
|
||||
"""
|
||||
@ -522,4 +527,4 @@ class ProfilerWorkflow:
|
||||
|
||||
@staticmethod
|
||||
def _is_sample_source(service_type):
|
||||
return service_type == "sample-data" or service_type == "sample-usage"
|
||||
return service_type in {"sample-data", "sample-usage"}
|
||||
|
@ -13,6 +13,8 @@
|
||||
Metric Core definitions
|
||||
"""
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from functools import wraps
|
||||
|
@ -29,7 +29,7 @@ from metadata.utils.logger import profiler_logger
|
||||
|
||||
logger = profiler_logger()
|
||||
|
||||
|
||||
# pylint: disable=invalid-name
|
||||
class avg(GenericFunction):
|
||||
name = "avg"
|
||||
inherit_cache = CACHE
|
||||
@ -39,7 +39,7 @@ class avg(GenericFunction):
|
||||
def _(element, compiler, **kw):
|
||||
"""Handle case for empty table. If empty, clickhouse returns NaN"""
|
||||
proc = compiler.process(element.clauses, **kw)
|
||||
return "if(isNaN(avg(%s)), null, avg(%s))" % ((proc,) * 2)
|
||||
return f"if(isNaN(avg({proc})), null, avg({proc}))"
|
||||
|
||||
|
||||
class Mean(StaticMetric):
|
||||
|
@ -82,6 +82,7 @@ def map_types(col: Column, table_service_type):
|
||||
table_service_type == databaseService.DatabaseServiceType.Snowflake
|
||||
and col.dataType == DataType.JSON
|
||||
):
|
||||
# pylint: disable=import-outside-toplevel
|
||||
from snowflake.sqlalchemy import VARIANT
|
||||
|
||||
return VARIANT
|
||||
|
@ -29,7 +29,7 @@ class MedianFn(FunctionElement):
|
||||
|
||||
|
||||
@compiles(MedianFn)
|
||||
def _(elements, compiler, **kwargs):
|
||||
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
|
||||
col = elements.clauses.clauses[0].name
|
||||
return "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s ASC)" % col
|
||||
|
||||
@ -69,7 +69,7 @@ def _(elements, compiler, **kwargs):
|
||||
|
||||
|
||||
@compiles(MedianFn, Dialects.MySQL)
|
||||
def _(elemenst, compiler, **kwargs):
|
||||
def _(elemenst, compiler, **kwargs): # pylint: disable=unused-argument
|
||||
"""Median computation for MySQL currently not supported
|
||||
Needs to be tackled in https://github.com/open-metadata/OpenMetadata/issues/6340
|
||||
"""
|
||||
@ -77,8 +77,8 @@ def _(elemenst, compiler, **kwargs):
|
||||
|
||||
|
||||
@compiles(MedianFn, Dialects.SQLite)
|
||||
def _(elements, compiler, **kwargs):
|
||||
col, table = [element for element in elements.clauses]
|
||||
def _(elements, compiler, **kwargs): # pylint: disable=unused-argument
|
||||
col, table = list(elements.clauses)
|
||||
return """
|
||||
(SELECT
|
||||
AVG({col})
|
||||
|
@ -13,7 +13,7 @@
|
||||
Define Modulo function
|
||||
"""
|
||||
# Keep SQA docs style defining custom constructs
|
||||
# pylint: disable=consider-using-f-string,duplicate-code
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import FunctionElement
|
||||
|
@ -33,11 +33,11 @@ class SumFn(GenericFunction):
|
||||
def _(element, compiler, **kw):
|
||||
"""Handle case for empty table. If empty, clickhouse returns NaN"""
|
||||
proc = compiler.process(element.clauses, **kw)
|
||||
return "SUM(%s)" % proc
|
||||
return f"SUM({proc})"
|
||||
|
||||
|
||||
@compiles(SumFn, Dialects.BigQuery)
|
||||
def _(element, compiler, **kw):
|
||||
"""Handle case where column type is INTEGER but SUM returns a NUMBER"""
|
||||
proc = compiler.process(element.clauses, **kw)
|
||||
return "SUM(CAST(%s AS NUMERIC))" % proc
|
||||
return f"SUM(CAST({proc} AS NUMERIC))"
|
||||
|
@ -12,7 +12,7 @@
|
||||
"""
|
||||
Expand sqlalchemy types to map them to OpenMetadata DataType
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
# pylint: disable=duplicate-code,abstract-method
|
||||
|
||||
from typing import Optional
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
"""
|
||||
Expand sqlalchemy types to map them to OpenMetadata DataType
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
# pylint: disable=duplicate-code,abstract-method
|
||||
|
||||
from typing import Optional
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
"""
|
||||
Expand sqlalchemy types to map them to OpenMetadata DataType
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
# pylint: disable=duplicate-code,abstract-method
|
||||
import traceback
|
||||
from uuid import UUID
|
||||
|
||||
|
@ -38,7 +38,6 @@ from metadata.orm_profiler.api.models import ProfilerResponse
|
||||
from metadata.orm_profiler.metrics.core import (
|
||||
ComposedMetric,
|
||||
CustomMetric,
|
||||
Metric,
|
||||
MetricTypes,
|
||||
QueryMetric,
|
||||
StaticMetric,
|
||||
@ -69,8 +68,6 @@ class Profiler(Generic[TMetric]):
|
||||
- A tuple of metrics, from which we will construct queries.
|
||||
"""
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*metrics: Type[TMetric],
|
||||
|
@ -87,8 +87,7 @@ class QueryRunner:
|
||||
"""dispatch query to sample or all table"""
|
||||
if isinstance(self._sample, AliasedClass):
|
||||
return self.select_first_from_sample(*entities, **kwargs)
|
||||
else:
|
||||
return self.select_first_from_table(*entities, **kwargs)
|
||||
return self.select_first_from_table(*entities, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def select_first_from_query(query: Query):
|
||||
|
@ -216,29 +216,24 @@ class TestSuiteWorkflow:
|
||||
Args:
|
||||
entity: table entity
|
||||
"""
|
||||
# Should remove this with https://github.com/open-metadata/OpenMetadata/issues/5458
|
||||
if entity.serviceType != DatabaseServiceType.BigQuery:
|
||||
if (
|
||||
not hasattr(entity, "serviceType")
|
||||
or entity.serviceType != DatabaseServiceType.BigQuery
|
||||
):
|
||||
return None
|
||||
if entity.tablePartition:
|
||||
if entity.tablePartition.intervalType in {
|
||||
IntervalType.TIME_UNIT,
|
||||
IntervalType.INGESTION_TIME,
|
||||
}:
|
||||
try:
|
||||
partition_field = entity.tablePartition.columns[0]
|
||||
except Exception:
|
||||
raise TypeError(
|
||||
"Unsupported ingestion based partition type. Skipping table"
|
||||
)
|
||||
|
||||
if hasattr(entity, "tablePartition") and entity.tablePartition:
|
||||
if entity.tablePartition.intervalType == IntervalType.TIME_UNIT:
|
||||
return TablePartitionConfig(
|
||||
partitionField=partition_field,
|
||||
partitionField=entity.tablePartition.columns[0]
|
||||
)
|
||||
|
||||
if entity.tablePartition.intervalType == IntervalType.INGESTION_TIME:
|
||||
if entity.tablePartition.interval == "DAY":
|
||||
return TablePartitionConfig(partitionField="_PARTITIONDATE")
|
||||
return TablePartitionConfig(partitionField="_PARTITIONTIME")
|
||||
raise TypeError(
|
||||
f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table"
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def _create_sqa_tests_runner_interface(self, table_fqn: str):
|
||||
@ -436,8 +431,8 @@ class TestSuiteWorkflow:
|
||||
)
|
||||
except TypeError as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Could not run test case {test_case.name}: {exc}")
|
||||
self.status.failure(test_case.fullyQualifiedName.__root__)
|
||||
logger.warning(f"Could not run test case for table {table_fqn}: {exc}")
|
||||
self.status.failure(table_fqn)
|
||||
|
||||
def print_status(self) -> None:
|
||||
"""
|
||||
|
@ -18,6 +18,9 @@ from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.table import IntervalType, TablePartition
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseServiceType,
|
||||
)
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.orm_profiler.api.workflow import ProfilerWorkflow
|
||||
|
||||
@ -77,6 +80,7 @@ MOCK_DATABASE = Database(
|
||||
|
||||
class MockTable(BaseModel):
|
||||
tablePartition: Optional[TablePartition]
|
||||
serviceType = DatabaseServiceType.BigQuery
|
||||
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
Loading…
x
Reference in New Issue
Block a user