Fixes #10727 (& other minor improvements) (#10856)

* fix: logic for test suite config workflow

* fix: added caching for system metrics (snflk and bq)

* fix: linting

* fix: added tearDown logic for test suite/case
This commit is contained in:
Teddy 2023-03-31 16:57:53 +02:00 committed by GitHub
parent 6d24455738
commit ecffd5ffc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 377 additions and 47 deletions

View File

@ -40,6 +40,8 @@ DML_OPERATION_MAP = {
"DELETE": "DELETE",
}
SYSTEM_QUERY_RESULT_CACHE = {}
@valuedispatch
def get_system_metrics_for_dialect(
@ -114,16 +116,21 @@ def _(
"query_type,timestamp,destination_table,dml_statistics",
)
cursor_jobs = session.execute(text(jobs))
rows_jobs = [
QueryResult(
row.statement_type,
row.start_time,
row.destination_table,
row.dml_statistics,
)
for row in cursor_jobs.fetchall()
]
try:
# we'll try to get the cached data first
rows_jobs = kwargs["cache"][Dialects.BigQuery]["rows_jobs"]
except KeyError:
cursor_jobs = session.execute(text(jobs))
rows_jobs = [
QueryResult(
row.statement_type,
row.start_time,
row.destination_table,
row.dml_statistics,
)
for row in cursor_jobs.fetchall()
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery] = {"rows_jobs": rows_jobs}
for row_jobs in rows_jobs:
if (
@ -204,7 +211,7 @@ def _(
sti."schema" = '{table.__table_args__["schema"]}' AND
sti."table" = '{table.__tablename__}' AND
"rows" != 0 AND
DATE(starttime) = CURRENT_DATE - 1
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 desc
"""
@ -228,7 +235,7 @@ def _(
sti."schema" = '{table.__table_args__["schema"]}' AND
sti."table" = '{table.__tablename__}' AND
"rows" != 0 AND
DATE(starttime) = CURRENT_DATE - 1
DATE(starttime) >= CURRENT_DATE - 1
GROUP BY 2,3,4,5,6
ORDER BY 6 desc
"""
@ -352,23 +359,27 @@ def _(
"query_id,database_name,schema_name,query_text,query_type,timestamp",
)
rows = []
# The limit of results is 10K. We'll query ranges of 1 hour to make sure we
# get all the necessary data.
rows = session.execute(text(information_schema_query_history)).fetchall()
query_results = [
QueryResult(
row.query_id,
row.database_name.lower() if row.database_name else None,
row.schema_name.lower() if row.schema_name else None,
sqlparse.parse(row.query_text)[0],
row.query_type,
row.start_time,
)
for row in rows
]
try:
# we'll try to get the cached data first
rows = kwargs["cache"][Dialects.Snowflake]["rows"]
query_results = kwargs["cache"][Dialects.Snowflake]["query_results"]
except KeyError:
rows = session.execute(text(information_schema_query_history)).fetchall()
query_results = [
QueryResult(
row.query_id,
row.database_name.lower() if row.database_name else None,
row.schema_name.lower() if row.schema_name else None,
sqlparse.parse(row.query_text)[0],
row.query_type,
row.start_time,
)
for row in rows
]
SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake] = {
"rows": rows,
"query_results": query_results,
}
for query_result in query_results:
query_text = query_result.query_text
@ -459,6 +470,7 @@ class System(SystemMetric):
session=session,
table=self.table,
conn_config=conn_config,
cache=SYSTEM_QUERY_RESULT_CACHE,
)
return system_metrics

View File

@ -396,7 +396,7 @@ class Profiler(Generic[TMetric]):
column,
self.table,
)
for column in self.columns
for column in columns
for metric in self.get_col_metrics(self.query_metrics, column)
],
*[
@ -410,7 +410,7 @@ class Profiler(Generic[TMetric]):
column,
self.table,
)
for column in self.columns
for column in columns
],
]

View File

@ -21,7 +21,7 @@ from copy import deepcopy
from logging import Logger
from typing import List, Optional, Set, Tuple
from pydantic import ValidationError
from pydantic import BaseModel, ValidationError
from sqlalchemy import MetaData
from metadata.config.common import WorkflowExecutionError
@ -58,7 +58,11 @@ from metadata.interfaces.datalake.datalake_test_suite_interface import (
)
from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface
from metadata.profiler.api.models import ProfileSampleConfig
from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcessorConfig
from metadata.test_suite.api.models import (
TestCaseDefinition,
TestSuiteDefinition,
TestSuiteProcessorConfig,
)
from metadata.test_suite.runner.core import DataTestsRunner
from metadata.utils import entity_link
from metadata.utils.importer import get_sink
@ -70,6 +74,22 @@ from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin
logger: Logger = test_suite_logger()
class TestCaseToCreate(BaseModel):
"""Identifier for a test case defined in the CLI config file.

Hashable so instances can be collected in sets and deduplicated when
comparing config-defined test cases against platform test cases.
"""
# name of the test suite the test case belongs to
test_suite_name: str
# name of the test case itself
test_case_name: str
# entity link of the table/column the test case targets
entity_link: str
def __hash__(self):
"""make this base model hashable on test_suite_name.test_case_name"""
return hash(f"{self.test_suite_name}.{self.test_case_name}")
def __str__(self) -> str:
"""make this base model printable as test_suite_name.test_case_name"""
return f"{self.test_suite_name}.{self.test_case_name}"
class TestSuiteWorkflow(WorkflowStatusMixin):
"""workflow to run the test suite"""
@ -339,7 +359,9 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
return test_cases_entity
def get_test_case_from_cli_config(self) -> List[str]:
def get_test_case_from_cli_config(
self,
) -> List[Tuple[TestCaseDefinition, TestSuiteDefinition]]:
"""Get all the test cases names defined in the CLI config file"""
return [
(test_case, test_suite)
@ -347,9 +369,66 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
for test_case in test_suite.testCases
]
def get_unique_test_case_name_in_config(self, test_cases_in_config: set) -> set:
    """Get unique test case names in config.

    If a test case is created for the same entity with the same name in
    different test suites, we only create one test case in the platform.
    The other ones are logged and skipped.

    Args:
        test_cases_in_config (set): set of test cases in config
    Returns:
        set: set of test cases unique by (test_case_name, entity_link)
    """
    # set instead of list: O(1) membership checks while deduplicating
    seen = set()
    unique_test_cases = []
    for test_case in test_cases_in_config:
        unique_test_case_name_in_entity = (
            f"{test_case.test_case_name}/{test_case.entity_link}"
        )
        if unique_test_case_name_in_entity not in seen:
            seen.add(unique_test_case_name_in_entity)
            unique_test_cases.append(test_case)
            continue
        logger.info(
            f"Test case {test_case.test_case_name} for entity {test_case.entity_link}"
            " was already defined in your profiler config file. Skipping it."
        )
    return set(unique_test_cases)
def test_case_name_exists(self, test_case: TestCaseToCreate) -> bool:
"""Check if a test case name already exists in the platform
for the same entity.
Args:
test_case (TestCaseToCreate): candidate test case to look up
Returns:
bool: True if a test case with the same FQN already exists in the
platform (or the entity link could not be resolved), False otherwise
"""
try:
entity_fqn = entity_link.get_table_or_column_fqn(test_case.entity_link)
except ValueError as exc:
logger.debug(traceback.format_exc())
logger.error(f"Failed to get entity fqn: {exc}")
# we'll assume that the test case name is not unique
return True
# FQN of the would-be test case: <table-or-column fqn>.<test case name>
test_case_fqn = f"{entity_fqn}.{test_case.test_case_name}"
test_case = self.metadata.get_by_name(
entity=TestCase,
fqn=test_case_fqn,
fields=["testDefinition", "testSuite"],
)
if not test_case:
return False
return True
def compare_and_create_test_cases(
self,
cli_config_test_cases_def: List[Tuple[TestCaseDefinition, TestSuite]],
cli_config_test_cases_def: List[Tuple[TestCaseDefinition, TestSuiteDefinition]],
test_cases: List[TestCase],
) -> Optional[List[TestCase]]:
"""
@ -360,9 +439,34 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
cli_config_test_case_name: test cases defined in CLI workflow associated with its test suite
test_cases: list of test cases entities fetch from the server using test suite names in the config file
"""
test_case_names_to_create = {
test_case_def[0].name for test_case_def in cli_config_test_cases_def
} - {test_case.name.__root__ for test_case in test_cases}
cli_test_cases = {
TestCaseToCreate(
test_suite_name=test_case_def[1].name,
test_case_name=test_case_def[0].name,
entity_link=test_case_def[0].entityLink.__root__,
)
for test_case_def in cli_config_test_cases_def
}
platform_test_cases = {
TestCaseToCreate(
test_suite_name=test_case.testSuite.name,
test_case_name=test_case.name.__root__,
entity_link=test_case.entityLink.__root__,
)
for test_case in test_cases
}
# we'll check the test cases defined in the CLI config file and not present in the platform
unique_test_cases_across_test_suites = cli_test_cases - platform_test_cases
# we'll check if we are creating a test case for an entity that already has the same name
unique_test_case_across_entities = {
test_case
for test_case in unique_test_cases_across_test_suites
if not self.test_case_name_exists(test_case)
}
test_case_names_to_create = self.get_unique_test_case_name_in_config(
unique_test_case_across_entities
)
if not test_case_names_to_create:
return None
@ -374,7 +478,14 @@ class TestSuiteWorkflow(WorkflowStatusMixin):
(
cli_config_test_case_def
for cli_config_test_case_def in cli_config_test_cases_def
if cli_config_test_case_def[0].name == test_case_name_to_create
if (
cli_config_test_case_def[0].name
== test_case_name_to_create.test_case_name
and cli_config_test_case_def[0].entityLink.__root__
== test_case_name_to_create.entity_link
and cli_config_test_case_def[1].name
== test_case_name_to_create.test_suite_name
)
),
(None, None),
)

View File

@ -68,6 +68,24 @@ def get_table_fqn(entity_link: str) -> str:
return entity_link.split("::")[2]
def get_table_or_column_fqn(entity_link: str) -> str:
    """From an entity link get the table or column FQN.

    Args:
        entity_link: entity link of the form
            ``<#E::table::<table fqn>[::columns::<column name>]>``
    Returns:
        str: the table FQN, or ``<table fqn>.<column name>`` for a column link
    Raises:
        ValueError: if the link is neither a table nor a column entity link
    """
    split_entity_link = split(entity_link)
    if len(split_entity_link) == 2:
        return split_entity_link[1]
    if len(split_entity_link) == 4 and split_entity_link[2] == "columns":
        return f"{split_entity_link[1]}.{split_entity_link[3]}"
    # fix: f-string prefix was missing, so the placeholder was emitted literally
    raise ValueError(
        "Invalid entity link."
        f" {split_entity_link} does not look like a table or a column entity link"
    )
def get_entity_link(table_fqn: str, column_name: Optional[str]) -> str:
"""From table fqn and column name get the entity_link

View File

@ -66,13 +66,6 @@ class OMetaTestSuiteTest(TestCase):
assert metadata.health_check()
test_suite: TestSuite = metadata.create_or_update(
CreateTestSuiteRequest(
name="testSuiteForIntegrationTest",
description="This is a test suite for the integration tests",
)
)
test_definition = metadata.create_or_update(
CreateTestDefinitionRequest(
name="testDefinitionForIntegration",
@ -87,6 +80,13 @@ class OMetaTestSuiteTest(TestCase):
def setUpClass(cls) -> None:
"""set up tests"""
cls.test_suite: TestSuite = cls.metadata.create_or_update(
CreateTestSuiteRequest(
name="testSuiteForIntegrationTest",
description="This is a test suite for the integration tests",
)
)
cls.metadata.create_or_update(
CreateTestCaseRequest(
name="testCaseForIntegration",

View File

@ -58,6 +58,28 @@ class TestSuiteWorkflowTests(unittest.TestCase):
)
)
# ids of entities created by individual tests; cleaned up in tearDown
test_case_ids = []
test_suite_ids = []
def tearDown(self) -> None:
"""Hard-delete any test cases and test suites registered by the test."""
for test_case_id in self.test_case_ids:
self.metadata.delete(
entity=TestCase,
entity_id=test_case_id,
recursive=True,
hard_delete=True,
)
for test_suite_id in self.test_suite_ids:
self.metadata.delete(
entity=TestSuite,
entity_id=test_suite_id,
recursive=True,
hard_delete=True,
)
# reset so the next test starts from a clean slate
self.test_case_ids = []
self.test_suite_ids = []
def test_create_workflow_object(self):
"""Test workflow object is correctly instantiated"""
# should not raise when given a valid config
TestSuiteWorkflow.create(test_suite_config)
@ -101,6 +123,7 @@ class TestSuiteWorkflowTests(unittest.TestCase):
test_suite = self.metadata.get_by_name(entity=TestSuite, fqn="my_test_suite")
assert workflow_test_suite[0].id == test_suite.id
self.test_suite_ids = [test_suite.id]
def test_get_test_suite_entity_for_ui_workflow(self):
"""test we can correctly retrieve a test suite"""
@ -316,6 +339,153 @@ class TestSuiteWorkflowTests(unittest.TestCase):
assert not created_test_case
def test_compare_and_create_test_cases_same_test_name_diff_test_suite(self):
"""Test function creates the correct test case if they don't exist when
the test case name and the entity are the same but the test suite differs:
only one test case should be created for the shared entity.
"""
_test_suite_config = deepcopy(test_suite_config)
# two suites defining the same test case name against the same entity
processor = {
"processor": {
"type": "orm-test-runner",
"config": {
"testSuites": [
{
"name": "another_test_suite",
"testCases": [
{
"name": "table_column_count_between",
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address_clean>",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 15},
],
},
],
},
{
"name": "new_test_suite",
"testCases": [
{
"name": "table_column_count_between",
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address_clean>",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
],
},
],
},
}
}
_test_suite_config.update(processor)
workflow = TestSuiteWorkflow.create(_test_suite_config)
# precondition: the test case must not exist yet
assert not self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address_clean.table_column_count_between",
)
test_suite = workflow.get_or_create_test_suite_entity_for_cli_workflow()
test_cases = workflow.get_test_cases_from_test_suite(test_suite)
config_test_cases_def = workflow.get_test_case_from_cli_config()
created_workflow = workflow.compare_and_create_test_cases(
config_test_cases_def, test_cases
)
# NOTE(review): my_test_case is fetched but never asserted on — consider
# asserting it exists (and registering its id for tearDown cleanup)
my_test_case = self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address_clean.table_column_count_between",
fields=["testDefinition", "testSuite"],
)
# only one of the two duplicate definitions should be created
assert len(created_workflow) == 1
another_test_suite = self.metadata.get_by_name(
entity=TestSuite,
fqn="another_test_suite",
)
new_test_suite = self.metadata.get_by_name(
entity=TestSuite,
fqn="new_test_suite",
)
# register for tearDown cleanup
self.test_suite_ids = [new_test_suite.id, another_test_suite.id]
def test_compare_and_create_test_cases_same_test_name_same_test_suite(self):
"""Test function creates the correct test case if they don't exist when
the test case name and the test suite are the same but the entity differs:
only the test case for the not-yet-covered entity should be created.
"""
_test_suite_config = deepcopy(test_suite_config)
# one suite defining the same test case name against two different entities
processor = {
"processor": {
"type": "orm-test-runner",
"config": {
"testSuites": [
{
"name": "critical_metrics_suite",
"testCases": [
{
"name": "table_column_count_between",
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address>",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 30},
],
},
{
"name": "table_column_count_between",
"testDefinitionName": "TableColumnCountToBeBetween",
"entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_customer>",
"parameterValues": [
{"name": "minColValue", "value": 1},
{"name": "maxColValue", "value": 5},
],
},
],
},
],
},
}
}
_test_suite_config.update(processor)
workflow = TestSuiteWorkflow.create(_test_suite_config)
# precondition: dim_customer test case doesn't exist, dim_address's does
assert not self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_customer.table_column_count_between",
)
assert self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_address.table_column_count_between",
fields=["testDefinition", "testSuite"],
)
test_suite = workflow.get_or_create_test_suite_entity_for_cli_workflow()
test_cases = workflow.get_test_cases_from_test_suite(test_suite)
config_test_cases_def = workflow.get_test_case_from_cli_config()
created_test_case = workflow.compare_and_create_test_cases(
config_test_cases_def, test_cases
)
dim_customer_test_case = self.metadata.get_by_name(
entity=TestCase,
fqn="sample_data.ecommerce_db.shopify.dim_customer.table_column_count_between",
fields=["testDefinition", "testSuite"],
)
# only the dim_customer test case should have been created
assert len(created_test_case) == 1
assert created_test_case[0].name.__root__ == "table_column_count_between"
assert created_test_case[0].testSuite.name == "critical_metrics_suite"
# register for tearDown cleanup
self.test_case_ids = [dim_customer_test_case.id]
def test_get_service_connection_from_test_case(self):
"""test get service connection returns correct info"""
workflow = TestSuiteWorkflow.create(test_suite_config)

View File

@ -15,7 +15,7 @@ test entity link utils
import pytest
from metadata.utils.entity_link import get_decoded_column
from metadata.utils.entity_link import get_decoded_column, get_table_or_column_fqn
@pytest.mark.parametrize(
@ -34,3 +34,22 @@ from metadata.utils.entity_link import get_decoded_column
def test_get_decoded_column(entity_link, expected):
"""test get_decoded_column returns the expected values (parametrized)"""
assert get_decoded_column(entity_link) == expected
def test_get_table_or_column_fqn():
    """Validate get_table_or_column_fqn on column, malformed, and table links."""
    # a column-level entity link resolves to "<table fqn>.<column name>"
    column_link = "<#E::table::rds.dev.dbt_jaffle.customers::columns::number_of_orders>"
    expected_column_fqn = "rds.dev.dbt_jaffle.customers.number_of_orders"
    assert get_table_or_column_fqn(column_link) == expected_column_fqn

    # a 4-part link whose third element is not "columns" is rejected
    malformed_link = "<#E::table::rds.dev.dbt_jaffle.customers::foo::number_of_orders>"
    with pytest.raises(ValueError):
        get_table_or_column_fqn(malformed_link)

    # a plain table link resolves to the table fqn itself
    table_link = "<#E::table::rds.dev.dbt_jaffle.customers>"
    assert get_table_or_column_fqn(table_link) == "rds.dev.dbt_jaffle.customers"