mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-03 02:55:59 +00:00
FEAT: added TABLESAMPLE for MSSQL (#18926)
* feat: added TABLESAMPLE for sqlserver * fix: class name * test: added test to generated sample query
This commit is contained in:
parent
f6b4434e4a
commit
03bd8e9dc4
@ -1,10 +1,12 @@
|
||||
from metadata.ingestion.source.database.mssql.lineage import MssqlLineageSource
|
||||
from metadata.ingestion.source.database.mssql.metadata import MssqlSource
|
||||
from metadata.ingestion.source.database.mssql.usage import MssqlUsageSource
|
||||
from metadata.sampler.sqlalchemy.mssql.sampler import MssqlSampler
|
||||
from metadata.utils.service_spec.default import DefaultDatabaseSpec
|
||||
|
||||
# Service specification for MSSQL: pairs the metadata, lineage and usage
# source classes with the MSSQL-specific sampler so the sampler is used
# in place of whatever DefaultDatabaseSpec would otherwise select.
ServiceSpec = DefaultDatabaseSpec(
    sampler_class=MssqlSampler,
    usage_source_class=MssqlUsageSource,
    lineage_source_class=MssqlLineageSource,
    metadata_source_class=MssqlSource,
)
|
||||
|
||||
50
ingestion/src/metadata/sampler/sqlalchemy/mssql/sampler.py
Normal file
50
ingestion/src/metadata/sampler/sqlalchemy/mssql/sampler.py
Normal file
@ -0,0 +1,50 @@
|
||||
# Copyright 2021 Collate
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
"""
|
||||
Helper module to handle data sampling
|
||||
for the profiler
|
||||
"""
|
||||
|
||||
|
||||
from sqlalchemy import Table, text
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import ProfileSampleType
|
||||
from metadata.sampler.sqlalchemy.sampler import SQASampler
|
||||
|
||||
|
||||
class MssqlSampler(SQASampler):
    """
    MSSQL flavour of the SQLAlchemy sampler.

    Restricts the data read by the profiler to a TABLESAMPLE of the
    table so that queries do not have to run against the whole table.
    """

    def set_tablesample(self, selectable: Table):
        """Attach a TABLESAMPLE clause to ``selectable``.

        The sampling expression depends on the configured sample type:
        ``"<n> PERCENT"`` when it is ``ProfileSampleType.PERCENTAGE``,
        otherwise ``"<n> ROWS"`` with ``n`` truncated to an integer.
        A missing (or otherwise falsy) ``profile_sample`` falls back to 100.

        Args:
            selectable (Table): object exposing ``tablesample``; it receives
                the sampling expression as a ``text`` clause.

        Returns:
            The result of ``selectable.tablesample(...)``.
        """
        uses_percentage = (
            self.sample_config.profile_sample_type == ProfileSampleType.PERCENTAGE
        )
        sample_size = self.sample_config.profile_sample or 100
        sampling_expression = (
            f"{sample_size} PERCENT"
            if uses_percentage
            else f"{int(sample_size)} ROWS"
        )
        return selectable.tablesample(text(sampling_expression))

    def get_sample_query(self, *, column=None) -> CTE:
        """Build the CTE that exposes the sampled data.

        Args:
            column: keyword-only value forwarded unchanged to
                ``_base_sample_query``.

        Returns:
            CTE: a CTE named ``<tablename>_sample`` selecting from an inner
            CTE named ``<tablename>_rnd``, where ``<tablename>`` is
            ``self.raw_dataset.__tablename__``.
        """
        base_query = self._base_sample_query(column)
        rnd_cte = base_query.cte(f"{self.raw_dataset.__tablename__}_rnd")
        sampled_query = self.client.query(rnd_cte)
        return sampled_query.cte(f"{self.raw_dataset.__tablename__}_sample")
|
||||
@ -0,0 +1,150 @@
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.orm import declarative_base
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column as EntityColumn
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
ColumnName,
|
||||
DataType,
|
||||
PartitionIntervalTypes,
|
||||
PartitionProfilerConfig,
|
||||
ProfileSampleType,
|
||||
Table,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.mssqlConnection import (
|
||||
MssqlConnection,
|
||||
)
|
||||
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
|
||||
SQAProfilerInterface,
|
||||
)
|
||||
from metadata.sampler.models import SampleConfig
|
||||
from metadata.sampler.sqlalchemy.mssql.sampler import MssqlSampler
|
||||
from metadata.sampler.sqlalchemy.sampler import SQASampler
|
||||
|
||||
# Declarative base shared by the ORM fixtures of this test module.
Base = declarative_base()


class User(Base):
    """Minimal ORM model handed back by the patched ``build_table_orm``."""

    __tablename__ = "users"

    # Single integer primary-key column; the expected SQL in the tests
    # below refers to it as ``id``.
    id = Column(Integer, primary_key=True)
|
||||
|
||||
|
||||
@patch.object(SQASampler, "build_table_orm", return_value=User)
class SampleTest(TestCase):
    """Checks the SQL generated by ``MssqlSampler.get_sample_query``."""

    @classmethod
    @patch.object(SQASampler, "build_table_orm", return_value=User)
    def setUpClass(cls, sampler_mock):
        """Create the table entity, connection and profiler interface once.

        The class-level patch decorator only wraps ``test*`` methods, so
        this hook carries its own patch of ``build_table_orm``.
        """
        id_column = EntityColumn(
            name=ColumnName("id"),
            dataType=DataType.INT,
        )
        cls.table_entity = Table(
            id=uuid4(),
            name="user",
            columns=[id_column],
        )

        cls.mssql_conn = MssqlConnection(
            username="myuser",
            password="myaccount",
            database="mywarehouse",
            hostPort="host//foo.bar:5432",
        )

        base_sampler = SQASampler(
            service_connection_config=cls.mssql_conn,
            ometa_client=None,
            entity=None,
        )
        cls.sqa_profiler_interface = SQAProfilerInterface(
            cls.mssql_conn,
            None,
            cls.table_entity,
            None,
            base_sampler,
            5,
            43200,
        )

        cls.session = cls.sqa_profiler_interface.session

    def _render_sample_sql(self, **sampler_kwargs) -> str:
        """Return the case-folded SQL of the sample query.

        Builds an ``MssqlSampler`` for the shared connection and table
        entity, forwarding ``sampler_kwargs`` untouched, then compiles the
        resulting CTE with literal binds. The name deliberately lacks the
        ``test`` prefix so the class-level patch does not wrap it.
        """
        sampler = MssqlSampler(
            service_connection_config=self.mssql_conn,
            ometa_client=None,
            entity=self.table_entity,
            **sampler_kwargs,
        )
        sample_cte: CTE = sampler.get_sample_query()
        compiled = sample_cte.compile(compile_kwargs={"literal_binds": True})
        return str(compiled).casefold()

    def test_omit_sampling_method_type(self, sampler_mock):
        """
        A PERCENTAGE sample config yields a ``PERCENT`` TABLESAMPLE clause.
        """
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id \n"
            "FROM users AS users_1 TABLESAMPLE system(50.0 PERCENT))\n "
            "SELECT users_rnd.id \nFROM users_rnd"
        )
        actual_sql = self._render_sample_sql(
            sample_config=SampleConfig(
                profile_sample_type=ProfileSampleType.PERCENTAGE, profile_sample=50.0
            ),
        )
        assert expected_query.casefold() == actual_sql

    def test_row_sampling(self, sampler_mock):
        """
        A ROWS sample config yields a ``ROWS`` TABLESAMPLE clause.
        """
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id "
            "\nFROM users AS users_1 TABLESAMPLE system(50 ROWS))\n "
            "SELECT users_rnd.id \nFROM users_rnd"
        )
        actual_sql = self._render_sample_sql(
            sample_config=SampleConfig(
                profile_sample_type=ProfileSampleType.ROWS, profile_sample=50
            ),
        )
        assert expected_query.casefold() == actual_sql

    def test_sampling_with_partition(self, sampler_mock):
        """
        Partition details add a WHERE filter on the partition column
        alongside the TABLESAMPLE clause.
        """
        expected_query = (
            "WITH users_rnd AS \n(SELECT users_1.id AS id \n"
            "FROM users AS users_1 TABLESAMPLE system(50.0 PERCENT) "
            "\nWHERE id IN ('1', '2'))\n SELECT users_rnd.id \nFROM users_rnd"
        )
        actual_sql = self._render_sample_sql(
            sample_config=SampleConfig(
                profile_sample_type=ProfileSampleType.PERCENTAGE,
                profile_sample=50.0,
            ),
            partition_details=PartitionProfilerConfig(
                enablePartitioning=True,
                partitionColumnName="id",
                partitionIntervalType=PartitionIntervalTypes.COLUMN_VALUE,
                partitionValues=["1", "2"],
            ),
        )
        assert expected_query.casefold() == actual_sql
|
||||
Loading…
x
Reference in New Issue
Block a user