From 54fbe250a1cd0e7d29f7057c50d30df044376cb7 Mon Sep 17 00:00:00 2001 From: Teddy Date: Thu, 13 Jul 2023 13:35:37 +0200 Subject: [PATCH] fix: import error + BQ E2E CLI (#12420) --- .../sqlalchemy/sqa_test_suite_interface.py | 4 +- .../interface/pandas/profiler_interface.py | 4 +- .../interface/profiler_interface_factory.py | 9 +++ .../interface/sqlalchemy/bigquery/__init__.py | 0 .../sqlalchemy/bigquery/profiler_interface.py | 39 +++++++++++ .../sqlalchemy/profiler_interface.py | 28 +------- .../profiler/processor/sampler/__init__.py | 0 .../processor/sampler/sampler_factory.py | 13 +++- .../sampler/sqlalchemy/bigquery/__init__.py | 0 .../sampler/sqlalchemy/bigquery/sampler.py | 65 +++++++++++++++++++ .../processor/sampler/sqlalchemy/sampler.py | 21 +++--- .../pandas/test_sqa_profiler_interface.py | 2 + 12 files changed, 140 insertions(+), 45 deletions(-) create mode 100644 ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/__init__.py create mode 100644 ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py create mode 100644 ingestion/src/metadata/profiler/processor/sampler/__init__.py create mode 100644 ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/__init__.py create mode 100644 ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/sampler.py diff --git a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py index f1e0f17c9d9..8da479a2dab 100644 --- a/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py +++ b/ingestion/src/metadata/data_quality/interface/sqlalchemy/sqa_test_suite_interface.py @@ -32,7 +32,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.connections import get_connection from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin from metadata.profiler.processor.runner import QueryRunner -from metadata.profiler.processor.sampler.sampler_factory import sampler_factory +from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler from metadata.utils.constants import TEN_MIN from metadata.utils.importer import import_test_case_class @@ -118,7 +118,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface): def _create_sampler(self) -> SQASampler: """Create sampler instance""" - return sampler_factory.create( + return sampler_factory_.create( self.service_connection_config.__class__.__name__, client=self.session, table=self.table, diff --git a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py index 45aab40c673..4c3395a2c52 100644 --- a/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/pandas/profiler_interface.py @@ -32,7 +32,7 @@ from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin from metadata.profiler.interface.profiler_interface import ProfilerInterface from metadata.profiler.metrics.core import MetricTypes from metadata.profiler.metrics.registry import Metrics -from metadata.profiler.processor.sampler.sampler_factory import sampler_factory +from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.utils.datalake.datalake_utils import fetch_dataframe from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger @@ -104,7 +104,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin): def _get_sampler(self): """Get dataframe sampler from config""" - return sampler_factory.create( + return sampler_factory_.create( DatalakeConnection.__name__, client=self.client, table=self.dfs, diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py index b315b948432..684e6be3643 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py @@ -15,6 +15,9 @@ Factory class for creating profiler interface objects from typing import cast +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigQueryConnection, +) from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) @@ -23,6 +26,9 @@ from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, ) from metadata.profiler.interface.profiler_interface import ProfilerInterface +from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import ( + BigQueryProfilerInterface, +) from metadata.profiler.interface.sqlalchemy.profiler_interface import ( SQAProfilerInterface, ) @@ -49,6 +55,9 @@ class ProfilerInterfaceFactory: profiler_interface_factory = ProfilerInterfaceFactory() profiler_interface_factory.register(DatabaseConnection.__name__, SQAProfilerInterface) +profiler_interface_factory.register( + BigQueryConnection.__name__, BigQueryProfilerInterface +) profiler_interface_factory.register( DatalakeConnection.__name__, PandasProfilerInterface ) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/__init__.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py new file mode 100644 index 00000000000..756db25e50f --- /dev/null +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/bigquery/profiler_interface.py @@ -0,0 +1,39 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Interfaces with database for all database engine +supporting sqlalchemy abstraction layer +""" + +from metadata.profiler.interface.sqlalchemy.profiler_interface import ( + SQAProfilerInterface, +) +from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ + + +class BigQueryProfilerInterface(SQAProfilerInterface): + """BigQuery profiler interface""" + + def _get_sampler(self, **kwargs): + """get sampler object""" + session = kwargs.get("session") + table = kwargs["table"] + + return sampler_factory_.create( + self.service_connection_config.__class__.__name__, + client=session or self.session, + table=table, + profile_sample_config=self.profile_sample_config, + partition_details=self.partition_details, + profile_sample_query=self.profile_query, + table_type=self.table_entity.tableType, + ) diff --git a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py index b46f55bcb08..99f26c96ea5 100644 --- a/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/sqlalchemy/profiler_interface.py @@ -19,7 +19,7 @@ import threading import traceback from collections import defaultdict from datetime import datetime, timezone -from typing import Dict, List, Optional, Union +from typing import Dict, List from sqlalchemy import Column from sqlalchemy.exc import ProgrammingError @@ -38,7 +38,7 @@ from metadata.profiler.orm.functions.table_metric_construct import ( table_metric_construct_factory, ) from metadata.profiler.processor.runner import QueryRunner -from metadata.profiler.processor.sampler.sampler_factory import sampler_factory +from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_ from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor from metadata.utils.dispatch import valuedispatch from metadata.utils.logger import profiler_interface_registry_logger @@ -110,7 +110,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): session = kwargs.get("session") table = kwargs["table"] - return sampler_factory.create( + return sampler_factory_.create( self.service_connection_config.__class__.__name__, client=session or self.session, table=table, @@ -119,28 +119,6 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin): profile_sample_query=self.profile_query, ) - @staticmethod - def _is_array_column(column) -> Dict[str, Union[Optional[str], bool]]: - """check if column is an array column - - Args: - column: column to check - Returns: - True if column is an array column else False - """ - kwargs = {} - try: - kwargs["is_array"] = column._is_array # pylint: disable=protected-access - except AttributeError: - kwargs["is_array"] = False - - try: - kwargs["array_col"] = column._array_col # pylint: disable=protected-access - except AttributeError: - kwargs["array_col"] = None - - return kwargs - def _session_factory(self) -> scoped_session: """Create thread safe session that will be automatically garbage collected once the application thread ends diff --git a/ingestion/src/metadata/profiler/processor/sampler/__init__.py b/ingestion/src/metadata/profiler/processor/sampler/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py index 886d84ec802..b0a9b0ebec0 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py @@ -15,11 +15,17 @@ Factory class for creating sampler objects from typing import Union +from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import ( + BigQueryConnection, +) from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler +from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import ( + BigQuerySampler, +) from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler @@ -44,6 +50,7 @@ class SamplerFactory: return sampler_class(*args, **kwargs) -sampler_factory = SamplerFactory() -sampler_factory.register(DatabaseConnection.__name__, SQASampler) -sampler_factory.register(DatalakeConnection.__name__, DatalakeSampler) +sampler_factory_ = SamplerFactory() +sampler_factory_.register(DatabaseConnection.__name__, SQASampler) +sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler) +sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler) diff --git a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/__init__.py b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/sampler.py new file mode 100644 index 00000000000..5989261765b --- /dev/null +++ b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/bigquery/sampler.py @@ -0,0 +1,65 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Helper module to handle data sampling +for the profiler +""" +from typing import Dict, Optional + +from sqlalchemy.orm import Query + +from metadata.generated.schema.entity.data.table import ProfileSampleType, TableType +from metadata.profiler.api.models import ProfileSampleConfig +from metadata.profiler.processor.handle_partition import partition_filter_handler +from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler + + +class BigQuerySampler(SQASampler): + """ + Generates a sample of the data to not + run the query in the whole table. + """ + + def __init__( + self, + client, + table, + profile_sample_config: Optional[ProfileSampleConfig] = None, + partition_details: Optional[Dict] = None, + profile_sample_query: Optional[str] = None, + table_type: TableType = None, + ): + super().__init__( + client, + table, + profile_sample_config, + partition_details, + profile_sample_query, + ) + self.table_type: TableType = table_type + + @partition_filter_handler(build_sample=True) + def get_sample_query(self) -> Query: + """get query for sample data""" + # TABLESAMPLE SYSTEM is not supported for views + if ( + self.profile_sample_type == ProfileSampleType.PERCENTAGE + and self.table_type != TableType.View + ): + return ( + self.client.query(self.table) + .suffix_with( + f"TABLESAMPLE SYSTEM ({self.profile_sample or 100} PERCENT)", + ) + .cte(f"{self.table.__tablename__}_sample") + ) + + return super().get_sample_query() diff --git a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py index 5ba6a43f59b..70bd2222163 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sqlalchemy/sampler.py @@ -15,7 +15,7 @@ for the profiler from typing import Union, cast from sqlalchemy import Column, inspect, text -from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased +from sqlalchemy.orm import DeclarativeMeta, Query, aliased from sqlalchemy.orm.util import AliasedClass from sqlalchemy.sql.sqltypes import Enum @@ -25,7 +25,6 @@ from metadata.generated.schema.entity.data.table import ( ProfileSampleType, TableData, ) -from metadata.profiler.api.models import ProfileSampleConfig from metadata.profiler.orm.functions.modulo import ModuloFn from metadata.profiler.orm.functions.random_num import RandomNumFn from metadata.profiler.orm.registry import Dialects @@ -68,7 +67,7 @@ class SQASampler(SamplerInterface): def get_sample_query(self) -> Query: """get query for sample data""" if self.profile_sample_type == ProfileSampleType.PERCENTAGE: - return ( + rnd = ( self.client.query( self.table, (ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL), @@ -77,12 +76,13 @@ class SQASampler(SamplerInterface): f"SAMPLE BERNOULLI ({self.profile_sample or 100})", dialect=Dialects.Snowflake, ) - .suffix_with( - f"TABLESAMPLE SYSTEM ({self.profile_sample or 100} PERCENT)", - dialect=Dialects.BigQuery, - ) .cte(f"{self.table.__tablename__}_rnd") ) + session_query = self.client.query(rnd) + + return session_query.where(rnd.c.random <= self.profile_sample).cte( + f"{self.table.__tablename__}_sample" + ) table_query = self.client.query(self.table) return ( self.client.query( @@ -109,13 +109,8 @@ class SQASampler(SamplerInterface): return self.table # Add new RandomNumFn column - rnd = self.get_sample_query() - session_query = self.client.query(rnd) + sampled = self.get_sample_query() - # Prepare sampled CTE - sampled = session_query.where(rnd.c.random <= self.profile_sample).cte( - f"{self.table.__tablename__}_sample" - ) # Assign as an alias return aliased(self.table, sampled) diff --git a/ingestion/tests/unit/profiler/pandas/test_sqa_profiler_interface.py b/ingestion/tests/unit/profiler/pandas/test_sqa_profiler_interface.py index 7805b48569d..9a917d97ee1 100644 --- a/ingestion/tests/unit/profiler/pandas/test_sqa_profiler_interface.py +++ b/ingestion/tests/unit/profiler/pandas/test_sqa_profiler_interface.py @@ -144,6 +144,8 @@ class PandasInterfaceTest(TestCase): query_metrics = [] window_metrics = [] for col in inspect(User).c: + if col.name == "id": + continue column_metrics.append( ( [