Issue 10805 profiler e2e mssql (#11096)

* feat: extracted builder logic into factory and cleaned up comments

* feat: added E2E logic for time based profiler partition + logic for testing detailed profiler results
This commit is contained in:
Teddy 2023-04-18 11:56:16 +02:00 committed by GitHub
parent 22ce62e13b
commit c415f04eb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 399 additions and 140 deletions

View File

@ -0,0 +1,152 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Config builder classes
"""
from copy import deepcopy
from ..e2e_types import E2EType
class BaseBuilder:
    """Base class shared by every config builder.

    Stores deep copies of the workflow config and the builder arguments
    so building never mutates the caller's dictionaries.
    """

    def __init__(self, config: dict, config_args: dict) -> None:
        """
        Args:
            config (dict): config dict loaded from the yaml file
            config_args (dict): extra arguments driving the build (may be None)
        """
        self.config = deepcopy(config)
        # normalize a missing/None config_args to an empty dict
        self.config_args = deepcopy(config_args) or {}

    def build(self) -> dict:
        """Return the (copied) config unchanged."""
        return self.config
class ProfilerConfigBuilder(BaseBuilder):
    """Builder class for the profiler config

    Attributes:
        profilerSample (int): sample size for the profiler
            (defaults to 100, i.e. profile every row)
    """

    # pylint: disable=invalid-name
    def __init__(self, config: dict, config_args: dict) -> None:
        super().__init__(config, config_args)
        self.profilerSample = self.config_args.get("profilerSample", 100)

    # pylint: enable=invalid-name
    def build(self) -> dict:
        """Build the profiler config.

        Replaces the whole ``source.sourceConfig`` entry with a profiler
        source config and attaches a default orm-profiler processor.

        Returns:
            dict: the updated workflow config
        """
        # The previous `del self.config["source"]["sourceConfig"]["config"]`
        # was removed: the assignment below replaces the entire sourceConfig
        # dict anyway, and the del raised KeyError when "config" was absent.
        self.config["source"]["sourceConfig"] = {
            "config": {
                "type": "Profiler",
                "generateSampleData": True,
                "profileSample": self.profilerSample,
            }
        }
        self.config["processor"] = {"type": "orm-profiler", "config": {}}
        return self.config
class SchemaConfigBuilder(BaseBuilder):
    """Builder for schema filter config.

    The raw ``config_args`` dict is used verbatim as the
    ``schemaFilterPattern`` value (e.g. ``{"includes": [...]}``).
    """

    def build(self) -> dict:
        """Inject the schema filter pattern and return the config."""
        source_config = self.config["source"]["sourceConfig"]["config"]
        source_config["schemaFilterPattern"] = self.config_args
        return self.config
class TableConfigBuilder(BaseBuilder):
    """Builder for table filter config.

    The raw ``config_args`` dict is used verbatim as the
    ``tableFilterPattern`` value (e.g. ``{"excludes": [...]}``).
    """

    def build(self) -> dict:
        """Inject the table filter pattern and return the config."""
        source_config = self.config["source"]["sourceConfig"]["config"]
        source_config["tableFilterPattern"] = self.config_args
        return self.config
class MixConfigBuilder(BaseBuilder):
    """Builder for mix filter config (table and schema).

    Expects ``config_args`` of the form
    ``{"schema": {...}, "table": {...}}`` and chains the schema and
    table builders over the same config.
    """

    def build(self) -> dict:
        """Apply schema filters first, then table filters."""
        with_schema = SchemaConfigBuilder(
            self.config, self.config_args["schema"]
        ).build()
        return TableConfigBuilder(with_schema, self.config_args["table"]).build()
class DashboardConfigBuilder(BaseBuilder):
    """Builder toggling dashboard ingestion inclusion flags.

    Copies the ``includeTags`` and ``includeDataModels`` flags from
    ``config_args`` into the source config.
    """

    def build(self) -> dict:
        """Set the inclusion flags and return the config."""
        source_config = self.config["source"]["sourceConfig"]["config"]
        for flag in ("includeTags", "includeDataModels"):
            source_config[flag] = self.config_args[flag]
        return self.config
class DashboardMixConfigBuilder(BaseBuilder):
    """Builder for dashboard mix filter config (dashboards, charts, and
    data models).

    Expects ``config_args`` with ``dashboards``, ``charts`` and
    ``dataModels`` keys, each holding a filter-pattern dict.
    """

    # maps the config_args key to the sourceConfig filter-pattern key
    _PATTERN_KEYS = {
        "dashboards": "dashboardFilterPattern",
        "charts": "chartFilterPattern",
        "dataModels": "dataModelFilterPattern",
    }

    def build(self) -> dict:
        """Inject all three dashboard filter patterns and return the config."""
        source_config = self.config["source"]["sourceConfig"]["config"]
        for args_key, pattern_key in self._PATTERN_KEYS.items():
            source_config[pattern_key] = self.config_args[args_key]
        return self.config
class ProfilerProcessorConfigBuilder(BaseBuilder):
    """Builder for profiler processor config.

    Delegates to :class:`ProfilerConfigBuilder`, then overlays any
    ``processor`` entry from ``config_args`` onto the built config,
    replacing the default orm-profiler processor.
    """

    def build(self) -> dict:
        """Build the profiler config with an optional custom processor."""
        built = ProfilerConfigBuilder(self.config, self.config_args).build()
        custom_processor = self.config_args.get("processor")
        if custom_processor:
            built.update(custom_processor)
        return built
def builder_factory(builder, config: dict, config_args: dict):
    """Factory method to return an instantiated builder.

    Args:
        builder: an :class:`E2EType` value string selecting the builder
        config (dict): config dict from the yaml file
        config_args (dict): extra arguments for the selected builder

    Returns:
        a builder instance; falls back to :class:`BaseBuilder` when no
        specific builder is registered for ``builder``.
    """
    registry = {
        E2EType.PROFILER.value: ProfilerConfigBuilder,
        E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder,
        E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder,
        E2EType.INGEST_DB_FILTER_MIX.value: MixConfigBuilder,
        E2EType.INGEST_DASHBOARD_FILTER_MIX.value: DashboardMixConfigBuilder,
        E2EType.INGEST_DASHBOARD_NOT_INCLUDING.value: DashboardConfigBuilder,
        E2EType.PROFILER_PROCESSOR.value: ProfilerProcessorConfigBuilder,
    }
    builder_class = registry.get(builder, BaseBuilder)
    return builder_class(config, config_args)

View File

@ -0,0 +1,31 @@
# Copyright 2022 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
E2E test types
"""
from enum import Enum
# E2E Type Enum, built with the functional Enum API; the values are the
# CLI-facing identifiers for each end-to-end test flavour.
E2EType = Enum(
    "E2EType",
    [
        ("INGEST", "ingest"),
        ("PROFILER", "profiler"),
        ("PROFILER_PROCESSOR", "profiler-processor"),
        ("INGEST_DB_FILTER_SCHEMA", "ingest-db-filter-schema"),
        ("INGEST_DB_FILTER_TABLE", "ingest-db-filter-table"),
        ("INGEST_DB_FILTER_MIX", "ingest-db-filter-mix"),
        ("INGEST_DASHBOARD_FILTER_MIX", "ingest-dashboard-filter-mix"),
        ("INGEST_DASHBOARD_NOT_INCLUDING", "ingest-dashboard-not-including"),
    ],
)

View File

@ -16,7 +16,6 @@ import os
import re
import subprocess
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
import yaml
@ -28,25 +27,14 @@ from metadata.ingestion.api.workflow import Workflow
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import UTF_8
from .config_builders.builders import builder_factory
from .e2e_types import E2EType
PATH_TO_RESOURCES = os.path.dirname(Path(os.path.realpath(__file__)).parent)
REGEX_AUX = {"log": r"\s+\[[^]]+]\s+[A-Z]+\s+[^}]+}\s+-\s+"}
class E2EType(Enum):
"""
E2E Type Enum Class
"""
INGEST = "ingest"
PROFILER = "profiler"
INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema"
INGEST_DB_FILTER_TABLE = "ingest-db-filter-table"
INGEST_DB_FILTER_MIX = "ingest-db-filter-mix"
INGEST_DASHBOARD_FILTER_MIX = "ingest-dashboard-filter-mix"
INGEST_DASHBOARD_NOT_INCLUDING = "ingest-dashboard-not-including"
class CliBase(ABC):
"""
CLI Base class
@ -128,52 +116,14 @@ class CliBase(ABC):
"""
Build yaml as per E2EType
"""
if test_type == E2EType.PROFILER:
del config_yaml["source"]["sourceConfig"]["config"]
config_yaml["source"]["sourceConfig"] = {
"config": {
"type": "Profiler",
"generateSampleData": True,
"profileSample": extra_args.get("profileSample", 1)
if extra_args
else 1,
}
}
config_yaml["processor"] = {"type": "orm-profiler", "config": {}}
if test_type == E2EType.INGEST_DB_FILTER_SCHEMA:
config_yaml["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = extra_args
if test_type == E2EType.INGEST_DB_FILTER_TABLE:
config_yaml["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = extra_args
if test_type == E2EType.INGEST_DB_FILTER_MIX:
config_yaml["source"]["sourceConfig"]["config"][
"schemaFilterPattern"
] = extra_args["schema"]
config_yaml["source"]["sourceConfig"]["config"][
"tableFilterPattern"
] = extra_args["table"]
if test_type == E2EType.INGEST_DASHBOARD_FILTER_MIX:
config_yaml["source"]["sourceConfig"]["config"][
"dashboardFilterPattern"
] = extra_args["dashboards"]
config_yaml["source"]["sourceConfig"]["config"][
"chartFilterPattern"
] = extra_args["charts"]
config_yaml["source"]["sourceConfig"]["config"][
"dataModelFilterPattern"
] = extra_args["dataModels"]
if test_type == E2EType.INGEST_DASHBOARD_NOT_INCLUDING:
config_yaml["source"]["sourceConfig"]["config"]["includeTags"] = extra_args[
"includeTags"
]
config_yaml["source"]["sourceConfig"]["config"][
"includeDataModels"
] = extra_args["includeDataModels"]
return config_yaml
builder = builder_factory(
test_type.value,
config_yaml,
extra_args,
)
return builder.build()
@staticmethod
@abstractmethod

View File

@ -21,7 +21,8 @@ import pytest
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from .test_cli import CliBase, E2EType
from .e2e_types import E2EType
from .test_cli import CliBase
class CliDashboardBase(TestCase):

View File

@ -13,7 +13,7 @@
Test database connectors with CLI
"""
from abc import abstractmethod
from typing import List
from typing import List, Optional
from unittest import TestCase
import pytest
@ -22,7 +22,8 @@ from metadata.generated.schema.entity.data.table import Table
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from .test_cli import CliBase, E2EType
from .e2e_types import E2EType
from .test_cli import CliBase
class CliDBBase(TestCase):
@ -35,9 +36,9 @@ class CliDBBase(TestCase):
TestSuite class to define test structure
"""
# 1. deploy vanilla ingestion
@pytest.mark.order(1)
def test_vanilla_ingestion(self) -> None:
"""1. Deploy vanilla ingestion"""
# build config file for ingest
self.build_config_file(E2EType.INGEST)
# run ingest with new tables
@ -45,32 +46,38 @@ class CliDBBase(TestCase):
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_vanilla_ingestion(source_status, sink_status)
# 2. create a new table + deploy ingestion with views, sample data, and profiler
@pytest.mark.order(2)
def test_create_table_with_profiler(self) -> None:
# delete table in case it exists
"""2. create a new table + deploy ingestion with views, sample data, and profiler.
We will perform the following steps:
1. delete table in case it exists
2. create a table and a view
3. build config file for ingest
4. run ingest with new tables `self.run_command()` defaults to `ingestion`
5. build config file for profiler
6. run profiler with new tables
"""
self.delete_table_and_view()
# create a table and a view
self.create_table_and_view()
# build config file for ingest
self.build_config_file()
# run ingest with new tables
self.run_command()
# build config file for profiler
self.build_config_file(E2EType.PROFILER)
# run profiler with new tables
result = self.run_command("profile")
sink_status, source_status = self.retrieve_statuses(result)
self.assert_for_table_with_profiler(source_status, sink_status)
# 3. delete the new table + deploy marking tables as deleted
@pytest.mark.order(3)
def test_delete_table_is_marked_as_deleted(self) -> None:
# delete table created in previous test
"""3. delete the new table + deploy marking tables as deleted
We will perform the following steps:
1. delete table created in previous test
2. build config file for ingest
3. run ingest `self.run_command()` defaults to `ingestion`
"""
self.delete_table_and_view()
# build config file for ingest
self.build_config_file()
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
@ -78,62 +85,78 @@ class CliDBBase(TestCase):
source_status, sink_status
)
# 4. vanilla ingestion + include schema filter pattern
@pytest.mark.order(4)
def test_schema_filter_includes(self) -> None:
# build config file for ingest with filters
"""4. vanilla ingestion + include schema filter pattern
We will perform the following steps:
1. build config file for ingest with filters
2. run ingest `self.run_command()` defaults to `ingestion`
"""
self.build_config_file(
E2EType.INGEST_DB_FILTER_SCHEMA,
{"includes": self.get_includes_schemas()},
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_schemas_includes(source_status, sink_status)
# 5. vanilla ingestion + exclude schema filter pattern
@pytest.mark.order(5)
def test_schema_filter_excludes(self) -> None:
# build config file for ingest with filters
"""5. vanilla ingestion + exclude schema filter pattern
We will perform the following steps:
1. build config file for ingest with filters
2. run ingest `self.run_command()` defaults to `ingestion`
"""
self.build_config_file(
E2EType.INGEST_DB_FILTER_SCHEMA,
{"excludes": self.get_includes_schemas()},
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_schemas_excludes(source_status, sink_status)
# 6. Vanilla ingestion + include table filter pattern
@pytest.mark.order(6)
def test_table_filter_includes(self) -> None:
# build config file for ingest with filters
"""6. Vanilla ingestion + include table filter pattern
We will perform the following steps:
1. build config file for ingest with filters
2. run ingest `self.run_command()` defaults to `ingestion`
"""
self.build_config_file(
E2EType.INGEST_DB_FILTER_TABLE, {"includes": self.get_includes_tables()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_tables_includes(source_status, sink_status)
# 7. Vanilla ingestion + include table filter pattern
@pytest.mark.order(7)
def test_table_filter_excludes(self) -> None:
# build config file for ingest with filters
"""7. Vanilla ingestion + exclude table filter pattern
We will perform the following steps:
1. build config file for ingest with filters
2. run ingest `self.run_command()` defaults to `ingestion`
"""
self.build_config_file(
E2EType.INGEST_DB_FILTER_TABLE, {"excludes": self.get_includes_tables()}
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_tables_excludes(source_status, sink_status)
# 8. Vanilla ingestion mixing filters
@pytest.mark.order(8)
def test_table_filter_mix(self) -> None:
# build config file for ingest with filters
"""8. Vanilla ingestion + include schema filter pattern + exclude table filter pattern
We will perform the following steps:
1. build config file for ingest with filters
2. run ingest `self.run_command()` defaults to `ingestion`
"""
self.build_config_file(
E2EType.INGEST_DB_FILTER_MIX,
{
@ -144,22 +167,42 @@ class CliDBBase(TestCase):
},
},
)
# run ingest
result = self.run_command()
sink_status, source_status = self.retrieve_statuses(result)
self.assert_filtered_mix(source_status, sink_status)
# 9. Run usage
@pytest.mark.order(9)
def test_usage(self) -> None:
# to be implemented
pass
"""9. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage
This test will need to be implemented on the database specific test classes
"""
# 10. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage
@pytest.mark.order(10)
def test_lineage(self) -> None:
# to be implemented
pass
"""10. Run queries in the source (creates, inserts, views) and ingest metadata & Lineage
This test will need to be implemented on the database specific test classes
"""
@pytest.mark.order(11)
def test_profiler_with_time_partition(self) -> None:
    """11. Test time partitioning for the profiler.

    No-op for connectors whose ``get_profiler_time_partition``
    returns None.
    """
    time_partition = self.get_profiler_time_partition()
    if time_partition:
        # reuse the partition config fetched above instead of calling
        # get_profiler_time_partition() a second time
        processor_config = self.get_profiler_processor_config(time_partition)
        self.build_config_file(
            E2EType.PROFILER_PROCESSOR,
            {"processor": processor_config},
        )
        result = self.run_command("profile")
        sink_status, source_status = self.retrieve_statuses(result)
        self.assert_for_table_with_profiler_time_partition(
            source_status,
            sink_status,
        )
def retrieve_table(self, table_name_fqn: str) -> Table:
    """Fetch a Table entity from OpenMetadata by fully qualified name."""
    return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)
@ -170,6 +213,11 @@ class CliDBBase(TestCase):
)
return self.openmetadata.get_sample_data(table=table)
def retrieve_profile(self, table_fqn: str) -> Table:
    """Fetch the latest table profile for the table identified by `table_fqn`."""
    table: Table = self.openmetadata.get_latest_table_profile(fqn=table_fqn)
    return table
def retrieve_lineage(self, entity_fqn: str) -> dict:
return self.openmetadata.client.get(
f"/lineage/table/name/{entity_fqn}?upstreamDepth=3&downstreamDepth=3"
@ -200,6 +248,12 @@ class CliDBBase(TestCase):
):
raise NotImplementedError()
@abstractmethod
def assert_for_table_with_profiler_time_partition(
    self, source_status: SourceStatus, sink_status: SinkStatus
):
    """Assertions for the time-partitioned profiler run (test 11)."""
    raise NotImplementedError()
@abstractmethod
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: SourceStatus, sink_status: SinkStatus
@ -251,6 +305,22 @@ class CliDBBase(TestCase):
def get_excludes_tables() -> List[str]:
raise NotImplementedError()
@staticmethod
def get_profiler_time_partition() -> Optional[dict]:
    """Time-partition table config for the profiler; returning None
    makes test_profiler_with_time_partition a no-op for the connector."""
    return None
@staticmethod
def get_profiler_time_partition_results() -> Optional[dict]:
    """Expected profiler metrics for the partitioned run; returning None
    skips the detailed result checks in the time-partition assertions."""
    return None
@staticmethod
def get_test_type() -> str:
    """Identifier for the kind of connector under test."""
    return "database"
def get_profiler_processor_config(self, config: dict) -> dict:
    """Wrap a single tableConfig entry into an orm-profiler processor config."""
    return {
        "processor": {
            "type": "orm-profiler",
            "config": {"tableConfig": [config]},
        }
    }

View File

@ -71,6 +71,46 @@ class CliCommonDB:
== self.view_column_lineage_count()
)
def assert_for_table_with_profiler_time_partition(
    self, source_status: SourceStatus, sink_status: SinkStatus
):
    """Check workflow statuses and, when expected results are provided,
    the detailed table/column profiler metrics of the partitioned run."""
    self.assertTrue(len(source_status.failures) == 0)
    self.assertTrue(len(source_status.records) > self.expected_tables())
    self.assertTrue(len(sink_status.failures) == 0)
    self.assertTrue(len(sink_status.records) > self.expected_tables())
    # NOTE(review): sample_data is fetched but never asserted on — confirm
    # whether this call is still needed or assertions are missing.
    sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
    profile = self.retrieve_profile(self.fqn_created_table())
    expected_profiler_time_partition_results = (
        self.get_profiler_time_partition_results()
    )
    if expected_profiler_time_partition_results:
        # compare every expected table-level metric against the retrieved profile
        table_profile = profile.profile.dict()
        for key in expected_profiler_time_partition_results["table_profile"]:
            self.assertTrue(
                table_profile[key]
                == expected_profiler_time_partition_results["table_profile"][
                    key
                ]
            )
        for column in profile.columns:
            # find the expected metrics dict for this column, if any.
            # NOTE(review): the genexp variable `profile` shadows the outer
            # `profile` — harmless (genexps have their own scope) but
            # confusing; consider renaming.
            expected_column_profile = next(
                (
                    profile.get(column.name.__root__)
                    for profile in expected_profiler_time_partition_results[
                        "column_profile"
                    ]
                    if profile.get(column.name.__root__)
                ),
                None,
            )
            if expected_column_profile:
                column_profile = column.profile.dict()
                for key in expected_column_profile:  # type: ignore
                    self.assertTrue(
                        column_profile[key] == expected_column_profile[key]
                    )
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: SourceStatus, sink_status: SinkStatus
):

View File

@ -18,7 +18,7 @@ from typing import List
import pytest
import yaml
from metadata.utils.constants import UTF_8
from metadata.generated.schema.entity.data.table import Histogram
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
@ -75,48 +75,6 @@ class MSSQLCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def delete_table_and_view(self) -> None:
    """Delegate teardown of the e2e table and view to SQACommonMethods."""
    SQACommonMethods.delete_table_and_view(self)
@pytest.mark.order(9999)
def test_profiler_with_partition(self) -> None:
processor_config = {
"processor": {
"type": "orm-profiler",
"config": {
"tableConfig": [
{
"fullyQualifiedName": "mssql.e2e_cli_tests.dbo.persons",
"partitionConfig": {
"enablePartitioning": True,
"partitionColumnName": "birthdate",
"partitionIntervalType": "TIME-UNIT",
"partitionInterval": 30,
"partitionIntervalUnit": "YEAR",
},
}
]
},
}
}
with open(self.config_file_path, encoding=UTF_8) as config_file:
config_yaml = yaml.safe_load(config_file)
config_yaml["source"]["sourceConfig"] = {
"config": {
"type": "Profiler",
"generateSampleData": True,
"profileSample": 100,
}
}
config_yaml.update(processor_config)
with open(self.test_file_path, "w", encoding=UTF_8) as test_file:
yaml.dump(config_yaml, test_file)
result = self.run_command("profile")
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
assert len(sample_data.rows) == 3
@staticmethod
def expected_tables() -> int:
    """Number of tables expected for this connector's assertions."""
    return 1
@ -131,6 +89,19 @@ class MSSQLCliTest(CliCommonDB.TestSuite, SQACommonMethods):
def fqn_created_table() -> str:
return "mssql.e2e_cli_tests.dbo.persons"
@staticmethod
def get_profiler_time_partition() -> dict:
    """Partition config for the e2e persons table: a 30 YEAR time-unit
    partition on the `birthdate` column."""
    return {
        "fullyQualifiedName": "mssql.e2e_cli_tests.dbo.persons",
        "partitionConfig": {
            "enablePartitioning": True,
            "partitionColumnName": "birthdate",
            "partitionIntervalType": "TIME-UNIT",
            "partitionInterval": 30,
            "partitionIntervalUnit": "YEAR",
        },
    }
@staticmethod
def get_includes_schemas() -> List[str]:
    """Schemas used by the schema filter-pattern tests."""
    return ["dbo"]
@ -162,3 +133,47 @@ class MSSQLCliTest(CliCommonDB.TestSuite, SQACommonMethods):
@staticmethod
def expected_filtered_mix() -> int:
    """Expected count asserted by the mixed-filter ingestion test."""
    return 14
@staticmethod
def get_profiler_time_partition_results() -> dict:
    """Expected profiler metrics for the time-partitioned run, compared
    key-by-key in assert_for_table_with_profiler_time_partition."""
    return {
        # table-level metrics expected in the table profile
        "table_profile": {
            "columnCount": 3.0,
            "rowCount": 3.0,
        },
        # per-column expected metrics, keyed by column name
        "column_profile": [
            {
                "person_id": {
                    "distinctCount": 3.0,
                    "distinctProportion": 1.0,
                    "duplicateCount": None,
                    "firstQuartile": 2.1999999999999997,
                    "histogram": Histogram(
                        boundaries=["1.00 to 4.33", "4.33 and up"],
                        frequencies=[2, 1],
                    ),
                    "interQuartileRange": 2.4,
                    "max": 5.0,
                    "maxLength": None,
                    "mean": 3.333333,
                    "median": 4.0,
                    "min": 1.0,
                    "minLength": None,
                    "missingCount": None,
                    "missingPercentage": None,
                    "nonParametricSkew": -0.3922324663925032,
                    "nullCount": 0.0,
                    "nullProportion": 0.0,
                    "stddev": 1.6996731711975948,
                    "sum": 10.0,
                    "thirdQuartile": 4.6,
                    "uniqueCount": 3.0,
                    "uniqueProportion": 1.0,
                    "validCount": None,
                    "valuesCount": 3.0,
                    "valuesPercentage": None,
                    "variance": None,
                }
            }
        ],
    }

View File

@ -19,7 +19,7 @@ import pytest
from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus
from .base.test_cli_db import E2EType
from .base.e2e_types import E2EType
from .common.test_cli_db import CliCommonDB