fix: import error + BQ E2E CLI (#12420)

This commit is contained in:
Teddy 2023-07-13 13:35:37 +02:00 committed by GitHub
parent a47e80baf0
commit 54fbe250a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 140 additions and 45 deletions

View File

@ -32,7 +32,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection
from metadata.mixins.sqalchemy.sqa_mixin import SQAInterfaceMixin
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
from metadata.utils.constants import TEN_MIN
from metadata.utils.importer import import_test_case_class
@ -118,7 +118,7 @@ class SQATestSuiteInterface(SQAInterfaceMixin, TestSuiteInterface):
def _create_sampler(self) -> SQASampler:
"""Create sampler instance"""
return sampler_factory.create(
return sampler_factory_.create(
self.service_connection_config.__class__.__name__,
client=self.session,
table=self.table,

View File

@ -32,7 +32,7 @@ from metadata.mixins.pandas.pandas_mixin import PandasInterfaceMixin
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.utils.datalake.datalake_utils import fetch_dataframe
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
@ -104,7 +104,7 @@ class PandasProfilerInterface(ProfilerInterface, PandasInterfaceMixin):
def _get_sampler(self):
"""Get dataframe sampler from config"""
return sampler_factory.create(
return sampler_factory_.create(
DatalakeConnection.__name__,
client=self.client,
table=self.dfs,

View File

@ -15,6 +15,9 @@ Factory class for creating profiler interface objects
from typing import cast
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
@ -23,6 +26,9 @@ from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.interface.sqlalchemy.bigquery.profiler_interface import (
BigQueryProfilerInterface,
)
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
@ -49,6 +55,9 @@ class ProfilerInterfaceFactory:
profiler_interface_factory = ProfilerInterfaceFactory()
profiler_interface_factory.register(DatabaseConnection.__name__, SQAProfilerInterface)
profiler_interface_factory.register(
BigQueryConnection.__name__, BigQueryProfilerInterface
)
profiler_interface_factory.register(
DatalakeConnection.__name__, PandasProfilerInterface
)

View File

@ -0,0 +1,39 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Interfaces with database for all database engine
supporting sqlalchemy abstraction layer
"""
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
SQAProfilerInterface,
)
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
class BigQueryProfilerInterface(SQAProfilerInterface):
"""BigQuery profiler interface"""
def _get_sampler(self, **kwargs):
"""get sampler object"""
session = kwargs.get("session")
table = kwargs["table"]
return sampler_factory_.create(
self.service_connection_config.__class__.__name__,
client=session or self.session,
table=table,
profile_sample_config=self.profile_sample_config,
partition_details=self.partition_details,
profile_sample_query=self.profile_query,
table_type=self.table_entity.tableType,
)

View File

@ -19,7 +19,7 @@ import threading
import traceback
from collections import defaultdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Union
from typing import Dict, List
from sqlalchemy import Column
from sqlalchemy.exc import ProgrammingError
@ -38,7 +38,7 @@ from metadata.profiler.orm.functions.table_metric_construct import (
table_metric_construct_factory,
)
from metadata.profiler.processor.runner import QueryRunner
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_
from metadata.utils.custom_thread_pool import CustomThreadPoolExecutor
from metadata.utils.dispatch import valuedispatch
from metadata.utils.logger import profiler_interface_registry_logger
@ -110,7 +110,7 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
session = kwargs.get("session")
table = kwargs["table"]
return sampler_factory.create(
return sampler_factory_.create(
self.service_connection_config.__class__.__name__,
client=session or self.session,
table=table,
@ -119,28 +119,6 @@ class SQAProfilerInterface(ProfilerInterface, SQAInterfaceMixin):
profile_sample_query=self.profile_query,
)
@staticmethod
def _is_array_column(column) -> Dict[str, Union[Optional[str], bool]]:
"""check if column is an array column
Args:
column: column to check
Returns:
True if column is an array column else False
"""
kwargs = {}
try:
kwargs["is_array"] = column._is_array # pylint: disable=protected-access
except AttributeError:
kwargs["is_array"] = False
try:
kwargs["array_col"] = column._array_col # pylint: disable=protected-access
except AttributeError:
kwargs["array_col"] = None
return kwargs
def _session_factory(self) -> scoped_session:
"""Create thread safe session that will be automatically
garbage collected once the application thread ends

View File

@ -15,11 +15,17 @@ Factory class for creating sampler objects
from typing import Union
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
DatalakeConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler
from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import (
BigQuerySampler,
)
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
@ -44,6 +50,7 @@ class SamplerFactory:
return sampler_class(*args, **kwargs)
sampler_factory = SamplerFactory()
sampler_factory.register(DatabaseConnection.__name__, SQASampler)
sampler_factory.register(DatalakeConnection.__name__, DatalakeSampler)
sampler_factory_ = SamplerFactory()
sampler_factory_.register(DatabaseConnection.__name__, SQASampler)
sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler)
sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler)

View File

@ -0,0 +1,65 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Helper module to handle data sampling
for the profiler
"""
from typing import Dict, Optional
from sqlalchemy.orm import Query
from metadata.generated.schema.entity.data.table import ProfileSampleType, TableType
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.profiler.processor.handle_partition import partition_filter_handler
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
class BigQuerySampler(SQASampler):
"""
Generates a sample of the data to not
run the query in the whole table.
"""
def __init__(
self,
client,
table,
profile_sample_config: Optional[ProfileSampleConfig] = None,
partition_details: Optional[Dict] = None,
profile_sample_query: Optional[str] = None,
table_type: TableType = None,
):
super().__init__(
client,
table,
profile_sample_config,
partition_details,
profile_sample_query,
)
self.table_type: TableType = table_type
@partition_filter_handler(build_sample=True)
def get_sample_query(self) -> Query:
"""get query for sample data"""
# TABLESAMPLE SYSTEM is not supported for views
if (
self.profile_sample_type == ProfileSampleType.PERCENTAGE
and self.table_type != TableType.View
):
return (
self.client.query(self.table)
.suffix_with(
f"TABLESAMPLE SYSTEM ({self.profile_sample or 100} PERCENT)",
)
.cte(f"{self.table.__tablename__}_sample")
)
return super().get_sample_query()

View File

@ -15,7 +15,7 @@ for the profiler
from typing import Union, cast
from sqlalchemy import Column, inspect, text
from sqlalchemy.orm import DeclarativeMeta, Query, Session, aliased
from sqlalchemy.orm import DeclarativeMeta, Query, aliased
from sqlalchemy.orm.util import AliasedClass
from sqlalchemy.sql.sqltypes import Enum
@ -25,7 +25,6 @@ from metadata.generated.schema.entity.data.table import (
ProfileSampleType,
TableData,
)
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.profiler.orm.functions.modulo import ModuloFn
from metadata.profiler.orm.functions.random_num import RandomNumFn
from metadata.profiler.orm.registry import Dialects
@ -68,7 +67,7 @@ class SQASampler(SamplerInterface):
def get_sample_query(self) -> Query:
"""get query for sample data"""
if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
return (
rnd = (
self.client.query(
self.table,
(ModuloFn(RandomNumFn(), 100)).label(RANDOM_LABEL),
@ -77,12 +76,13 @@ class SQASampler(SamplerInterface):
f"SAMPLE BERNOULLI ({self.profile_sample or 100})",
dialect=Dialects.Snowflake,
)
.suffix_with(
f"TABLESAMPLE SYSTEM ({self.profile_sample or 100} PERCENT)",
dialect=Dialects.BigQuery,
)
.cte(f"{self.table.__tablename__}_rnd")
)
session_query = self.client.query(rnd)
return session_query.where(rnd.c.random <= self.profile_sample).cte(
f"{self.table.__tablename__}_sample"
)
table_query = self.client.query(self.table)
return (
self.client.query(
@ -109,13 +109,8 @@ class SQASampler(SamplerInterface):
return self.table
# Add new RandomNumFn column
rnd = self.get_sample_query()
session_query = self.client.query(rnd)
sampled = self.get_sample_query()
# Prepare sampled CTE
sampled = session_query.where(rnd.c.random <= self.profile_sample).cte(
f"{self.table.__tablename__}_sample"
)
# Assign as an alias
return aliased(self.table, sampled)

View File

@ -144,6 +144,8 @@ class PandasInterfaceTest(TestCase):
query_metrics = []
window_metrics = []
for col in inspect(User).c:
if col.name == "id":
continue
column_metrics.append(
(
[