diff --git a/.pylintrc b/.pylintrc index 51126c4e6e6..99d9253880b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -22,6 +22,7 @@ extension-pkg-allow-list=pydantic [MESSAGES CONTROL] disable=no-name-in-module +enable=useless-suppression [FORMAT] # We all have big monitors now diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index a5f2cbc088b..f34daa589bb 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -82,6 +82,7 @@ class ProfilerWorkflow: metadata: OpenMetadata def __init__(self, config: OpenMetadataWorkflowConfig): + self.profiler_obj = None # defined in `create_profiler_obj()` self.config = config self.metadata_config: OpenMetadataConnection = ( self.config.workflowConfig.openMetadataServerConfig @@ -211,24 +212,29 @@ class ProfilerWorkflow: Args: entity: table entity """ + if ( + not hasattr(entity, "serviceType") + or entity.serviceType != DatabaseServiceType.BigQuery + ): + return None + entity_config: TableConfig = self.get_config_for_entity(entity) if entity_config: return entity_config.partitionConfig if hasattr(entity, "tablePartition") and entity.tablePartition: - try: - if entity.tablePartition.intervalType == IntervalType.TIME_UNIT: - return TablePartitionConfig( - partitionField=entity.tablePartition.columns[0] - ) - elif entity.tablePartition.intervalType == IntervalType.INGESTION_TIME: - if entity.tablePartition.interval == "DAY": - return TablePartitionConfig(partitionField="_PARTITIONDATE") - return TablePartitionConfig(partitionField="_PARTITIONTIME") - except Exception: - raise TypeError( - f"Unsupported partition type {entity.tablePartition.intervalType}. 
Skipping table" + if entity.tablePartition.intervalType == IntervalType.TIME_UNIT: + return TablePartitionConfig( + partitionField=entity.tablePartition.columns[0] ) + if entity.tablePartition.intervalType == IntervalType.INGESTION_TIME: + if entity.tablePartition.interval == "DAY": + return TablePartitionConfig(partitionField="_PARTITIONDATE") + return TablePartitionConfig(partitionField="_PARTITIONTIME") + raise TypeError( + f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table" + ) + return None def create_profiler_interface( self, @@ -288,8 +294,7 @@ class ProfilerWorkflow: database.name.__root__, "Database pattern not allowed" ) return None - else: - return database + return database def filter_entities(self, tables: List[Table]) -> Iterable[Table]: """ @@ -400,7 +405,8 @@ class ProfilerWorkflow: if not databases: raise ValueError( "databaseFilterPattern returned 0 result. At least 1 database must be returned by the filter pattern." - f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t- excludes: {self.source_config.databaseFilterPattern.excludes}" + f"\n\t- includes: {self.source_config.databaseFilterPattern.includes}\n\t" + f"- excludes: {self.source_config.databaseFilterPattern.excludes}" ) for database in databases: @@ -457,8 +463,7 @@ class ProfilerWorkflow: or (hasattr(self, "sink") and self.sink.get_status().failures) ): return 1 - else: - return 0 + return 0 def raise_from_status(self, raise_warnings=False): """ @@ -522,4 +527,4 @@ class ProfilerWorkflow: @staticmethod def _is_sample_source(service_type): - return service_type == "sample-data" or service_type == "sample-usage" + return service_type in {"sample-data", "sample-usage"} diff --git a/ingestion/src/metadata/orm_profiler/metrics/core.py b/ingestion/src/metadata/orm_profiler/metrics/core.py index 2274cb698c7..c9a7f9201d9 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/core.py +++ b/ingestion/src/metadata/orm_profiler/metrics/core.py @@ 
-13,6 +13,8 @@ Metric Core definitions """ +# pylint: disable=invalid-name + from abc import ABC, abstractmethod from enum import Enum from functools import wraps diff --git a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py index f92791cdc88..ddbd8fc2c04 100644 --- a/ingestion/src/metadata/orm_profiler/metrics/static/mean.py +++ b/ingestion/src/metadata/orm_profiler/metrics/static/mean.py @@ -29,7 +29,7 @@ from metadata.utils.logger import profiler_logger logger = profiler_logger() - +# pylint: disable=invalid-name class avg(GenericFunction): name = "avg" inherit_cache = CACHE @@ -39,7 +39,7 @@ class avg(GenericFunction): def _(element, compiler, **kw): """Handle case for empty table. If empty, clickhouse returns NaN""" proc = compiler.process(element.clauses, **kw) - return "if(isNaN(avg(%s)), null, avg(%s))" % ((proc,) * 2) + return f"if(isNaN(avg({proc})), null, avg({proc}))" class Mean(StaticMetric): diff --git a/ingestion/src/metadata/orm_profiler/orm/converter.py b/ingestion/src/metadata/orm_profiler/orm/converter.py index c028f2e79dc..73a1f9076c9 100644 --- a/ingestion/src/metadata/orm_profiler/orm/converter.py +++ b/ingestion/src/metadata/orm_profiler/orm/converter.py @@ -82,6 +82,7 @@ def map_types(col: Column, table_service_type): table_service_type == databaseService.DatabaseServiceType.Snowflake and col.dataType == DataType.JSON ): + # pylint: disable=import-outside-toplevel from snowflake.sqlalchemy import VARIANT return VARIANT diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/median.py b/ingestion/src/metadata/orm_profiler/orm/functions/median.py index 661e1c8c9e1..87381289af1 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/median.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/median.py @@ -29,7 +29,7 @@ class MedianFn(FunctionElement): @compiles(MedianFn) -def _(elements, compiler, **kwargs): +def _(elements, compiler, **kwargs): # pylint: 
disable=unused-argument col = elements.clauses.clauses[0].name return "percentile_cont(0.5) WITHIN GROUP (ORDER BY %s ASC)" % col @@ -69,7 +69,7 @@ def _(elements, compiler, **kwargs): @compiles(MedianFn, Dialects.MySQL) -def _(elemenst, compiler, **kwargs): +def _(elemenst, compiler, **kwargs): # pylint: disable=unused-argument """Median computation for MySQL currently not supported Needs to be tackled in https://github.com/open-metadata/OpenMetadata/issues/6340 """ @@ -77,8 +77,8 @@ def _(elemenst, compiler, **kwargs): @compiles(MedianFn, Dialects.SQLite) -def _(elements, compiler, **kwargs): - col, table = [element for element in elements.clauses] +def _(elements, compiler, **kwargs): # pylint: disable=unused-argument + col, table = list(elements.clauses) return """ (SELECT AVG({col}) diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py index 8ae8911d42a..d48fd246576 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/modulo.py @@ -13,7 +13,7 @@ Define Modulo function """ # Keep SQA docs style defining custom constructs -# pylint: disable=consider-using-f-string,duplicate-code +# pylint: disable=duplicate-code from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import FunctionElement diff --git a/ingestion/src/metadata/orm_profiler/orm/functions/sum.py b/ingestion/src/metadata/orm_profiler/orm/functions/sum.py index fbaee8468d1..5803dabaa79 100644 --- a/ingestion/src/metadata/orm_profiler/orm/functions/sum.py +++ b/ingestion/src/metadata/orm_profiler/orm/functions/sum.py @@ -33,11 +33,11 @@ class SumFn(GenericFunction): def _(element, compiler, **kw): """Handle case for empty table. 
If empty, clickhouse returns NaN""" proc = compiler.process(element.clauses, **kw) - return "SUM(%s)" % proc + return f"SUM({proc})" @compiles(SumFn, Dialects.BigQuery) def _(element, compiler, **kw): """Handle case where column type is INTEGER but SUM returns a NUMBER""" proc = compiler.process(element.clauses, **kw) - return "SUM(CAST(%s AS NUMERIC))" % proc + return f"SUM(CAST({proc} AS NUMERIC))" diff --git a/ingestion/src/metadata/orm_profiler/orm/types/bytea_to_string.py b/ingestion/src/metadata/orm_profiler/orm/types/bytea_to_string.py index a95f52a05be..45c00ef5e74 100644 --- a/ingestion/src/metadata/orm_profiler/orm/types/bytea_to_string.py +++ b/ingestion/src/metadata/orm_profiler/orm/types/bytea_to_string.py @@ -12,7 +12,7 @@ """ Expand sqlalchemy types to map them to OpenMetadata DataType """ -# pylint: disable=duplicate-code +# pylint: disable=duplicate-code,abstract-method from typing import Optional diff --git a/ingestion/src/metadata/orm_profiler/orm/types/hex_byte_string.py b/ingestion/src/metadata/orm_profiler/orm/types/hex_byte_string.py index 71ed75a8a03..26d33dd1428 100644 --- a/ingestion/src/metadata/orm_profiler/orm/types/hex_byte_string.py +++ b/ingestion/src/metadata/orm_profiler/orm/types/hex_byte_string.py @@ -12,7 +12,7 @@ """ Expand sqlalchemy types to map them to OpenMetadata DataType """ -# pylint: disable=duplicate-code +# pylint: disable=duplicate-code,abstract-method from typing import Optional diff --git a/ingestion/src/metadata/orm_profiler/orm/types/uuid.py b/ingestion/src/metadata/orm_profiler/orm/types/uuid.py index 99a15c51f9a..061a58813ac 100644 --- a/ingestion/src/metadata/orm_profiler/orm/types/uuid.py +++ b/ingestion/src/metadata/orm_profiler/orm/types/uuid.py @@ -12,7 +12,7 @@ """ Expand sqlalchemy types to map them to OpenMetadata DataType """ -# pylint: disable=duplicate-code +# pylint: disable=duplicate-code,abstract-method import traceback from uuid import UUID diff --git 
a/ingestion/src/metadata/orm_profiler/profiler/core.py b/ingestion/src/metadata/orm_profiler/profiler/core.py index 847ad7d1175..1ce661050f3 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/core.py +++ b/ingestion/src/metadata/orm_profiler/profiler/core.py @@ -38,7 +38,6 @@ from metadata.orm_profiler.api.models import ProfilerResponse from metadata.orm_profiler.metrics.core import ( ComposedMetric, CustomMetric, - Metric, MetricTypes, QueryMetric, StaticMetric, @@ -69,8 +68,6 @@ class Profiler(Generic[TMetric]): - A tuple of metrics, from which we will construct queries. """ - # pylint: disable=too-many-instance-attributes,too-many-public-methods - def __init__( self, *metrics: Type[TMetric], diff --git a/ingestion/src/metadata/orm_profiler/profiler/runner.py b/ingestion/src/metadata/orm_profiler/profiler/runner.py index 227465dc4ec..b8dbeca2ab0 100644 --- a/ingestion/src/metadata/orm_profiler/profiler/runner.py +++ b/ingestion/src/metadata/orm_profiler/profiler/runner.py @@ -87,8 +87,7 @@ class QueryRunner: """dispatch query to sample or all table""" if isinstance(self._sample, AliasedClass): return self.select_first_from_sample(*entities, **kwargs) - else: - return self.select_first_from_table(*entities, **kwargs) + return self.select_first_from_table(*entities, **kwargs) @staticmethod def select_first_from_query(query: Query): diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index bb8d2c16264..29eb2ba8baa 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -216,29 +216,24 @@ class TestSuiteWorkflow: Args: entity: table entity """ - # Should remove this with https://github.com/open-metadata/OpenMetadata/issues/5458 - if entity.serviceType != DatabaseServiceType.BigQuery: + if ( + not hasattr(entity, "serviceType") + or entity.serviceType != DatabaseServiceType.BigQuery + ): return None - if entity.tablePartition: - if 
entity.tablePartition.intervalType in { - IntervalType.TIME_UNIT, - IntervalType.INGESTION_TIME, - }: - try: - partition_field = entity.tablePartition.columns[0] - except Exception: - raise TypeError( - "Unsupported ingestion based partition type. Skipping table" - ) + if hasattr(entity, "tablePartition") and entity.tablePartition: + if entity.tablePartition.intervalType == IntervalType.TIME_UNIT: return TablePartitionConfig( - partitionField=partition_field, + partitionField=entity.tablePartition.columns[0] ) - + if entity.tablePartition.intervalType == IntervalType.INGESTION_TIME: + if entity.tablePartition.interval == "DAY": + return TablePartitionConfig(partitionField="_PARTITIONDATE") + return TablePartitionConfig(partitionField="_PARTITIONTIME") raise TypeError( f"Unsupported partition type {entity.tablePartition.intervalType}. Skipping table" ) - return None def _create_sqa_tests_runner_interface(self, table_fqn: str): @@ -436,8 +431,8 @@ class TestSuiteWorkflow: ) except TypeError as exc: logger.debug(traceback.format_exc()) - logger.warning(f"Could not run test case {test_case.name}: {exc}") - self.status.failure(test_case.fullyQualifiedName.__root__) + logger.warning(f"Could not run test case for table {table_fqn}: {exc}") + self.status.failure(table_fqn) def print_status(self) -> None: """ diff --git a/ingestion/tests/unit/profiler/test_profiler_partitions.py b/ingestion/tests/unit/profiler/test_profiler_partitions.py index eb3d1ca6478..273c56c384c 100644 --- a/ingestion/tests/unit/profiler/test_profiler_partitions.py +++ b/ingestion/tests/unit/profiler/test_profiler_partitions.py @@ -18,6 +18,9 @@ from pydantic import BaseModel from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import IntervalType, TablePartition +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.type.entityReference import EntityReference from 
metadata.orm_profiler.api.workflow import ProfilerWorkflow @@ -77,6 +80,7 @@ MOCK_DATABASE = Database( class MockTable(BaseModel): tablePartition: Optional[TablePartition] + serviceType = DatabaseServiceType.BigQuery class Config: arbitrary_types_allowed = True