From e22a66fe45063d9456ba37527aae488cc6fdfb76 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Thu, 19 Sep 2024 17:55:48 +0200 Subject: [PATCH] Fix 17903: fix(data-quality): snowflake data diff (#17907) * fix(data-quality): snowflake data diff - fixed schema in snowflake URL for data diff - added e2e for snowflake data quality * reverted unintended change (cherry picked from commit 21af02d8d5d65ce73645b5396b3cd2eaee1fac19) --- .../table_diff_params_setter.py | 3 +- .../cli_e2e/base/config_builders/builders.py | 33 ++++ ingestion/tests/cli_e2e/base/e2e_types.py | 1 + ingestion/tests/cli_e2e/base/test_cli_db.py | 61 +++++++ ingestion/tests/cli_e2e/test_cli_snowflake.py | 166 ++++++++++++++++-- 5 files changed, 253 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py index 86a09c17dd4..ec373acc745 100644 --- a/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py +++ b/ingestion/src/metadata/data_quality/validations/runtime_param_setter/table_diff_params_setter.py @@ -27,6 +27,7 @@ from metadata.generated.schema.entity.data.table import Constraint, Table from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.tests.testCase import TestCase from metadata.ingestion.source.connections import get_connection +from metadata.profiler.orm.registry import Dialects from metadata.utils import fqn @@ -168,7 +169,7 @@ class TableDiffParamsSetter(RuntimeParameterSetter): table_fqn ) # path needs to include the database AND schema in some of the connectors - if kwargs["scheme"] in ["mssql"]: + if kwargs["scheme"] in {Dialects.MSSQL, Dialects.Snowflake}: kwargs["path"] = f"/{database}/{schema}" return url._replace(**kwargs).geturl() diff --git 
a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index 1da3ce0fefc..024f287214c 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py +++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -16,6 +16,10 @@ Config builder classes from copy import deepcopy +from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( + TestSuiteConfigType, +) + from ..e2e_types import E2EType @@ -70,6 +74,34 @@ class ProfilerConfigBuilder(BaseBuilder): return self.config +class DataQualityConfigBuilder(BaseBuilder): + """Builder class for the data quality config""" + + # pylint: disable=invalid-name + def __init__(self, config: dict, config_args: dict) -> None: + super().__init__(config, config_args) + self.test_case_defintions = self.config_args.get("test_case_definitions", []) + self.entity_fqn = self.config_args.get("entity_fqn", []) + + # pylint: enable=invalid-name + + def build(self) -> dict: + """build data quality config""" + del self.config["source"]["sourceConfig"]["config"] + self.config["source"]["sourceConfig"] = { + "config": { + "type": TestSuiteConfigType.TestSuite.value, + "entityFullyQualifiedName": self.entity_fqn, + }, + } + + self.config["processor"] = { + "type": "orm-test-runner", + "config": {"testCases": self.test_case_defintions}, + } + return self.config + + class SchemaConfigBuilder(BaseBuilder): """Builder for schema filter config""" @@ -147,6 +179,7 @@ def builder_factory(builder, config: dict, config_args: dict): """Factory method to return the builder class""" builder_classes = { E2EType.PROFILER.value: ProfilerConfigBuilder, + E2EType.DATA_QUALITY.value: DataQualityConfigBuilder, E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder, E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder, E2EType.INGEST_DB_FILTER_MIX.value: MixConfigBuilder, diff --git a/ingestion/tests/cli_e2e/base/e2e_types.py b/ingestion/tests/cli_e2e/base/e2e_types.py 
index 81b7eb14890..442c5c27b88 100644 --- a/ingestion/tests/cli_e2e/base/e2e_types.py +++ b/ingestion/tests/cli_e2e/base/e2e_types.py @@ -24,6 +24,7 @@ class E2EType(Enum): INGEST = "ingest" PROFILER = "profiler" PROFILER_PROCESSOR = "profiler-processor" + DATA_QUALITY = "test" INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema" INGEST_DB_FILTER_TABLE = "ingest-db-filter-table" INGEST_DB_FILTER_MIX = "ingest-db-filter-mix" diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index 02ebfa6d40e..0bc5eb23f0f 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -17,8 +17,13 @@ from typing import List, Optional from unittest import TestCase import pytest +from pydantic import TypeAdapter +from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects +from metadata.data_quality.api.models import TestCaseDefinition from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.tests.basic import TestCaseResult +from metadata.generated.schema.tests.testCase import TestCase as OMTestCase from metadata.ingestion.api.status import Status from .e2e_types import E2EType @@ -208,6 +213,50 @@ class CliDBBase(TestCase): sink_status, ) + @pytest.mark.order(12) + def test_data_quality(self) -> None: + """12. 
Test data quality for the connector""" + if self.get_data_quality_table() is None: + return + self.delete_table_and_view() + self.create_table_and_view() + table: Table = self.openmetadata.get_by_name( + Table, self.get_data_quality_table(), nullable=False + ) + self.build_config_file() + self.run_command() + test_case_definitions = self.get_test_case_definitions() + self.build_config_file( + E2EType.DATA_QUALITY, + { + "entity_fqn": table.fullyQualifiedName.root, + "test_case_definitions": TypeAdapter( + List[TestCaseDefinition] + ).dump_python(test_case_definitions), + }, + ) + result = self.run_command("test") + sink_status, source_status = self.retrieve_statuses(result) + self.assert_status_for_data_quality(source_status, sink_status) + test_case_entities = [ + self.openmetadata.get_by_name( + OMTestCase, + ".".join([table.fullyQualifiedName.root, tcd.name]), + fields=["*"], + nullable=False, + ) + for tcd in test_case_definitions + ] + expected = self.get_expected_test_case_results() + try: + for test_case, expected in zip(test_case_entities, expected): + assert_equal_pydantic_objects(expected, test_case.testCaseResult) + finally: + for tc in test_case_entities: + self.openmetadata.delete( + OMTestCase, tc.id, recursive=True, hard_delete=True + ) + def retrieve_table(self, table_name_fqn: str) -> Table: return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn) @@ -346,3 +395,15 @@ class CliDBBase(TestCase): "config": {"tableConfig": [config]}, } } + + def get_data_quality_table(self): + return None + + def get_test_case_definitions(self) -> List[TestCaseDefinition]: + pass + + def get_expected_test_case_results(self) -> List[TestCaseResult]: + pass + + def assert_status_for_data_quality(self, source_status, sink_status): + pass diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py index cceb19d2674..551e1711930 100644 --- a/ingestion/tests/cli_e2e/test_cli_snowflake.py +++ 
b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -16,8 +16,14 @@ from typing import List import pytest +from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects +from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile +from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus +from metadata.generated.schema.tests.testCase import TestCaseParameterValue +from metadata.generated.schema.type.basic import Timestamp from metadata.ingestion.api.status import Status +from ...src.metadata.data_quality.api.models import TestCaseDefinition from .base.e2e_types import E2EType from .common.test_cli_db import CliCommonDB from .common_e2e_sqa_mixins import SQACommonMethods @@ -40,6 +46,9 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): "CREATE OR REPLACE TABLE e2e_test.test_departments(e2e_testdepartment_id INT PRIMARY KEY,e2e_testdepartment_name VARCHAR (30) NOT NULL,e2e_testlocation_id INT);", "CREATE OR REPLACE TABLE e2e_test.test_employees(e2e_testemployee_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (20),e2e_testlast_name VARCHAR (25) NOT NULL,e2e_testemail VARCHAR (100) NOT NULL,e2e_testphone_number VARCHAR (20),e2e_testhire_date DATE NOT NULL,e2e_testjob_id INT NOT NULL,e2e_testsalary DECIMAL (8, 2) NOT NULL,e2e_testmanager_id INT,e2e_testdepartment_id INT);", "CREATE OR REPLACE TABLE e2e_test.test_dependents(e2e_testdependent_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (50) NOT NULL,e2e_testlast_name VARCHAR (50) NOT NULL,e2e_testrelationship VARCHAR (25) NOT NULL,e2e_testemployee_id INT NOT NULL);", + "CREATE OR REPLACE TABLE e2e_test.e2e_table(varchar_column VARCHAR(255),int_column INT);", + "CREATE OR REPLACE TABLE public.public_table(varchar_column VARCHAR(255),int_column INT);", + "CREATE OR REPLACE TABLE public.e2e_table(varchar_column VARCHAR(255),int_column INT);", ] create_table_query: str = """ @@ -57,7 +66,14 @@ class 
SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): insert_data_queries: List[str] = [ "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');", - "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');", + "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (2, 'Clark Kent');", + "INSERT INTO e2e_test.e2e_table (varchar_column, int_column) VALUES ('e2e_test.e2e_table', 1);", + "INSERT INTO public.e2e_table (varchar_column, int_column) VALUES ('public.e2e_table', 1);", + "INSERT INTO e2e_table (varchar_column, int_column) VALUES ('e2e_table', 1);", + "INSERT INTO public.public_table (varchar_column, int_column) VALUES ('public.public_table', 1);", + "INSERT INTO public_table (varchar_column, int_column) VALUES ('public_table', 1);", + "MERGE INTO public_table USING (SELECT 'public_table' as varchar_column, 2 as int_column) as source ON public_table.varchar_column = source.varchar_column WHEN MATCHED THEN UPDATE SET public_table.int_column = source.int_column WHEN NOT MATCHED THEN INSERT (varchar_column, int_column) VALUES (source.varchar_column, source.int_column);", + "DELETE FROM public_table WHERE varchar_column = 'public.public_table';", ] drop_table_query: str = """ @@ -68,6 +84,19 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): DROP VIEW IF EXISTS E2E_DB.e2e_test.view_persons; """ + teardown_sql_statements: List[str] = [ + "DROP TABLE IF EXISTS E2E_DB.e2e_test.e2e_table;", + "DROP TABLE IF EXISTS E2E_DB.public.e2e_table;", + "DROP TABLE IF EXISTS E2E_DB.public.public_table;", + ] + + @classmethod + def tearDownClass(cls): + super().tearDownClass() + with cls.engine.connect() as connection: + for stmt in cls.teardown_sql_statements: + connection.execute(stmt) + def setUp(self) -> None: with self.engine.connect() as connection: for sql_statements in self.prepare_snowflake_e2e: @@ -83,15 +112,15 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, 
SQACommonMethods): self.assertTrue(len(source_status.failures) == 0) self.assertTrue(len(source_status.warnings) == 0) self.assertTrue(len(source_status.filtered) == 1) - self.assertTrue( - (len(source_status.records) + len(source_status.updated_records)) - >= self.expected_tables() + self.assertGreaterEqual( + (len(source_status.records) + len(source_status.updated_records)), + self.expected_tables(), ) self.assertTrue(len(sink_status.failures) == 0) self.assertTrue(len(sink_status.warnings) == 0) - self.assertTrue( - (len(sink_status.records) + len(sink_status.updated_records)) - > self.expected_tables() + self.assertGreater( + (len(sink_status.records) + len(sink_status.updated_records)), + self.expected_tables(), ) def create_table_and_view(self) -> None: @@ -130,17 +159,22 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): # Otherwise the sampling here does not pick up rows extra_args={"profileSample": 100}, ) + # wait for query log to be updated + self.wait_for_query_log() # run profiler with new tables result = self.run_command("profile") sink_status, source_status = self.retrieve_statuses(result) self.assert_for_table_with_profiler(source_status, sink_status) + self.custom_profiler_assertions() @staticmethod def expected_tables() -> int: return 7 def inserted_rows_count(self) -> int: - return len(self.insert_data_queries) + return len( + [q for q in self.insert_data_queries if "E2E_DB.e2e_test.persons" in q] + ) def view_column_lineage_count(self) -> int: return 2 @@ -171,7 +205,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_table_includes() -> int: - return 5 + return 8 @staticmethod def expected_filtered_table_excludes() -> int: @@ -179,7 +213,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_mix() -> int: - return 6 + return 7 @staticmethod def delete_queries() -> List[str]: @@ -196,3 +230,115 @@ class 
SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods): UPDATE E2E_DB.E2E_TEST.PERSONS SET full_name = 'Bruce Wayne' WHERE full_name = 'Clark Kent' """, ] + + def custom_profiler_assertions(self): + cases = [ + ( + "e2e_snowflake.E2E_DB.E2E_TEST.E2E_TABLE", + [ + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=1, + ), + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=1, + ), + ], + ), + ( + "e2e_snowflake.E2E_DB.PUBLIC.E2E_TABLE", + [ + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=1, + ) + ], + ), + ( + "e2e_snowflake.E2E_DB.PUBLIC.PUBLIC_TABLE", + [ + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=1, + ), + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.INSERT, + rowsAffected=1, + ), + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.UPDATE, + rowsAffected=1, + ), + SystemProfile( + timestamp=Timestamp(root=0), + operation=DmlOperationType.DELETE, + rowsAffected=1, + ), + ], + ), + ] + for table_fqn, expected_profile in cases: + actual_profiles = self.openmetadata.get_profile_data( + table_fqn, + start_ts=int((datetime.now().timestamp() - 600) * 1000), + end_ts=int(datetime.now().timestamp() * 1000), + profile_type=SystemProfile, + ).entities + actual_profiles = sorted(actual_profiles, key=lambda x: x.timestamp.root) + actual_profiles = actual_profiles[-len(expected_profile) :] + actual_profiles = [ + p.copy(update={"timestamp": Timestamp(root=0)}) for p in actual_profiles + ] + try: + assert_equal_pydantic_objects(expected_profile, actual_profiles) + except AssertionError as e: + raise AssertionError(f"Table: {table_fqn}\n{e}") + + @classmethod + def wait_for_query_log(cls, timeout=600): + start = datetime.now().timestamp() + cls.engine.execute("SELECT 'e2e_query_log_wait'") + latest = 0 + while latest < 
start: + sleep(5) + latest = ( + cls.engine.execute( + 'SELECT max(start_time) FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"' + ) + .scalar() + .timestamp() + ) + if (datetime.now().timestamp() - start) > timeout: + raise TimeoutError(f"Query log not updated for {timeout} seconds") + + def get_data_quality_table(self): + return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS" + + def get_test_case_definitions(self) -> List[TestCaseDefinition]: + return [ + TestCaseDefinition( + name="snowflake_data_diff", + testDefinitionName="tableDiff", + computePassedFailedRowCount=True, + parameterValues=[ + TestCaseParameterValue( + name="table2", + value=self.get_data_quality_table(), + ), + TestCaseParameterValue( + name="keyColumns", + value='["PERSON_ID"]', + ), + ], + ) + ] + + def get_expected_test_case_results(self): + return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)]