mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
fix: azuresql sampler logic (#19034)
This commit is contained in:
parent
ae046c502d
commit
79b2888bb5
@ -0,0 +1,39 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Map Types to convert/cast Azure SQL (mssql) related data types to relevant data types
|
||||
"""
|
||||
|
||||
|
||||
from sqlalchemy import NVARCHAR, TEXT
|
||||
|
||||
from metadata.profiler.orm.converter.common import CommonMapTypes
|
||||
from metadata.profiler.orm.registry import CustomImage, CustomTypes, DataType
|
||||
|
||||
# SQL type string (all with ``max`` length) keyed by the SQLAlchemy / custom
# column type it applies to.
# NOTE(review): nothing in the visible part of this module reads this mapping;
# presumably it is consumed when casting columns -- confirm against callers.
cast_dict = {
    CustomImage: "VARBINARY(max)",
    TEXT: "VARCHAR(max)",
    NVARCHAR: "NVARCHAR(max)",
}
|
||||
|
||||
|
||||
class AzureSqlMapTypes(CommonMapTypes):
    """
    AzureSql type mapper

    Uses the inherited type map with ``DataType.TIMESTAMP`` overridden by
    ``CustomTypes.TIMESTAMP.value``.
    """

    def __init__(self) -> None:
        # ``_TYPE_MAP`` is read here without ever being set on the instance, so
        # it resolves to a dict inherited from the class hierarchy. Calling
        # ``update`` on it in place would leak the TIMESTAMP override into the
        # parent class and every sibling mapper sharing that dict. Build a
        # per-instance copy carrying the override instead.
        self._TYPE_MAP = {
            **self._TYPE_MAP,
            DataType.TIMESTAMP: CustomTypes.TIMESTAMP.value,
        }
|
||||
@ -17,6 +17,7 @@ from collections import defaultdict
|
||||
from metadata.generated.schema.entity.services.databaseService import (
|
||||
DatabaseServiceType,
|
||||
)
|
||||
from metadata.profiler.orm.converter.azuresql.converter import AzureSqlMapTypes
|
||||
from metadata.profiler.orm.converter.bigquery.converter import BigqueryMapTypes
|
||||
from metadata.profiler.orm.converter.common import CommonMapTypes
|
||||
from metadata.profiler.orm.converter.mssql.converter import MssqlMapTypes
|
||||
@ -28,3 +29,4 @@ converter_registry[DatabaseServiceType.BigQuery] = BigqueryMapTypes
|
||||
converter_registry[DatabaseServiceType.Snowflake] = SnowflakeMapTypes
|
||||
converter_registry[DatabaseServiceType.Redshift] = RedshiftMapTypes
|
||||
converter_registry[DatabaseServiceType.Mssql] = MssqlMapTypes
|
||||
converter_registry[DatabaseServiceType.AzureSQL] = AzureSqlMapTypes
|
||||
|
||||
@ -14,10 +14,11 @@ for the profiler
|
||||
"""
|
||||
from typing import List, Optional
|
||||
|
||||
from sqlalchemy import Column
|
||||
from sqlalchemy import Column, Table, text
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableData
|
||||
from metadata.sampler.sqlalchemy.sampler import SQASampler
|
||||
from metadata.sampler.sqlalchemy.sampler import ProfileSampleType, SQASampler
|
||||
|
||||
|
||||
class AzureSQLSampler(SQASampler):
|
||||
@ -31,6 +32,28 @@ class AzureSQLSampler(SQASampler):
|
||||
# pyodbc.ProgrammingError: ('ODBC SQL type -151 is not yet supported. column-index=x type=-151', 'HY106')
|
||||
NOT_COMPUTE_PYODBC = {"SQASGeography", "UndeterminedType"}
|
||||
|
||||
def set_tablesample(self, selectable: Table):
    """Attach a TABLESAMPLE clause sized from the sample configuration.

    Args:
        selectable (Table): table the TABLESAMPLE clause is applied to.

    Returns:
        The result of ``selectable.tablesample(...)``. Its argument is
        ``"<n> PERCENT"`` when the profile sample type is PERCENTAGE and
        ``"<n> ROWS"`` (``n`` truncated to an int) otherwise. ``n`` falls
        back to 100 when no profile sample is configured.
    """
    use_percentage = (
        self.sample_config.profile_sample_type == ProfileSampleType.PERCENTAGE
    )
    sample_size = self.sample_config.profile_sample or 100

    if use_percentage:
        clause = f"{sample_size} PERCENT"
    else:
        # Row counts must be whole numbers.
        clause = f"{int(sample_size)} ROWS"

    return selectable.tablesample(text(clause))
|
||||
|
||||
def get_sample_query(self, *, column=None) -> CTE:
    """Build the CTE that sampled rows are read from.

    Args:
        column: forwarded unchanged to ``_base_sample_query``.

    Returns:
        CTE: a CTE named ``<tablename>_sample`` that selects from an inner
        CTE named ``<tablename>_rnd`` wrapping the base sample query.
    """
    base_query = self._base_sample_query(column)
    rnd_cte = base_query.cte(f"{self.raw_dataset.__tablename__}_rnd")
    sample_select = self.client.query(rnd_cte)
    return sample_select.cte(f"{self.raw_dataset.__tablename__}_sample")
|
||||
|
||||
def fetch_sample_data(self, columns: Optional[List[Column]] = None) -> TableData:
|
||||
sqa_columns = []
|
||||
if columns:
|
||||
|
||||
@ -0,0 +1,150 @@
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.orm import declarative_base
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column as EntityColumn
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
ColumnName,
|
||||
DataType,
|
||||
PartitionIntervalTypes,
|
||||
PartitionProfilerConfig,
|
||||
ProfileSampleType,
|
||||
Table,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.azureSQLConnection import (
|
||||
AzureSQLConnection,
|
||||
)
|
||||
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
|
||||
SQAProfilerInterface,
|
||||
)
|
||||
from metadata.sampler.models import SampleConfig
|
||||
from metadata.sampler.sqlalchemy.azuresql.sampler import AzureSQLSampler
|
||||
from metadata.sampler.sqlalchemy.sampler import SQASampler
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class User(Base):
    """Single-column ORM model (table ``users``) used as the sampled table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
|
||||
|
||||
|
||||
@patch.object(SQASampler, "build_table_orm", return_value=User)
class SampleTest(TestCase):
    """Check the SQL that ``AzureSQLSampler.get_sample_query`` compiles to.

    ``SQASampler.build_table_orm`` is patched to return the ``User`` model, so
    every sampler built in these tests works against that model.
    """

    @classmethod
    @patch.object(SQASampler, "build_table_orm", return_value=User)
    def setUpClass(cls, sampler_mock):
        """Create the table entity, connection and profiler interface shared by all tests."""
        # A class-level ``patch`` decorator only wraps methods whose name
        # starts with ``test``, so ``setUpClass`` needs its own patch.
        cls.table_entity = Table(
            id=uuid4(),
            name="user",
            columns=[
                EntityColumn(
                    name=ColumnName("id"),
                    dataType=DataType.INT,
                ),
            ],
        )

        cls.azuresql_conn = AzureSQLConnection(
            username="myuser",
            password="myaccount",
            database="mywarehouse",
            hostPort="host//foo.bar:1433",
        )

        sampler = SQASampler(
            service_connection_config=cls.azuresql_conn,
            ometa_client=None,
            entity=None,
        )
        # NOTE(review): arguments are positional; 5 and 43200 are presumably
        # the thread count and timeout in seconds -- confirm against the
        # ``SQAProfilerInterface`` signature.
        cls.sqa_profiler_interface = SQAProfilerInterface(
            cls.azuresql_conn,
            None,
            cls.table_entity,
            None,
            sampler,
            5,
            43200,
        )

        cls.session = cls.sqa_profiler_interface.session

    def _build_sampler(self, sample_config, **sampler_kwargs):
        """Return an ``AzureSQLSampler`` for the shared entity and connection."""
        return AzureSQLSampler(
            service_connection_config=self.azuresql_conn,
            ometa_client=None,
            entity=self.table_entity,
            sample_config=sample_config,
            **sampler_kwargs,
        )

    def _assert_sample_query(self, sampler, expected_query):
        """Compile the sampler's sample query and compare it case-insensitively."""
        query: CTE = sampler.get_sample_query()
        compiled_query = str(query.compile(compile_kwargs={"literal_binds": True}))
        # ``assertEqual`` prints a readable diff of the two SQL strings on failure.
        self.assertEqual(expected_query.casefold(), compiled_query.casefold())

    def test_omit_sampling_method_type(self, sampler_mock):
        """
        use percentage sampling.

        Despite the test name, the PERCENTAGE sample type is passed
        explicitly here rather than omitted.
        """
        sampler = self._build_sampler(
            SampleConfig(
                profile_sample_type=ProfileSampleType.PERCENTAGE, profile_sample=50.0
            ),
        )
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id \n"
            "FROM users AS users_1 TABLESAMPLE system(50.0 PERCENT))\n "
            "SELECT users_rnd.id \nFROM users_rnd"
        )
        self._assert_sample_query(sampler, expected_query)

    def test_row_sampling(self, sampler_mock):
        """
        use ROW sampling if profile sample type is ROW.
        """
        sampler = self._build_sampler(
            SampleConfig(
                profile_sample_type=ProfileSampleType.ROWS, profile_sample=50
            ),
        )
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id "
            "\nFROM users AS users_1 TABLESAMPLE system(50 ROWS))\n "
            "SELECT users_rnd.id \nFROM users_rnd"
        )
        self._assert_sample_query(sampler, expected_query)

    def test_sampling_with_partition(self, sampler_mock):
        """
        use specified partition columns.
        """
        sampler = self._build_sampler(
            SampleConfig(
                profile_sample_type=ProfileSampleType.PERCENTAGE,
                profile_sample=50.0,
            ),
            partition_details=PartitionProfilerConfig(
                enablePartitioning=True,
                partitionColumnName="id",
                partitionIntervalType=PartitionIntervalTypes.COLUMN_VALUE,
                partitionValues=["1", "2"],
            ),
        )
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id \n"
            "FROM users AS users_1 TABLESAMPLE system(50.0 PERCENT) "
            "\nWHERE id IN ('1', '2'))\n SELECT users_rnd.id \nFROM users_rnd"
        )
        self._assert_sample_query(sampler, expected_query)
|
||||
Loading…
x
Reference in New Issue
Block a user