Teddy 28bd01c471
MINOR: Remove default 100 when profileSample is None (#19672)
* fix: remove default 100% percent

* fix: use get_dataset

* fix: orm_profiler tests
2025-02-05 19:14:31 +01:00

202 lines
6.4 KiB
Python

from collections.abc import Callable
from copy import deepcopy
from dataclasses import dataclass
from typing import List
import pytest
from _openmetadata_testutils.dict import merge
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.generated.schema.entity.data.table import (
ColumnProfile,
Table,
TableProfile,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
@pytest.fixture(
scope="module",
params=[
pytest.param({}, id="no_changes"),
pytest.param(
{"source": {"sourceConfig": {"config": {"useStatistics": True}}}},
id="useStatistics=True",
),
],
)
def run_profiler(
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
profiler_config,
create_test_data,
request,
):
search_cache.clear()
profiler_config = deepcopy(profiler_config)
merge(request.param, profiler_config)
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(ProfilerWorkflow, profiler_config)
return profiler_config
@dataclass
class ProfilerTestParameters:
table_fqn: str
expected_table_profile: TableProfile
expected_column_profiles: List[ColumnProfile] = None
config_predicate: Callable[[DatabaseServiceProfilerPipeline], bool] = lambda x: True
@pytest.mark.parametrize(
"parameters",
[
ProfilerTestParameters(
"{database_service}.minio.my_schema.table",
TableProfile(timestamp=Timestamp(0), rowCount=3),
[
ColumnProfile(
name="three",
timestamp=Timestamp(0),
valuesCount=2,
nullCount=1,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.titanic",
TableProfile(timestamp=Timestamp(0), rowCount=891),
[
ColumnProfile(
name="name",
timestamp=Timestamp(0),
valuesCount=891,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.iris",
TableProfile(timestamp=Timestamp(0), rowCount=150),
[
ColumnProfile(
name="petal.length",
timestamp=Timestamp(0),
valuesCount=150,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.userdata",
TableProfile(timestamp=Timestamp(0), rowCount=1000),
[
ColumnProfile(
name="gender",
timestamp=Timestamp(0),
valuesCount=1000,
nullCount=0,
)
],
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.empty",
TableProfile(timestamp=Timestamp(0), rowCount=0),
[
ColumnProfile(
name="a",
timestamp=Timestamp(0),
valuesCount=0,
nullCount=0,
)
],
lambda x: x.useStatistics == False,
),
ProfilerTestParameters(
"{database_service}.minio.my_schema.empty",
TableProfile(timestamp=Timestamp(0), rowCount=None),
[
ColumnProfile(
name="a",
timestamp=Timestamp(0),
valuesCount=0,
nullCount=0,
)
],
lambda x: x.useStatistics == True,
),
],
ids=lambda x: x.table_fqn.split(".")[-1],
)
def test_profiler(
run_profiler, metadata, db_service, parameters: ProfilerTestParameters
):
if not parameters.config_predicate(
DatabaseServiceProfilerPipeline.model_validate(
run_profiler["source"]["sourceConfig"]["config"]
)
):
pytest.skip(
"Skipping test becuase its not supported for this profiler configuation"
)
table: Table = metadata.get_latest_table_profile(
parameters.table_fqn.format(database_service=db_service.fullyQualifiedName.root)
)
assert_equal_pydantic_objects(
parameters.expected_table_profile,
# we dont want to validate the timestamp because it will be different for each run
table.profile.model_copy(
update={"timestamp": parameters.expected_table_profile.timestamp}
),
)
for profile in parameters.expected_column_profiles:
column = next(
(col for col in table.columns if col.profile.name == profile.name), None
)
if column is None:
raise AssertionError(
f"Column [{profile.name}] not found in table [{table.fullyQualifiedName.root}]"
)
assert_equal_pydantic_objects(
profile,
column.profile.model_copy(update={"timestamp": profile.timestamp}),
)
@pytest.mark.parametrize(
"parameters",
[
ProfilerTestParameters(
"{database_service}.minio.my_schema.empty",
TableProfile(timestamp=Timestamp(0), rowCount=None),
[],
lambda x: x.useStatistics == True,
),
],
ids=lambda x: x.table_fqn.split(".")[-1],
)
def test_no_statistics(
run_profiler, metadata, db_service, parameters: ProfilerTestParameters
):
if not parameters.config_predicate(
DatabaseServiceProfilerPipeline.model_validate(
run_profiler["source"]["sourceConfig"]["config"]
)
):
pytest.skip(
"Skipping test becuase its not supported for this profiler configuation"
)
table: Table = metadata.get_latest_table_profile(
parameters.table_fqn.format(database_service=db_service.fullyQualifiedName.root)
)
assert (
table.profile.rowCount is None
), "expected empty row count for a table with no collected statistics"