From ecffd5ffc73adb772bca80fa07bba0e8e6fcbd34 Mon Sep 17 00:00:00 2001 From: Teddy Date: Fri, 31 Mar 2023 16:57:53 +0200 Subject: [PATCH] Fixes #10727 (& other minor improvements) (#10856) * fix: logic for test suite config workflow * fix: added caching for system metrics (snflk and bq) * fix: linting * fix: added tearDown logic for tests suite/case --- .../profiler/metrics/system/system.py | 70 +++++--- .../src/metadata/profiler/profiler/core.py | 4 +- .../src/metadata/test_suite/api/workflow.py | 127 ++++++++++++- ingestion/src/metadata/utils/entity_link.py | 18 ++ .../ometa/test_ometa_test_suite.py | 14 +- .../integration/test_suite/test_workflow.py | 170 ++++++++++++++++++ .../unit/metadata/utils/test_entity_link.py | 21 ++- 7 files changed, 377 insertions(+), 47 deletions(-) diff --git a/ingestion/src/metadata/profiler/metrics/system/system.py b/ingestion/src/metadata/profiler/metrics/system/system.py index 7e583113608..e8aba607818 100644 --- a/ingestion/src/metadata/profiler/metrics/system/system.py +++ b/ingestion/src/metadata/profiler/metrics/system/system.py @@ -40,6 +40,8 @@ DML_OPERATION_MAP = { "DELETE": "DELETE", } +SYSTEM_QUERY_RESULT_CACHE = {} + @valuedispatch def get_system_metrics_for_dialect( @@ -114,16 +116,21 @@ def _( "query_type,timestamp,destination_table,dml_statistics", ) - cursor_jobs = session.execute(text(jobs)) - rows_jobs = [ - QueryResult( - row.statement_type, - row.start_time, - row.destination_table, - row.dml_statistics, - ) - for row in cursor_jobs.fetchall() - ] + try: + # we'll try to get the cached data first + rows_jobs = kwargs["cache"][Dialects.BigQuery]["rows_jobs"] + except KeyError: + cursor_jobs = session.execute(text(jobs)) + rows_jobs = [ + QueryResult( + row.statement_type, + row.start_time, + row.destination_table, + row.dml_statistics, + ) + for row in cursor_jobs.fetchall() + ] + SYSTEM_QUERY_RESULT_CACHE[Dialects.BigQuery] = {"rows_jobs": rows_jobs} for row_jobs in rows_jobs: if ( @@ -204,7 +211,7 @@ def _( sti."schema" = '{table.__table_args__["schema"]}' AND sti."table" = '{table.__tablename__}' AND "rows" != 0 AND - DATE(starttime) = CURRENT_DATE - 1 + DATE(starttime) >= CURRENT_DATE - 1 GROUP BY 2,3,4,5,6 ORDER BY 6 desc """ @@ -228,7 +235,7 @@ def _( sti."schema" = '{table.__table_args__["schema"]}' AND sti."table" = '{table.__tablename__}' AND "rows" != 0 AND - DATE(starttime) = CURRENT_DATE - 1 + DATE(starttime) >= CURRENT_DATE - 1 GROUP BY 2,3,4,5,6 ORDER BY 6 desc """ @@ -352,23 +359,27 @@ def _( "query_id,database_name,schema_name,query_text,query_type,timestamp", ) - rows = [] - - # limit of results is 10K. We'll query range of 1 hours to make sure we - # get all the necessary data. 
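# [Editor's note] Illustrative sketch, not part of the patch: the caching added
# in this file memoizes expensive system-query results in a module-level dict
# keyed by dialect, so one profiler run scans the query history only once per
# dialect. A minimal standalone version of the pattern (the helper name
# `get_cached_rows` and its arguments are hypothetical):
from typing import Any, Callable, Dict, List

_QUERY_CACHE: Dict[str, Dict[str, Any]] = {}

def get_cached_rows(dialect: str, run_query: Callable[[], List[Any]]) -> List[Any]:
    # return cached rows for a dialect; run the expensive query only on a miss
    try:
        return _QUERY_CACHE[dialect]["rows"]
    except KeyError:
        rows = run_query()
        _QUERY_CACHE[dialect] = {"rows": rows}
        return rows

# Like SYSTEM_QUERY_RESULT_CACHE, this cache is never invalidated within a
# run: it trades freshness for a single query-history scan per dialect.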
- rows = session.execute(text(information_schema_query_history)).fetchall() - - query_results = [ - QueryResult( - row.query_id, - row.database_name.lower() if row.database_name else None, - row.schema_name.lower() if row.schema_name else None, - sqlparse.parse(row.query_text)[0], - row.query_type, - row.start_time, - ) - for row in rows - ] + try: + # we'll try to get the cached data first + rows = kwargs["cache"][Dialects.Snowflake]["rows"] + query_results = kwargs["cache"][Dialects.Snowflake]["query_results"] + except KeyError: + rows = session.execute(text(information_schema_query_history)).fetchall() + query_results = [ + QueryResult( + row.query_id, + row.database_name.lower() if row.database_name else None, + row.schema_name.lower() if row.schema_name else None, + sqlparse.parse(row.query_text)[0], + row.query_type, + row.start_time, + ) + for row in rows + ] + SYSTEM_QUERY_RESULT_CACHE[Dialects.Snowflake] = { + "rows": rows, + "query_results": query_results, + } for query_result in query_results: query_text = query_result.query_text @@ -459,6 +470,7 @@ class System(SystemMetric): session=session, table=self.table, conn_config=conn_config, + cache=SYSTEM_QUERY_RESULT_CACHE, ) return system_metrics diff --git a/ingestion/src/metadata/profiler/profiler/core.py b/ingestion/src/metadata/profiler/profiler/core.py index 12ee96f5ad0..e7e60c3b520 100644 --- a/ingestion/src/metadata/profiler/profiler/core.py +++ b/ingestion/src/metadata/profiler/profiler/core.py @@ -396,7 +396,7 @@ class Profiler(Generic[TMetric]): column, self.table, ) - for column in self.columns + for column in columns for metric in self.get_col_metrics(self.query_metrics, column) ], *[ @@ -410,7 +410,7 @@ class Profiler(Generic[TMetric]): column, self.table, ) - for column in self.columns + for column in columns ], ] diff --git a/ingestion/src/metadata/test_suite/api/workflow.py b/ingestion/src/metadata/test_suite/api/workflow.py index 6b245112264..526a6610f67 100644 --- a/ingestion/src/metadata/test_suite/api/workflow.py +++ b/ingestion/src/metadata/test_suite/api/workflow.py @@ -21,7 +21,7 @@ from copy import deepcopy from logging import Logger from typing import List, Optional, Set, Tuple -from pydantic import ValidationError +from pydantic import BaseModel, ValidationError from sqlalchemy import MetaData from metadata.config.common import WorkflowExecutionError @@ -58,7 +58,11 @@ from metadata.interfaces.datalake.datalake_test_suite_interface import ( ) from metadata.interfaces.sqalchemy.sqa_test_suite_interface import SQATestSuiteInterface from metadata.profiler.api.models import ProfileSampleConfig -from metadata.test_suite.api.models import TestCaseDefinition, TestSuiteProcessorConfig +from metadata.test_suite.api.models import ( + TestCaseDefinition, + TestSuiteDefinition, + TestSuiteProcessorConfig, +) from metadata.test_suite.runner.core import DataTestsRunner from metadata.utils import entity_link from metadata.utils.importer import get_sink @@ -70,6 +74,22 @@ from metadata.workflow.workflow_status_mixin import WorkflowStatusMixin logger: Logger = test_suite_logger() +class TestCaseToCreate(BaseModel): + """Test case to create""" + + test_suite_name: str + test_case_name: str + entity_link: str + + def __hash__(self): + """make this base model hashable on unique_name""" + return hash(f"{self.test_suite_name}.{self.test_case_name}") + + def __str__(self) -> str: + """make this base model printable""" + return f"{self.test_suite_name}.{self.test_case_name}" + + class TestSuiteWorkflow(WorkflowStatusMixin): 
"""workflow to run the test suite""" @@ -339,7 +359,9 @@ class TestSuiteWorkflow(WorkflowStatusMixin): return test_cases_entity - def get_test_case_from_cli_config(self) -> List[str]: + def get_test_case_from_cli_config( + self, + ) -> List[Tuple[TestCaseDefinition, TestSuiteDefinition]]: """Get all the test cases names defined in the CLI config file""" return [ (test_case, test_suite) @@ -347,9 +369,66 @@ class TestSuiteWorkflow(WorkflowStatusMixin): for test_case in test_suite.testCases ] + def get_unique_test_case_name_in_config(self, test_cases_in_config: set) -> set: + """Get unique test case names in config. If a test case is created for the same entity + with the same name in different test suites, we only create one test case in the platform. + The other ones will be skipped. + + Args: + test_cases_in_config (set): set of test cases in config + + Returns: + set: set of unique test case names in config + """ + seen = [] + unique_test_case = [] + for test_case in test_cases_in_config: + unique_test_case_name_in_entity = ( + f"{test_case.test_case_name}/{test_case.entity_link}" + ) + if unique_test_case_name_in_entity not in seen: + seen.append(unique_test_case_name_in_entity) + unique_test_case.append(test_case) + continue + logger.info( + f"Test case {test_case.test_case_name} for entity {test_case.entity_link}" + " was already defined in your profiler config file. Skipping it." + ) + + return set(unique_test_case) + + def test_case_name_exists(self, test_case: TestCaseToCreate) -> bool: + """Check if a test case name already exists in the platform + for the same entity. + + Args: + other (set): a set of platform test cases + Returns: + Optional["TestCaseToCreate"] + """ + try: + entity_fqn = entity_link.get_table_or_column_fqn(test_case.entity_link) + except ValueError as exc: + logger.debug(traceback.format_exc()) + logger.error(f"Failed to get entity fqn: {exc}") + # we'll assume that the test case name is not unique + return True + + test_case_fqn = f"{entity_fqn}.{test_case.test_case_name}" + + test_case = self.metadata.get_by_name( + entity=TestCase, + fqn=test_case_fqn, + fields=["testDefinition", "testSuite"], + ) + + if not test_case: + return False + return True + def compare_and_create_test_cases( self, - cli_config_test_cases_def: List[Tuple[TestCaseDefinition, TestSuite]], + cli_config_test_cases_def: List[Tuple[TestCaseDefinition, TestSuiteDefinition]], test_cases: List[TestCase], ) -> Optional[List[TestCase]]: """ @@ -360,9 +439,34 @@ class TestSuiteWorkflow(WorkflowStatusMixin): cli_config_test_case_name: test cases defined in CLI workflow associated with its test suite test_cases: list of test cases entities fetch from the server using test suite names in the config file """ - test_case_names_to_create = { - test_case_def[0].name for test_case_def in cli_config_test_cases_def - } - {test_case.name.__root__ for test_case in test_cases} + cli_test_cases = { + TestCaseToCreate( + test_suite_name=test_case_def[1].name, + test_case_name=test_case_def[0].name, + entity_link=test_case_def[0].entityLink.__root__, + ) + for test_case_def in cli_config_test_cases_def + } + platform_test_cases = { + TestCaseToCreate( + test_suite_name=test_case.testSuite.name, + test_case_name=test_case.name.__root__, + entity_link=test_case.entityLink.__root__, + ) + for test_case in test_cases + } + + # we'll check the test cases defined in the CLI config file and not present in the platform + unique_test_cases_across_test_suites = cli_test_cases - platform_test_cases + # we'll check if we 
diff --git a/ingestion/src/metadata/utils/entity_link.py b/ingestion/src/metadata/utils/entity_link.py
index e2067944f20..03621339b80 100644
--- a/ingestion/src/metadata/utils/entity_link.py
+++ b/ingestion/src/metadata/utils/entity_link.py
@@ -68,6 +68,24 @@ def get_table_fqn(entity_link: str) -> str:
     return entity_link.split("::")[2]


+def get_table_or_column_fqn(entity_link: str) -> str:
+    """From an entity link get the table or column fqn
+
+    Args:
+        entity_link: entity link
+    """
+    split_entity_link = split(entity_link)
+    if len(split_entity_link) == 2:
+        return split_entity_link[1]
+    if len(split_entity_link) == 4 and split_entity_link[2] == "columns":
+        return f"{split_entity_link[1]}.{split_entity_link[3]}"
+
+    raise ValueError(
+        "Invalid entity link."
+        f" {split_entity_link} does not look like a table or a column entity link"
+    )
+
+
 def get_entity_link(table_fqn: str, column_name: Optional[str]) -> str:
     """From table fqn and column name get the entity_link

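# [Editor's note] Illustrative sketch, not part of the patch: how
# get_table_or_column_fqn maps an entity link onto an FQN. A table link
# "<#E::table::<table_fqn>>" splits into 2 parts, a column link
# "<#E::table::<table_fqn>::columns::<column>>" into 4. The `_split` helper
# below is a crude stand-in for metadata.utils.entity_link.split (assumed
# behaviour):
from typing import List

def _split(entity_link: str) -> List[str]:
    # "<#E::table::db.schema.tbl::columns::col>" -> ["table", "db.schema.tbl", "columns", "col"]
    return entity_link[len("<#E::"):-1].split("::")

def table_or_column_fqn(entity_link: str) -> str:
    parts = _split(entity_link)
    if len(parts) == 2:
        return parts[1]  # table-level link
    if len(parts) == 4 and parts[2] == "columns":
        return f"{parts[1]}.{parts[3]}"  # column-level link
    raise ValueError(f"Invalid entity link: {entity_link}")

assert table_or_column_fqn("<#E::table::rds.dev.dbt_jaffle.customers>") == "rds.dev.dbt_jaffle.customers"
assert (
    table_or_column_fqn("<#E::table::rds.dev.dbt_jaffle.customers::columns::number_of_orders>")
    == "rds.dev.dbt_jaffle.customers.number_of_orders"
)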
diff --git a/ingestion/tests/integration/ometa/test_ometa_test_suite.py b/ingestion/tests/integration/ometa/test_ometa_test_suite.py
index 78eefb520d3..63111058a2f 100644
--- a/ingestion/tests/integration/ometa/test_ometa_test_suite.py
+++ b/ingestion/tests/integration/ometa/test_ometa_test_suite.py
@@ -66,13 +66,6 @@ class OMetaTestSuiteTest(TestCase):

     assert metadata.health_check()

-    test_suite: TestSuite = metadata.create_or_update(
-        CreateTestSuiteRequest(
-            name="testSuiteForIntegrationTest",
-            description="This is a test suite for the integration tests",
-        )
-    )
-
     test_definition = metadata.create_or_update(
         CreateTestDefinitionRequest(
             name="testDefinitionForIntegration",
@@ -87,6 +80,13 @@ class OMetaTestSuiteTest(TestCase):
     def setUpClass(cls) -> None:
         """set up tests"""

+        cls.test_suite: TestSuite = cls.metadata.create_or_update(
+            CreateTestSuiteRequest(
+                name="testSuiteForIntegrationTest",
+                description="This is a test suite for the integration tests",
+            )
+        )
+
         cls.metadata.create_or_update(
             CreateTestCaseRequest(
                 name="testCaseForIntegration",
diff --git a/ingestion/tests/integration/test_suite/test_workflow.py b/ingestion/tests/integration/test_suite/test_workflow.py
index 32ebb9a22f6..2723a14315c 100644
--- a/ingestion/tests/integration/test_suite/test_workflow.py
+++ b/ingestion/tests/integration/test_suite/test_workflow.py
@@ -58,6 +58,28 @@ class TestSuiteWorkflowTests(unittest.TestCase):
         )
     )

+    test_case_ids = []
+    test_suite_ids = []
+
+    def tearDown(self) -> None:
+        for test_case_id in self.test_case_ids:
+            self.metadata.delete(
+                entity=TestCase,
+                entity_id=test_case_id,
+                recursive=True,
+                hard_delete=True,
+            )
+        for test_suite_id in self.test_suite_ids:
+            self.metadata.delete(
+                entity=TestSuite,
+                entity_id=test_suite_id,
+                recursive=True,
+                hard_delete=True,
+            )
+
+        self.test_case_ids = []
+        self.test_suite_ids = []
+
     def test_create_workflow_object(self):
         """Test workflow object is correctly instantiated"""
         TestSuiteWorkflow.create(test_suite_config)
@@ -101,6 +123,7 @@ class TestSuiteWorkflowTests(unittest.TestCase):
         test_suite = self.metadata.get_by_name(entity=TestSuite, fqn="my_test_suite")

         assert workflow_test_suite[0].id == test_suite.id
+        self.test_suite_ids = [test_suite.id]

     def test_get_test_suite_entity_for_ui_workflow(self):
         """test we can correctly retrieve a test suite"""
@@ -316,6 +339,153 @@ class TestSuiteWorkflowTests(unittest.TestCase):

         assert not created_test_case

+    def test_compare_and_create_test_cases_same_test_name_diff_test_suite(self):
+        """Test that only one test case is created when the same test case name
+        targets the same entity from two different test suites
+        """
+        _test_suite_config = deepcopy(test_suite_config)
+        processor = {
+            "processor": {
+                "type": "orm-test-runner",
+                "config": {
+                    "testSuites": [
+                        {
+                            "name": "another_test_suite",
+                            "testCases": [
+                                {
+                                    "name": "table_column_count_between",
+                                    "testDefinitionName": "TableColumnCountToBeBetween",
+                                    "entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address_clean>",
+                                    "parameterValues": [
+                                        {"name": "minColValue", "value": 1},
+                                        {"name": "maxColValue", "value": 15},
+                                    ],
+                                },
+                            ],
+                        },
+                        {
+                            "name": "new_test_suite",
+                            "testCases": [
+                                {
+                                    "name": "table_column_count_between",
+                                    "testDefinitionName": "TableColumnCountToBeBetween",
+                                    "entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_address_clean>",
+                                    "parameterValues": [
+                                        {"name": "minColValue", "value": 1},
+                                        {"name": "maxColValue", "value": 5},
+                                    ],
+                                },
+                            ],
+                        },
+                    ],
+                },
+            }
+        }
+
+        _test_suite_config.update(processor)
+        workflow = TestSuiteWorkflow.create(_test_suite_config)
+
+        assert not self.metadata.get_by_name(
+            entity=TestCase,
+            fqn="sample_data.ecommerce_db.shopify.dim_address_clean.table_column_count_between",
+        )
+
+        test_suite = workflow.get_or_create_test_suite_entity_for_cli_workflow()
+        test_cases = workflow.get_test_cases_from_test_suite(test_suite)
+        config_test_cases_def = workflow.get_test_case_from_cli_config()
+        created_test_cases = workflow.compare_and_create_test_cases(
+            config_test_cases_def, test_cases
+        )
+
+        my_test_case = self.metadata.get_by_name(
+            entity=TestCase,
+            fqn="sample_data.ecommerce_db.shopify.dim_address_clean.table_column_count_between",
+            fields=["testDefinition", "testSuite"],
+        )
+
+        assert my_test_case
+        assert len(created_test_cases) == 1
+
+        another_test_suite = self.metadata.get_by_name(
+            entity=TestSuite,
+            fqn="another_test_suite",
+        )
+        new_test_suite = self.metadata.get_by_name(
+            entity=TestSuite,
+            fqn="new_test_suite",
+        )
+
+        self.test_suite_ids = [new_test_suite.id, another_test_suite.id]
+
"parameterValues": [ + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 30}, + ], + }, + { + "name": "table_column_count_between", + "testDefinitionName": "TableColumnCountToBeBetween", + "entityLink": "<#E::table::sample_data.ecommerce_db.shopify.dim_customer>", + "parameterValues": [ + {"name": "minColValue", "value": 1}, + {"name": "maxColValue", "value": 5}, + ], + }, + ], + }, + ], + }, + } + } + + _test_suite_config.update(processor) + workflow = TestSuiteWorkflow.create(_test_suite_config) + + assert not self.metadata.get_by_name( + entity=TestCase, + fqn="sample_data.ecommerce_db.shopify.dim_customer.table_column_count_between", + ) + assert self.metadata.get_by_name( + entity=TestCase, + fqn="sample_data.ecommerce_db.shopify.dim_address.table_column_count_between", + fields=["testDefinition", "testSuite"], + ) + + test_suite = workflow.get_or_create_test_suite_entity_for_cli_workflow() + test_cases = workflow.get_test_cases_from_test_suite(test_suite) + config_test_cases_def = workflow.get_test_case_from_cli_config() + created_test_case = workflow.compare_and_create_test_cases( + config_test_cases_def, test_cases + ) + + dim_customer_test_case = self.metadata.get_by_name( + entity=TestCase, + fqn="sample_data.ecommerce_db.shopify.dim_customer.table_column_count_between", + fields=["testDefinition", "testSuite"], + ) + + assert len(created_test_case) == 1 + assert created_test_case[0].name.__root__ == "table_column_count_between" + assert created_test_case[0].testSuite.name == "critical_metrics_suite" + + self.test_case_ids = [dim_customer_test_case.id] + def test_get_service_connection_from_test_case(self): """test get service connection returns correct info""" workflow = TestSuiteWorkflow.create(test_suite_config) diff --git a/ingestion/tests/unit/metadata/utils/test_entity_link.py b/ingestion/tests/unit/metadata/utils/test_entity_link.py index 991eff7f4f2..6eb386777bc 100644 --- a/ingestion/tests/unit/metadata/utils/test_entity_link.py +++ b/ingestion/tests/unit/metadata/utils/test_entity_link.py @@ -15,7 +15,7 @@ test entity link utils import pytest -from metadata.utils.entity_link import get_decoded_column +from metadata.utils.entity_link import get_decoded_column, get_table_or_column_fqn @pytest.mark.parametrize( @@ -34,3 +34,22 @@ from metadata.utils.entity_link import get_decoded_column def test_get_decoded_column(entity_link, expected): """test get_decoded_column return expected values""" assert get_decoded_column(entity_link) == expected + + +def test_get_table_or_column_fqn(): + entity_link = "<#E::table::rds.dev.dbt_jaffle.customers::columns::number_of_orders>" + assert ( + get_table_or_column_fqn(entity_link) + == "rds.dev.dbt_jaffle.customers.number_of_orders" + ) + + invalid_entity_link = ( + "<#E::table::rds.dev.dbt_jaffle.customers::foo::number_of_orders>" + ) + with pytest.raises(ValueError): + get_table_or_column_fqn(invalid_entity_link) + + invalid_entity_link = "<#E::table::rds.dev.dbt_jaffle.customers>" + assert ( + get_table_or_column_fqn(invalid_entity_link) == "rds.dev.dbt_jaffle.customers" + )