From d0ca05efbf28f4d4148f626a7187e7c24821947f Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Fri, 11 Oct 2024 08:37:58 +0200 Subject: [PATCH] MINOR: add data quality tests (#18193) * test: add dq tests * wip * fixed test_all_definition_exists --- .../integration/postgres/test_data_quality.py | 263 +++++++++++++++++- 1 file changed, 256 insertions(+), 7 deletions(-) diff --git a/ingestion/tests/integration/postgres/test_data_quality.py b/ingestion/tests/integration/postgres/test_data_quality.py index 0e27e7d03cf..b8b0b2ff033 100644 --- a/ingestion/tests/integration/postgres/test_data_quality.py +++ b/ingestion/tests/integration/postgres/test_data_quality.py @@ -1,3 +1,6 @@ +import glob +import json +import os.path import sys from dataclasses import dataclass from typing import List @@ -6,6 +9,7 @@ import pytest from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects from metadata.data_quality.api.models import TestCaseDefinition +from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuiteConfigType, @@ -19,12 +23,17 @@ from metadata.generated.schema.metadataIngestion.workflow import ( SourceConfig, WorkflowConfig, ) -from metadata.generated.schema.tests.basic import TestCaseStatus +from metadata.generated.schema.tests.basic import ( + TestCaseResult, + TestCaseStatus, + TestResultValue, +) from metadata.generated.schema.tests.testCase import TestCase from metadata.generated.schema.tests.testSuite import TestSuite from metadata.generated.schema.type.basic import ComponentConfig from metadata.ingestion.api.status import TruncatedStackTraceError from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils import entity_link from metadata.workflow.data_quality import TestSuiteWorkflow from metadata.workflow.metadata import MetadataWorkflow @@ -32,7 +41,7 @@ if not sys.version_info >= (3, 9): pytest.skip("requires python 3.9+", allow_module_level=True) -@pytest.fixture() +@pytest.fixture(scope="module") def run_data_quality_workflow( run_workflow, ingestion_config, @@ -66,6 +75,7 @@ def run_data_quality_workflow( "parameterValues": [ {"name": "allowedValues", "value": "['Tom', 'Jerry']"} ], + "computePassedFailedRowCount": True, }, { "name": "first_name_includes_tom_and_jerry", @@ -91,6 +101,104 @@ def run_data_quality_workflow( "columnName": "customer_id", "parameterValues": [], }, + { + "name": "column_values_not_match_regex", + "testDefinitionName": "columnValuesToNotMatchRegex", + "columnName": "email", + "parameterValues": [ + {"name": "forbiddenRegex", "value": ".*@example\\.com$"} + ], + }, + { + "name": "table_column_count_between", + "testDefinitionName": "tableColumnCountToBeBetween", + "parameterValues": [ + {"name": "minColValue", "value": "8"}, + {"name": "maxColValue", "value": "12"}, + ], + }, + { + "name": "table_column_count_equal", + "testDefinitionName": "tableColumnCountToEqual", + "parameterValues": [{"name": "columnCount", "value": "11"}], + }, + { + "name": "table_column_name_exists", + "testDefinitionName": "tableColumnNameToExist", + "parameterValues": [ + {"name": "columnName", "value": "customer_id"} + ], + }, + { + "name": "table_column_names_match_set", + "testDefinitionName": "tableColumnToMatchSet", + "parameterValues": [ + { + "name": "columnNames", + "value": "customer_id, store_id, first_name, last_name, email, address_id, activebool, create_date, last_update, active, json_field", + }, + {"name": "ordered", "value": "false"}, + ], + }, + { + "name": "custom_sql_query_count", + "testDefinitionName": "tableCustomSQLQuery", + "parameterValues": [ + { + "name": "sqlExpression", + "value": "SELECT CASE WHEN COUNT(*) > 0 THEN 0 ELSE 1 END FROM customer WHERE active = 1", + }, + {"name": "strategy", "value": "COUNT"}, + {"name": "threshold", "value": "0"}, + ], + }, + { + "name": "custom_sql_query_rows", + "testDefinitionName": "tableCustomSQLQuery", + "parameterValues": [ + { + "name": "sqlExpression", + "value": "SELECT * FROM customer WHERE active = 1", + }, + {"name": "strategy", "value": "ROWS"}, + {"name": "threshold", "value": "10"}, + ], + }, + { + "name": "table_row_count_between", + "testDefinitionName": "tableRowCountToBeBetween", + "parameterValues": [ + {"name": "minValue", "value": "100"}, + {"name": "maxValue", "value": "1000"}, + ], + }, + { + "name": "table_row_count_equal", + "testDefinitionName": "tableRowCountToEqual", + "parameterValues": [{"name": "value", "value": "599"}], + }, + { + "name": "table_row_inserted_count_between_fail", + "testDefinitionName": "tableRowInsertedCountToBeBetween", + "parameterValues": [ + {"name": "min", "value": "10"}, + {"name": "max", "value": "50"}, + {"name": "columnName", "value": "create_date"}, + {"name": "rangeType", "value": "DAY"}, + {"name": "rangeInterval", "value": "1"}, + ], + }, + { + "name": "table_row_inserted_count_between_success", + "testDefinitionName": "tableRowInsertedCountToBeBetween", + "parameterValues": [ + {"name": "min", "value": "590"}, + {"name": "max", "value": "600"}, + {"name": "columnName", "value": "last_update"}, + {"name": "rangeType", "value": "YEAR"}, + {"name": "rangeInterval", "value": "12"}, + ], + }, ], } ), @@ -109,14 +217,150 @@ def run_data_quality_workflow( metadata.delete(TestSuite, test_suite.id, recursive=True, hard_delete=True) +def test_all_definition_exists(metadata, run_data_quality_workflow, db_service): + test_difinitions_glob = ( + os.path.dirname(__file__) + + "/../../../.." + + "/openmetadata-service/src/main/resources/json/data/tests/**.json" + ) + test_definitions: List[str] = [] + for test_definition_file in glob.glob(test_difinitions_glob, recursive=False): + test_definitions.append(json.load(open(test_definition_file))["name"]) + assert len(test_definitions) > 0 + table: Table = metadata.get_by_name( + Table, + f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer", + nullable=False, + ) + tcs: List[TestCase] = metadata.list_entities( + TestCase, + fields=["*"], + params={ + "entityLink": entity_link.get_entity_link( + Table, table.fullyQualifiedName.root + ) + }, + ).entities + tcs_dict = {tc.testDefinition.fullyQualifiedName: tc for tc in tcs} + excluded = { + # TODO implement these too + "columnValueLengthsToBeBetween", + "columnValueMaxToBeBetween", + "columnValueMinToBeBetween", + "columnValuesToBeUnique", + "tableDataToBeFresh", + "columnValuesToMatchRegex", + "columnValuesToNotMatchRegex", + "columnValueStdDevToBeBetween", + "columnValuesToBeNotNull", + "columnValueMedianToBeBetween", + "columnValuesSumToBeBetween", + "columnValuesToBeInSet", + "columnValuesMissingCount", + "columnValuesToBeNotInSet", + "columnValueMeanToBeBetween", + "columnValuesToBeBetween", + "tableDiff", + } + missing = set() + for test_definition in test_definitions: + if test_definition in tcs_dict: + assert ( + test_definition not in excluded + ), f"Remove test from excluded list: {test_definition}" + else: + if test_definition in excluded: + continue + missing.add(test_definition.fullyQualifiedName.root) + assert not missing, f"Missing test cases: {missing}" + + @pytest.mark.parametrize( "test_case_name,expected_status", [ - ("first_name_includes_tom_and_jerry_wo_enum", TestCaseStatus.Success), - ("first_name_includes_tom_and_jerry", TestCaseStatus.Success), - ("first_name_is_tom_or_jerry", TestCaseStatus.Failed), - ("id_no_bounds", TestCaseStatus.Success), + ( + "first_name_includes_tom_and_jerry_wo_enum", + TestCaseResult( + timestamp=0, + testCaseStatus=TestCaseStatus.Success, + passedRows=2, + failedRows=597, + ), + ), + ( + "first_name_includes_tom_and_jerry", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "first_name_is_tom_or_jerry", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed), + ), + ( + "id_no_bounds", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "column_values_not_match_regex", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "table_column_count_between", + TestCaseResult( + timestamp=0, + testCaseStatus=TestCaseStatus.Success, + testResultValue=[TestResultValue(name="columnCount", value="11")], + ), + ), + ( + "table_column_count_equal", + TestCaseResult( + timestamp=0, + testCaseStatus=TestCaseStatus.Success, + testResultValue=[TestResultValue(name="columnCount", value="11")], + ), + ), + ( + "table_column_name_exists", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "table_column_names_match_set", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "custom_sql_query_count", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "custom_sql_query_rows", + TestCaseResult( + timestamp=0, + testCaseStatus=TestCaseStatus.Failed, + testResultValues=[{"name": "resultRowCount", "value": "599"}], + ), + ), + ( + "table_row_count_between", + TestCaseResult( + timestamp=0, + testCaseStatus=TestCaseStatus.Success, + testResultValue=[TestResultValue(name="rowCount", value="599")], + ), + ), + ( + "table_row_count_equal", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), + ( + "table_row_inserted_count_between_fail", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed), + ), + ( + "table_row_inserted_count_between_success", + TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success), + ), ], + ids=lambda *x: x[0], ) def test_data_quality( run_data_quality_workflow, metadata: OpenMetadata, test_case_name, expected_status @@ -128,7 +372,12 @@ def test_data_quality( (t for t in test_cases if t.name.root == test_case_name), None ) assert test_case is not None - assert test_case.testCaseResult.testCaseStatus == expected_status + assert_equal_pydantic_objects( + expected_status.model_copy( + update={"timestamp": test_case.testCaseResult.timestamp} + ), + test_case.testCaseResult, + ) @pytest.fixture()