Fix 17903: fix(data-quality): snowflake data diff (#17907)

* fix(data-quality): snowflake data diff

- fixed schema in snowflake URL for data diff
- added e2e for snowflake data quality

* reverted unintended change

(cherry picked from commit 21af02d8d5d65ce73645b5396b3cd2eaee1fac19)
Authored by Imri Paran on 2024-09-19 17:55:48 +02:00; committed by sushi30
parent 3702750040
commit e22a66fe45
5 changed files with 253 additions and 11 deletions

View File

@@ -27,6 +27,7 @@ from metadata.generated.schema.entity.data.table import Constraint, Table
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
 from metadata.generated.schema.tests.testCase import TestCase
 from metadata.ingestion.source.connections import get_connection
+from metadata.profiler.orm.registry import Dialects
 from metadata.utils import fqn

@@ -168,7 +169,7 @@ class TableDiffParamsSetter(RuntimeParameterSetter):
             table_fqn
         )
         # path needs to include the database AND schema in some of the connectors
-        if kwargs["scheme"] in ["mssql"]:
+        if kwargs["scheme"] in {Dialects.MSSQL, Dialects.Snowflake}:
            kwargs["path"] = f"/{database}/{schema}"
         return url._replace(**kwargs).geturl()
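
For readers outside the codebase, here is a minimal, self-contained sketch of the URL handling this hunk changes. It assumes a urllib.parse-based flow like the one above; the helper name and example URL are illustrative, not OpenMetadata APIs. The point of the fix is that Snowflake, like MSSQL, needs both database and schema in the connection URL path before the table-diff test runs, otherwise the diff resolves against the wrong (default) schema.

from urllib.parse import urlsplit

def with_database_and_schema(url: str, database: str, schema: str) -> str:
    # Hypothetical helper mirroring the hunk above: for Snowflake and MSSQL,
    # the data-diff connection URL must carry "/<database>/<schema>" in its path.
    parts = urlsplit(url)
    if parts.scheme in {"snowflake", "mssql"}:
        parts = parts._replace(path=f"/{database}/{schema}")
    return parts.geturl()

# Example with illustrative values:
# with_database_and_schema("snowflake://user:pw@account", "E2E_DB", "E2E_TEST")
# -> "snowflake://user:pw@account/E2E_DB/E2E_TEST"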

View File

@@ -16,6 +16,10 @@ Config builder classes
 from copy import deepcopy

+from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
+    TestSuiteConfigType,
+)
+
 from ..e2e_types import E2EType

@@ -70,6 +74,34 @@ class ProfilerConfigBuilder(BaseBuilder):
         return self.config


+class DataQualityConfigBuilder(BaseBuilder):
+    """Builder class for the data quality config"""
+
+    # pylint: disable=invalid-name
+    def __init__(self, config: dict, config_args: dict) -> None:
+        super().__init__(config, config_args)
+        self.test_case_defintions = self.config_args.get("test_case_definitions", [])
+        self.entity_fqn = self.config_args.get("entity_fqn", [])
+
+    # pylint: enable=invalid-name
+
+    def build(self) -> dict:
+        """build profiler config"""
+        del self.config["source"]["sourceConfig"]["config"]
+        self.config["source"]["sourceConfig"] = {
+            "config": {
+                "type": TestSuiteConfigType.TestSuite.value,
+                "entityFullyQualifiedName": self.entity_fqn,
+            },
+        }
+
+        self.config["processor"] = {
+            "type": "orm-test-runner",
+            "config": {"testCases": self.test_case_defintions},
+        }
+        return self.config
+
+
 class SchemaConfigBuilder(BaseBuilder):
     """Builder for schema filter config"""

@@ -147,6 +179,7 @@ def builder_factory(builder, config: dict, config_args: dict):
     """Factory method to return the builder class"""
     builder_classes = {
         E2EType.PROFILER.value: ProfilerConfigBuilder,
+        E2EType.DATA_QUALITY.value: DataQualityConfigBuilder,
         E2EType.INGEST_DB_FILTER_SCHEMA.value: SchemaConfigBuilder,
         E2EType.INGEST_DB_FILTER_TABLE.value: TableConfigBuilder,
         E2EType.INGEST_DB_FILTER_MIX.value: MixConfigBuilder,
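
To make the new builder concrete, here is a hedged sketch of the config shape DataQualityConfigBuilder.build() is expected to produce from a base e2e ingestion config. The service name, table FQN, and test case values are illustrative (borrowed from the Snowflake test further down), and TestSuiteConfigType.TestSuite is assumed to serialize to "TestSuite"; only the sourceConfig and processor keys are touched by the builder, the rest comes from the base config it was given.

# Illustrative output; not the literal config emitted by the e2e harness.
data_quality_config = {
    "source": {
        "type": "snowflake",
        "serviceName": "e2e_snowflake",
        "sourceConfig": {
            "config": {
                "type": "TestSuite",
                "entityFullyQualifiedName": "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS",
            },
        },
    },
    "processor": {
        "type": "orm-test-runner",
        "config": {
            "testCases": [
                {
                    "name": "snowflake_data_diff",
                    "testDefinitionName": "tableDiff",
                    "parameterValues": [
                        {"name": "table2", "value": "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS"},
                        {"name": "keyColumns", "value": '["PERSON_ID"]'},
                    ],
                },
            ],
        },
    },
}

In the e2e harness this shape is reached via build_config_file(E2EType.DATA_QUALITY, {...}), as the new test_data_quality test below shows.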

View File

@@ -24,6 +24,7 @@ class E2EType(Enum):
     INGEST = "ingest"
     PROFILER = "profiler"
     PROFILER_PROCESSOR = "profiler-processor"
+    DATA_QUALITY = "test"
     INGEST_DB_FILTER_SCHEMA = "ingest-db-filter-schema"
     INGEST_DB_FILTER_TABLE = "ingest-db-filter-table"
     INGEST_DB_FILTER_MIX = "ingest-db-filter-mix"

View File

@@ -17,8 +17,13 @@ from typing import List, Optional
 from unittest import TestCase

 import pytest
+from pydantic import TypeAdapter

+from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
+from metadata.data_quality.api.models import TestCaseDefinition
 from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.tests.basic import TestCaseResult
+from metadata.generated.schema.tests.testCase import TestCase as OMTestCase
 from metadata.ingestion.api.status import Status

 from .e2e_types import E2EType

@@ -208,6 +213,50 @@ class CliDBBase(TestCase):
                 sink_status,
             )

+        @pytest.mark.order(12)
+        def test_data_quality(self) -> None:
+            """12. Test data quality for the connector"""
+            if self.get_data_quality_table() is None:
+                return
+            self.delete_table_and_view()
+            self.create_table_and_view()
+            table: Table = self.openmetadata.get_by_name(
+                Table, self.get_data_quality_table(), nullable=False
+            )
+            self.build_config_file()
+            self.run_command()
+            test_case_definitions = self.get_test_case_definitions()
+            self.build_config_file(
+                E2EType.DATA_QUALITY,
+                {
+                    "entity_fqn": table.fullyQualifiedName.root,
+                    "test_case_definitions": TypeAdapter(
+                        List[TestCaseDefinition]
+                    ).dump_python(test_case_definitions),
+                },
+            )
+            result = self.run_command("test")
+            sink_status, source_status = self.retrieve_statuses(result)
+            self.assert_status_for_data_quality(source_status, sink_status)
+            test_case_entities = [
+                self.openmetadata.get_by_name(
+                    OMTestCase,
+                    ".".join([table.fullyQualifiedName.root, tcd.name]),
+                    fields=["*"],
+                    nullable=False,
+                )
+                for tcd in test_case_definitions
+            ]
+            expected = self.get_expected_test_case_results()
+            try:
+                for test_case, expected in zip(test_case_entities, expected):
+                    assert_equal_pydantic_objects(expected, test_case.testCaseResult)
+            finally:
+                for tc in test_case_entities:
+                    self.openmetadata.delete(
+                        OMTestCase, tc.id, recursive=True, hard_delete=True
+                    )
+
         def retrieve_table(self, table_name_fqn: str) -> Table:
             return self.openmetadata.get_by_name(entity=Table, fqn=table_name_fqn)

@@ -346,3 +395,15 @@ class CliDBBase(TestCase):
                    "config": {"tableConfig": [config]},
                }
            }
+
+        def get_data_quality_table(self):
+            return None
+
+        def get_test_case_definitions(self) -> List[TestCaseDefinition]:
+            pass
+
+        def get_expected_test_case_results(self) -> List[TestCaseResult]:
+            pass
+
+        def assert_status_for_data_quality(self, source_status, sink_status):
+            pass

View File

@@ -16,8 +16,14 @@ from typing import List

 import pytest

+from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
+from metadata.generated.schema.entity.data.table import DmlOperationType, SystemProfile
+from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
+from metadata.generated.schema.tests.testCase import TestCaseParameterValue
+from metadata.generated.schema.type.basic import Timestamp
 from metadata.ingestion.api.status import Status
+from ...src.metadata.data_quality.api.models import TestCaseDefinition
 from .base.e2e_types import E2EType
 from .common.test_cli_db import CliCommonDB
 from .common_e2e_sqa_mixins import SQACommonMethods

@@ -40,6 +46,9 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
         "CREATE OR REPLACE TABLE e2e_test.test_departments(e2e_testdepartment_id INT PRIMARY KEY,e2e_testdepartment_name VARCHAR (30) NOT NULL,e2e_testlocation_id INT);",
         "CREATE OR REPLACE TABLE e2e_test.test_employees(e2e_testemployee_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (20),e2e_testlast_name VARCHAR (25) NOT NULL,e2e_testemail VARCHAR (100) NOT NULL,e2e_testphone_number VARCHAR (20),e2e_testhire_date DATE NOT NULL,e2e_testjob_id INT NOT NULL,e2e_testsalary DECIMAL (8, 2) NOT NULL,e2e_testmanager_id INT,e2e_testdepartment_id INT);",
         "CREATE OR REPLACE TABLE e2e_test.test_dependents(e2e_testdependent_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (50) NOT NULL,e2e_testlast_name VARCHAR (50) NOT NULL,e2e_testrelationship VARCHAR (25) NOT NULL,e2e_testemployee_id INT NOT NULL);",
+        "CREATE OR REPLACE TABLE e2e_test.e2e_table(varchar_column VARCHAR(255),int_column INT);",
+        "CREATE OR REPLACE TABLE public.public_table(varchar_column VARCHAR(255),int_column INT);",
+        "CREATE OR REPLACE TABLE public.e2e_table(varchar_column VARCHAR(255),int_column INT);",
     ]

     create_table_query: str = """

@@ -57,7 +66,14 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
     insert_data_queries: List[str] = [
         "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');",
-        "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');",
+        "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (2, 'Clark Kent');",
+        "INSERT INTO e2e_test.e2e_table (varchar_column, int_column) VALUES ('e2e_test.e2e_table', 1);",
+        "INSERT INTO public.e2e_table (varchar_column, int_column) VALUES ('public.e2e_table', 1);",
+        "INSERT INTO e2e_table (varchar_column, int_column) VALUES ('e2e_table', 1);",
+        "INSERT INTO public.public_table (varchar_column, int_column) VALUES ('public.public_table', 1);",
+        "INSERT INTO public_table (varchar_column, int_column) VALUES ('public_table', 1);",
+        "MERGE INTO public_table USING (SELECT 'public_table' as varchar_column, 2 as int_column) as source ON public_table.varchar_column = source.varchar_column WHEN MATCHED THEN UPDATE SET public_table.int_column = source.int_column WHEN NOT MATCHED THEN INSERT (varchar_column, int_column) VALUES (source.varchar_column, source.int_column);",
+        "DELETE FROM public_table WHERE varchar_column = 'public.public_table';",
     ]

     drop_table_query: str = """

@@ -68,6 +84,19 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
         DROP VIEW IF EXISTS E2E_DB.e2e_test.view_persons;
     """

+    teardown_sql_statements: List[str] = [
+        "DROP TABLE IF EXISTS E2E_DB.e2e_test.e2e_table;",
+        "DROP TABLE IF EXISTS E2E_DB.public.e2e_table;",
+        "DROP TABLE IF EXISTS E2E_DB.public.public_table;",
+    ]
+
+    @classmethod
+    def tearDownClass(cls):
+        super().tearDownClass()
+        with cls.engine.connect() as connection:
+            for stmt in cls.teardown_sql_statements:
+                connection.execute(stmt)
+
     def setUp(self) -> None:
         with self.engine.connect() as connection:
             for sql_statements in self.prepare_snowflake_e2e:
@@ -83,15 +112,15 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
         self.assertTrue(len(source_status.failures) == 0)
         self.assertTrue(len(source_status.warnings) == 0)
         self.assertTrue(len(source_status.filtered) == 1)
-        self.assertTrue(
-            (len(source_status.records) + len(source_status.updated_records))
-            >= self.expected_tables()
+        self.assertGreaterEqual(
+            (len(source_status.records) + len(source_status.updated_records)),
+            self.expected_tables(),
         )
         self.assertTrue(len(sink_status.failures) == 0)
         self.assertTrue(len(sink_status.warnings) == 0)
-        self.assertTrue(
-            (len(sink_status.records) + len(sink_status.updated_records))
-            > self.expected_tables()
+        self.assertGreater(
+            (len(sink_status.records) + len(sink_status.updated_records)),
+            self.expected_tables(),
         )

     def create_table_and_view(self) -> None:

@@ -130,17 +159,22 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
             # Otherwise the sampling here does not pick up rows
            extra_args={"profileSample": 100},
        )
+        # wait for query log to be updated
+        self.wait_for_query_log()
         # run profiler with new tables
         result = self.run_command("profile")
         sink_status, source_status = self.retrieve_statuses(result)
         self.assert_for_table_with_profiler(source_status, sink_status)
+        self.custom_profiler_assertions()

     @staticmethod
     def expected_tables() -> int:
         return 7

     def inserted_rows_count(self) -> int:
-        return len(self.insert_data_queries)
+        return len(
+            [q for q in self.insert_data_queries if "E2E_DB.e2e_test.persons" in q]
+        )

     def view_column_lineage_count(self) -> int:
         return 2

@@ -171,7 +205,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
     @staticmethod
     def expected_filtered_table_includes() -> int:
-        return 5
+        return 8

     @staticmethod
     def expected_filtered_table_excludes() -> int:

@@ -179,7 +213,7 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
     @staticmethod
     def expected_filtered_mix() -> int:
-        return 6
+        return 7

     @staticmethod
     def delete_queries() -> List[str]:

@@ -196,3 +230,115 @@ class SnowflakeCliTest(CliCommonDB.TestSuite, SQACommonMethods):
            UPDATE E2E_DB.E2E_TEST.PERSONS SET full_name = 'Bruce Wayne' WHERE full_name = 'Clark Kent'
            """,
     ]
+
+    def custom_profiler_assertions(self):
+        cases = [
+            (
+                "e2e_snowflake.E2E_DB.E2E_TEST.E2E_TABLE",
+                [
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.INSERT,
+                        rowsAffected=1,
+                    ),
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.INSERT,
+                        rowsAffected=1,
+                    ),
+                ],
+            ),
+            (
+                "e2e_snowflake.E2E_DB.PUBLIC.E2E_TABLE",
+                [
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.INSERT,
+                        rowsAffected=1,
+                    )
+                ],
+            ),
+            (
+                "e2e_snowflake.E2E_DB.PUBLIC.PUBLIC_TABLE",
+                [
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.INSERT,
+                        rowsAffected=1,
+                    ),
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.INSERT,
+                        rowsAffected=1,
+                    ),
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.UPDATE,
+                        rowsAffected=1,
+                    ),
+                    SystemProfile(
+                        timestamp=Timestamp(root=0),
+                        operation=DmlOperationType.DELETE,
+                        rowsAffected=1,
+                    ),
+                ],
+            ),
+        ]
+        for table_fqn, expected_profile in cases:
+            actual_profiles = self.openmetadata.get_profile_data(
+                table_fqn,
+                start_ts=int((datetime.now().timestamp() - 600) * 1000),
+                end_ts=int(datetime.now().timestamp() * 1000),
+                profile_type=SystemProfile,
+            ).entities
+            actual_profiles = sorted(actual_profiles, key=lambda x: x.timestamp.root)
+            actual_profiles = actual_profiles[-len(expected_profile) :]
+            actual_profiles = [
+                p.copy(update={"timestamp": Timestamp(root=0)}) for p in actual_profiles
+            ]
+            try:
+                assert_equal_pydantic_objects(expected_profile, actual_profiles)
+            except AssertionError as e:
+                raise AssertionError(f"Table: {table_fqn}\n{e}")
+
+    @classmethod
+    def wait_for_query_log(cls, timeout=600):
+        start = datetime.now().timestamp()
+        cls.engine.execute("SELECT 'e2e_query_log_wait'")
+        latest = 0
+        while latest < start:
+            sleep(5)
+            latest = (
+                cls.engine.execute(
+                    'SELECT max(start_time) FROM "SNOWFLAKE"."ACCOUNT_USAGE"."QUERY_HISTORY"'
+                )
+                .scalar()
+                .timestamp()
+            )
+            if (datetime.now().timestamp() - start) > timeout:
+                raise TimeoutError(f"Query log not updated for {timeout} seconds")
+
+    def get_data_quality_table(self):
+        return "e2e_snowflake.E2E_DB.E2E_TEST.PERSONS"
+
+    def get_test_case_definitions(self) -> List[TestCaseDefinition]:
+        return [
+            TestCaseDefinition(
+                name="snowflake_data_diff",
+                testDefinitionName="tableDiff",
+                computePassedFailedRowCount=True,
+                parameterValues=[
+                    TestCaseParameterValue(
+                        name="table2",
+                        value=self.get_data_quality_table(),
+                    ),
+                    TestCaseParameterValue(
+                        name="keyColumns",
+                        value='["PERSON_ID"]',
+                    ),
+                ],
+            )
+        ]
+
+    def get_expected_test_case_results(self):
+        return [TestCaseResult(testCaseStatus=TestCaseStatus.Success)]