mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-11-03 12:08:31 +00:00
Fixes 8428: make it possible to choice a sampling method type when we create profile ingestion for the Snowflake (#17831)
* Add test for existing code * Add sampling method at ingestion. * add samplingMethodType into UI * modify init method to use new parameter. * create descriptions * execute isort * fix an unintended change. * apply py_format * close section * specify init arguments * fix bug * apply py_format --------- Co-authored-by: Teddy <teddy.crepineau@gmail.com>
This commit is contained in:
parent
08c13e5333
commit
3d8e30142c
@ -15,6 +15,7 @@ Return types for Profiler workflow execution.
|
||||
We need to define this class as we end up having
|
||||
multiple profilers per table and columns.
|
||||
"""
|
||||
|
||||
from typing import List, Optional, Type, Union
|
||||
|
||||
from pydantic import ConfigDict
|
||||
@ -29,6 +30,7 @@ from metadata.generated.schema.entity.data.table import (
|
||||
ColumnProfilerConfig,
|
||||
PartitionProfilerConfig,
|
||||
ProfileSampleType,
|
||||
SamplingMethodType,
|
||||
Table,
|
||||
TableData,
|
||||
)
|
||||
@ -56,6 +58,7 @@ class BaseProfileConfig(ConfigModel):
|
||||
fullyQualifiedName: FullyQualifiedEntityName
|
||||
profileSample: Optional[Union[float, int]] = None
|
||||
profileSampleType: Optional[ProfileSampleType] = None
|
||||
samplingMethodType: Optional[SamplingMethodType] = None
|
||||
sampleDataCount: Optional[int] = 100
|
||||
|
||||
|
||||
@ -75,6 +78,7 @@ class TableConfig(BaseProfileConfig):
|
||||
profileSample=config.profileSample,
|
||||
profileSampleType=config.profileSampleType,
|
||||
sampleDataCount=config.sampleDataCount,
|
||||
samplingMethodType=config.samplingMethodType,
|
||||
)
|
||||
return table_config
|
||||
|
||||
@ -90,6 +94,7 @@ class ProfileSampleConfig(ConfigModel):
|
||||
|
||||
profile_sample: Optional[Union[float, int]] = None
|
||||
profile_sample_type: Optional[ProfileSampleType] = ProfileSampleType.PERCENTAGE
|
||||
sampling_method_type: Optional[SamplingMethodType] = None
|
||||
|
||||
|
||||
class ProfilerProcessorConfig(ConfigModel):
|
||||
|
||||
@ -248,7 +248,7 @@ class ProfilerInterface(ABC):
|
||||
DatabaseSchemaProfilerConfig,
|
||||
DatabaseProfilerConfig,
|
||||
DatabaseAndSchemaConfig,
|
||||
]
|
||||
],
|
||||
) -> Optional[DataStorageConfig]:
|
||||
if (
|
||||
config
|
||||
@ -329,6 +329,7 @@ class ProfilerInterface(ABC):
|
||||
return ProfileSampleConfig(
|
||||
profile_sample=config.profileSample,
|
||||
profile_sample_type=config.profileSampleType,
|
||||
sampling_method_type=config.samplingMethodType,
|
||||
)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
@ -13,14 +13,19 @@ Helper module to handle data sampling
|
||||
for the profiler
|
||||
"""
|
||||
|
||||
from typing import cast
|
||||
from typing import Dict, Optional, cast
|
||||
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import ProfileSampleType
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
ProfileSampleType,
|
||||
SamplingMethodType,
|
||||
)
|
||||
from metadata.profiler.api.models import ProfileSampleConfig
|
||||
from metadata.profiler.processor.handle_partition import partition_filter_handler
|
||||
from metadata.profiler.processor.sampler.sqlalchemy.sampler import SQASampler
|
||||
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
|
||||
|
||||
|
||||
class SnowflakeSampler(SQASampler):
|
||||
@ -29,8 +34,27 @@ class SnowflakeSampler(SQASampler):
|
||||
run the query in the whole table.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
# pylint: disable=too-many-arguments
|
||||
def __init__(
|
||||
self,
|
||||
client,
|
||||
table,
|
||||
profile_sample_config: Optional[ProfileSampleConfig] = None,
|
||||
partition_details: Optional[Dict] = None,
|
||||
profile_sample_query: Optional[str] = None,
|
||||
sample_data_count: Optional[int] = SAMPLE_DATA_DEFAULT_COUNT,
|
||||
):
|
||||
super().__init__(
|
||||
client,
|
||||
table,
|
||||
profile_sample_config,
|
||||
partition_details,
|
||||
profile_sample_query,
|
||||
sample_data_count,
|
||||
)
|
||||
self.sampling_method_type = SamplingMethodType.BERNOULLI
|
||||
if profile_sample_config and profile_sample_config.sampling_method_type:
|
||||
self.sampling_method_type = profile_sample_config.sampling_method_type
|
||||
|
||||
@partition_filter_handler(build_sample=True)
|
||||
def get_sample_query(self, *, column=None) -> CTE:
|
||||
@ -44,7 +68,7 @@ class SnowflakeSampler(SQASampler):
|
||||
column,
|
||||
)
|
||||
.suffix_with(
|
||||
f"SAMPLE BERNOULLI ({self.profile_sample or 100})",
|
||||
f"SAMPLE {self.sampling_method_type.value} ({self.profile_sample or 100})",
|
||||
)
|
||||
.cte(f"{self.table.__tablename__}_rnd")
|
||||
)
|
||||
|
||||
@ -0,0 +1,92 @@
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import Column, Integer
|
||||
from sqlalchemy.orm import declarative_base
|
||||
from sqlalchemy.sql.selectable import CTE
|
||||
|
||||
from metadata.generated.schema.entity.data.table import Column as EntityColumn
|
||||
from metadata.generated.schema.entity.data.table import (
|
||||
ColumnName,
|
||||
DataType,
|
||||
ProfileSampleType,
|
||||
SamplingMethodType,
|
||||
Table,
|
||||
)
|
||||
from metadata.generated.schema.entity.services.connections.database.snowflakeConnection import (
|
||||
SnowflakeConnection,
|
||||
)
|
||||
from metadata.profiler.api.models import ProfileSampleConfig
|
||||
from metadata.profiler.interface.sqlalchemy.profiler_interface import (
|
||||
SQAProfilerInterface,
|
||||
)
|
||||
from metadata.profiler.processor.sampler.sqlalchemy.snowflake.sampler import (
|
||||
SnowflakeSampler,
|
||||
)
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class User(Base):
|
||||
__tablename__ = "users"
|
||||
id = Column(Integer, primary_key=True)
|
||||
|
||||
|
||||
class SampleTest(TestCase):
|
||||
table_entity = Table(
|
||||
id=uuid4(),
|
||||
name="user",
|
||||
columns=[
|
||||
EntityColumn(
|
||||
name=ColumnName("id"),
|
||||
dataType=DataType.INT,
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
snowflake_conn = SnowflakeConnection(
|
||||
username="myuser", account="myaccount", warehouse="mywarehouse"
|
||||
)
|
||||
|
||||
with patch.object(
|
||||
SQAProfilerInterface, "_convert_table_to_orm_object", return_value=User
|
||||
):
|
||||
sqa_profiler_interface = SQAProfilerInterface(
|
||||
snowflake_conn, None, table_entity, None, None, None, None, None, 5, 43200
|
||||
)
|
||||
session = sqa_profiler_interface.session
|
||||
|
||||
def test_omit_sampling_method_type(self):
|
||||
"""
|
||||
use BERNOULLI if sampling method type is not specified.
|
||||
"""
|
||||
sampler = SnowflakeSampler(
|
||||
client=self.session,
|
||||
table=User,
|
||||
profile_sample_config=ProfileSampleConfig(
|
||||
profile_sample_type=ProfileSampleType.PERCENTAGE, profile_sample=50.0
|
||||
),
|
||||
)
|
||||
query: CTE = sampler.get_sample_query()
|
||||
assert "FROM users SAMPLE BERNOULLI" in str(query)
|
||||
|
||||
def test_specify_sampling_method_type(self):
|
||||
"""
|
||||
use specified sampling method type.
|
||||
"""
|
||||
for sampling_method_type in [
|
||||
SamplingMethodType.SYSTEM,
|
||||
SamplingMethodType.BERNOULLI,
|
||||
]:
|
||||
sampler = SnowflakeSampler(
|
||||
client=self.session,
|
||||
table=User,
|
||||
profile_sample_config=ProfileSampleConfig(
|
||||
profile_sample_type=ProfileSampleType.PERCENTAGE,
|
||||
profile_sample=50.0,
|
||||
sampling_method_type=sampling_method_type,
|
||||
),
|
||||
)
|
||||
query: CTE = sampler.get_sample_query()
|
||||
assert f"FROM users SAMPLE {sampling_method_type.value}" in str(query)
|
||||
@ -89,6 +89,13 @@ The OpenMetadata UI will always show 50 or fewer rows of sample data. *Sample Da
|
||||
|
||||
{% /note %}
|
||||
|
||||
- **Sampling Method Type**: The sampling method type can be set to **BERNOULLI** or **SYSTEM**. The difference between the two is described in the Snowflake documentation. When you choose **BERNOULLI**, all rows in the table are scanned even if a small value is set for **Profile Sample**. However, it has fewer restrictions than **SYSTEM**. If no option is chosen, the default is **BERNOULLI**.
|
||||
|
||||
{% note %}
|
||||
|
||||
This parameter is effective for Snowflake only.
|
||||
|
||||
{% /note %}
|
||||
|
||||
- **Bucket Name**: A bucket name is a unique identifier used to organize and store data objects. It's similar to a folder name, but it's used for object storage rather than file storage.
|
||||
- **Prefix**: The prefix of a data source refers to the first part of the data path that identifies the source or origin of the data. The generated sample data parquet file will be uploaded to this prefix path in your bucket.
|
||||
|
||||
@ -89,6 +89,14 @@ The OpenMetadata UI will always show 50 or fewer rows of sample data. *Sample Da
|
||||
|
||||
{% /note %}
|
||||
|
||||
- **Sampling Method Type**: The sampling method type can be set to **BERNOULLI** or **SYSTEM**. The difference between the two is described in the Snowflake documentation. When you choose **BERNOULLI**, all rows in the table are scanned even if a small value is set for **Profile Sample**. However, it has fewer restrictions than **SYSTEM**. If no option is chosen, the default is **BERNOULLI**.
|
||||
|
||||
{% note %}
|
||||
|
||||
This parameter is effective for Snowflake only.
|
||||
|
||||
{% /note %}
|
||||
|
||||
|
||||
- **Bucket Name**: A bucket name is a unique identifier used to organize and store data objects. It's similar to a folder name, but it's used for object storage rather than file storage.
|
||||
- **Prefix**: The prefix of a data source refers to the first part of the data path that identifies the source or origin of the data. The generated sample data parquet file will be uploaded to this prefix path in your bucket.
|
||||
|
||||
@ -146,6 +146,9 @@
|
||||
"default": 50,
|
||||
"title": "Sample Data Rows Count"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "./table.json#/definitions/samplingMethodType"
|
||||
},
|
||||
"sampleDataStorageConfig": {
|
||||
"title": "Storage Config for Sample Data",
|
||||
"$ref": "../services/connections/connectionBasicType.json#/definitions/sampleDataStorageConfig"
|
||||
|
||||
@ -142,6 +142,9 @@
|
||||
"default": 50,
|
||||
"title": "Sample Data Rows Count"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "./table.json#/definitions/samplingMethodType"
|
||||
},
|
||||
"sampleDataStorageConfig": {
|
||||
"title": "Storage Config for Sample Data",
|
||||
"$ref": "../services/connections/connectionBasicType.json#/definitions/sampleDataStorageConfig"
|
||||
|
||||
@ -19,6 +19,14 @@
|
||||
],
|
||||
"default": "PERCENTAGE"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"description": "Type of Sampling Method (BERNOULLI or SYSTEM)",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"BERNOULLI",
|
||||
"SYSTEM"
|
||||
]
|
||||
},
|
||||
"tableType": {
|
||||
"javaType": "org.openmetadata.schema.type.TableType",
|
||||
"description": "This schema defines the type used for describing different types of tables.",
|
||||
@ -744,6 +752,9 @@
|
||||
"type": "number",
|
||||
"default": null
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "#/definitions/samplingMethodType"
|
||||
},
|
||||
"sampleDataCount": {
|
||||
"description": "Number of sample rows to ingest when 'Generate Sample Data' is enabled",
|
||||
"type": "integer",
|
||||
@ -794,6 +805,9 @@
|
||||
"profileSampleType": {
|
||||
"$ref": "#/definitions/profileSampleType"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "#/definitions/samplingMethodType"
|
||||
},
|
||||
"columnCount": {
|
||||
"description": "No.of columns in the table.",
|
||||
"type": "number"
|
||||
|
||||
@ -79,6 +79,10 @@
|
||||
"default": null,
|
||||
"title": "Profile Sample"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "../entity/data/table.json#/definitions/samplingMethodType",
|
||||
"title": "Sampling Method Type"
|
||||
},
|
||||
"sampleDataCount": {
|
||||
"description": "Number of sample rows to ingest when 'Generate Sample Data' is enabled",
|
||||
"type": "integer",
|
||||
|
||||
@ -32,6 +32,10 @@
|
||||
"$ref": "../entity/data/table.json#/definitions/profileSampleType",
|
||||
"title": "Profile Sample Type"
|
||||
},
|
||||
"samplingMethodType": {
|
||||
"$ref": "../entity/data/table.json#/definitions/samplingMethodType",
|
||||
"title": "Sampling Method Type"
|
||||
},
|
||||
"testCases": {
|
||||
"description": "List of test cases to be executed on the entity. If null, all test cases will be executed.",
|
||||
"type": "array",
|
||||
|
||||
@ -108,6 +108,16 @@ $$section
|
||||
Percentage of data or number of rows to use when sampling tables to compute the profiler metrics. By default (i.e. if left blank), the profiler will run against the entire table.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### Sampling Method Type $(id="samplingMethodType")
|
||||
|
||||
**This parameter is effective for Snowflake only**
|
||||
|
||||
The sampling method type can be set to **BERNOULLI** or **SYSTEM**. The difference between the two is described in the Snowflake documentation. When you choose **BERNOULLI**, all rows in the table are scanned even if a small value is set for **Profile Sample**. However, it has fewer restrictions than **SYSTEM**.
|
||||
|
||||
If no option is chosen, the default is **BERNOULLI**.
|
||||
$$
|
||||
|
||||
$$section
|
||||
### PII Inference Confidence Level $(id="confidence")
|
||||
Confidence level to use when inferring whether a column should be flagged as PII or not (between 0 and 100). A number closer to 100 will yield fewer false positives but potentially more false negatives.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user