# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
|
|
|
|
Test Sample behavior
|
|
|
|
"""
|
|
|
|
import os
import sys
from unittest import TestCase, mock
from unittest.mock import Mock, patch
from uuid import uuid4

import pytest
from sqlalchemy import TEXT, Column, Integer, String
from sqlalchemy.orm import declarative_base

from metadata.generated.schema.entity.data.table import Column as EntityColumn
from metadata.generated.schema.entity.data.table import ColumnName, DataType, Table
from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
    DatalakeConnection,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.profiler.interface.pandas.profiler_interface import (
    PandasProfilerInterface,
)
from metadata.profiler.metrics.registry import Metrics
from metadata.profiler.processor.core import Profiler
from metadata.sampler.models import SampleConfig
from metadata.sampler.pandas.sampler import DatalakeSampler

Base = declarative_base()


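# patch.object on properties is incompatible with Python < 3.9,
# so the whole module is skipped on older interpreters.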
if sys.version_info < (3, 9):
    pytest.skip(
        "requires python 3.9+ due to incompatibility with object patch",
        allow_module_level=True,
    )


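# SQLAlchemy model used to reference column names (e.g. User.age.name) in assertions.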
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String(256))
    fullname = Column(String(256))
    nickname = Column(String(256))
    comments = Column(TEXT)
    age = Column(Integer)


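# Minimal stand-ins for the datalake connection: the patched get_ssl_connection
# calls return these, so no real client is ever created.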
class FakeClient:
    def __init__(self):
        self._client = None


class FakeConnection:
    def __init__(self):
        self.client = FakeClient()


class DatalakeSampleTest(TestCase):
    """
    Run checks on different metrics
    """

    import pandas as pd

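    # Raw dataframes are loaded from two CSV fixtures; together they make up
    # the "table" being sampled.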
    col_names = [
        "name",
        "fullname",
        "nickname",
        "comments",
        "age",
        "dob",
        "tob",
        "doe",
        "json",
        "array",
    ]
    root_dir = os.path.dirname(os.path.abspath(__file__))
    csv_dir = "../custom_csv"
    df1 = pd.read_csv(
        os.path.join(root_dir, csv_dir, "test_datalake_metrics_1.csv"), names=col_names
    )
    df2 = pd.read_csv(
        os.path.join(root_dir, csv_dir, "test_datalake_metrics_2.csv"), names=col_names
    )

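    # Table entity describing the CSV columns above; shared by the sampler
    # and the profiler interface.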
    table_entity = Table(
        id=uuid4(),
        name="user",
        databaseSchema=EntityReference(id=uuid4(), type="databaseSchema", name="name"),
        fileFormat="csv",
        columns=[
            EntityColumn(
                name=ColumnName("name"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName("fullname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName("nickname"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName("comments"),
                dataType=DataType.STRING,
            ),
            EntityColumn(
                name=ColumnName("age"),
                dataType=DataType.INT,
            ),
            EntityColumn(
                name=ColumnName("dob"),
                dataType=DataType.DATETIME,
            ),
            EntityColumn(
                name=ColumnName("tob"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName("doe"),
                dataType=DataType.DATE,
            ),
            EntityColumn(
                name=ColumnName("json"),
                dataType=DataType.JSON,
            ),
            EntityColumn(
                name=ColumnName("array"),
                dataType=DataType.ARRAY,
            ),
        ],
    )

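    # get_ssl_connection is patched at both import sites (profiler and sampler),
    # so the fake connection is used everywhere.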
    @classmethod
    @mock.patch(
        "metadata.profiler.interface.profiler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    @mock.patch(
        "metadata.sampler.sampler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    def setUpClass(cls, mock_get_connection, mock_sample_get_connection) -> None:
        """
        Prepare Ingredients
        """
        with (
            patch.object(
                DatalakeSampler, "raw_dataset", new_callable=lambda: [cls.df1, cls.df2]
            ),
            patch.object(DatalakeSampler, "get_client", return_value=Mock()),
        ):
            sampler = DatalakeSampler(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=cls.table_entity,
                sample_config=SampleConfig(profileSample=50.0),
            )
            cls.datalake_profiler_interface = PandasProfilerInterface(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=cls.table_entity,
                source_config=None,
                sampler=sampler,
                thread_count=None,
            )

    @mock.patch(
        "metadata.sampler.sampler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    def test_random_sampler(self, _):
        """
        The random sampler should be able to
        generate a random subset of data
        """
        with (
            patch.object(
                DatalakeSampler,
                "raw_dataset",
                new_callable=lambda: [self.df1, self.df2],
            ),
            patch.object(DatalakeSampler, "get_client", return_value=Mock()),
        ):
            sampler = DatalakeSampler(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=self.table_entity,
                sample_config=SampleConfig(profileSample=50.0),
            )
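            # With profileSample=50.0 the sample should be a strict subset of the raw rows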
            random_sample = sampler.get_dataset()
            res = sum(len(r) for r in random_sample)
            assert res < 5

    @mock.patch(
        "metadata.profiler.interface.profiler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    @mock.patch(
        "metadata.sampler.sampler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    @mock.patch(
        "metadata.mixins.pandas.pandas_mixin.fetch_dataframe",
        return_value=[df1, pd.concat([df2, pd.DataFrame(index=df1.index)])],
    )
    def test_sample_property(self, *_):
        """
        Sample property should be properly generated
        """
        with (
            patch.object(
                DatalakeSampler,
                "raw_dataset",
                new_callable=lambda: [self.df1, self.df2],
            ),
            patch.object(DatalakeSampler, "get_client", return_value=Mock()),
        ):
            sampler = DatalakeSampler(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=self.table_entity,
                sample_config=SampleConfig(profileSample=50.0),
            )
            datalake_profiler_interface = PandasProfilerInterface(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=self.table_entity,
                source_config=None,
                sampler=sampler,
                thread_count=None,
            )

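            # The same sampler is exposed through the profiler interface,
            # so the 50% sample still applies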
            random_sample = datalake_profiler_interface.sampler.get_dataset()
            res = sum(len(r) for r in random_sample)
            assert res < 5

    def test_table_row_count(self):
        """
        Profile sample should be ignored in row count
        """
        table_count = Metrics.ROW_COUNT.value
        profiler = Profiler(
            table_count,
            profiler_interface=self.datalake_profiler_interface,
        )
        res = profiler.compute_metrics()._table_results
        # We expect the full count of the table
        assert res.get(Metrics.ROW_COUNT.name) == 2

    @pytest.mark.skip(reason="Flaky test due to small sample size")
    def test_random_sample_histogram(self):
        """
        Histogram should run correctly
        """
        hist = Metrics.HISTOGRAM.value
        count = Metrics.COUNT.value
        min = Metrics.MIN.value
        max = Metrics.MAX.value
        first_quartile = Metrics.FIRST_QUARTILE.value
        third_quartile = Metrics.THIRD_QUARTILE.value
        iqr = Metrics.IQR.value

        profiler = Profiler(
            hist,
            count,
            min,
            max,
            first_quartile,
            third_quartile,
            iqr,
            profiler_interface=self.datalake_profiler_interface,
        )
        res = profiler.compute_metrics()._column_results

        # The histogram is computed on the sample, so the summed frequencies
        # should stay below the full row count
        assert sum(res.get(User.age.name)[Metrics.HISTOGRAM.name]["frequencies"]) < 30

    @mock.patch(
        "metadata.sampler.sampler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    def test_sample_data(self, *_):
        """
        We should be able to pick up sample data from the sampler
        """
        with (
            patch.object(
                DatalakeSampler,
                "raw_dataset",
                new_callable=lambda: [self.df1, self.df2],
            ),
            patch.object(DatalakeSampler, "get_client", return_value=Mock()),
        ):
            sampler = DatalakeSampler(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=self.table_entity,
                sample_config=SampleConfig(profileSample=50.0),
            )
            sample_data = sampler.fetch_sample_data()

            assert len(sample_data.columns) == 10
            # we drop NA values when fetching sample data
            assert len(sample_data.rows) == 4

    @mock.patch(
        "metadata.sampler.sampler_interface.get_ssl_connection",
        return_value=FakeConnection(),
    )
    def test_sample_from_user_query(self, *_):
        """
        Test sample data are returned based on user query
        """
        with (
            patch.object(
                DatalakeSampler,
                "raw_dataset",
                new_callable=lambda: [self.df1, self.df2],
            ),
            patch.object(DatalakeSampler, "get_client", return_value=Mock()),
        ):
            sampler = DatalakeSampler(
                service_connection_config=DatalakeConnection(configSource={}),
                ometa_client=None,
                entity=self.table_entity,
                sample_config=SampleConfig(profileSample=50.0),
                sample_query="`age` > 30",
            )
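            # Only rows matching the user query should be returned in the sample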
            sample_data = sampler.fetch_sample_data()

            assert len(sample_data.columns) == 10
            assert len(sample_data.rows) == 3