MINOR: add data quality tests (#18193)

* test: add dq tests

* wip

* fixed test_all_definition_exists
Imri Paran 2024-10-11 08:37:58 +02:00 committed by GitHub
parent 7d736f6195
commit d0ca05efbf

@@ -1,3 +1,6 @@
import glob
import json
import os.path
import sys
from dataclasses import dataclass
from typing import List
@@ -6,6 +9,7 @@ import pytest
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuiteConfigType,
@@ -19,12 +23,17 @@ from metadata.generated.schema.metadataIngestion.workflow import (
SourceConfig,
WorkflowConfig,
)
from metadata.generated.schema.tests.basic import TestCaseStatus
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import ComponentConfig
from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import entity_link
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
@@ -32,7 +41,7 @@ if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
@pytest.fixture()
@pytest.fixture(scope="module")
def run_data_quality_workflow(
run_workflow,
ingestion_config,
@@ -66,6 +75,7 @@ def run_data_quality_workflow(
"parameterValues": [
{"name": "allowedValues", "value": "['Tom', 'Jerry']"}
],
"computePassedFailedRowCount": True,
},
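# Note: computePassedFailedRowCount presumably instructs the runner to also
# report row-level counts for this case; the expected result further below
# asserts passedRows=2 and failedRows=597 for it.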
{
"name": "first_name_includes_tom_and_jerry",
@@ -91,6 +101,104 @@ def run_data_quality_workflow(
"columnName": "customer_id",
"parameterValues": [],
},
{
"name": "column_values_not_match_regex",
"testDefinitionName": "columnValuesToNotMatchRegex",
"columnName": "email",
"parameterValues": [
{"name": "forbiddenRegex", "value": ".*@example\\.com$"}
],
},
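# The regex case above is expected to pass (see the parametrized results
# below), i.e. presumably no customer email matches the forbidden pattern.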
{
"name": "table_column_count_between",
"testDefinitionName": "tableColumnCountToBeBetween",
"parameterValues": [
{"name": "minColValue", "value": "8"},
{"name": "maxColValue", "value": "12"},
],
},
{
"name": "table_column_count_equal",
"testDefinitionName": "tableColumnCountToEqual",
"parameterValues": [{"name": "columnCount", "value": "11"}],
},
{
"name": "table_column_name_exists",
"testDefinitionName": "tableColumnNameToExist",
"parameterValues": [
{"name": "columnName", "value": "customer_id"}
],
},
{
"name": "table_column_names_match_set",
"testDefinitionName": "tableColumnToMatchSet",
"parameterValues": [
{
"name": "columnNames",
"value": "customer_id, store_id, first_name, last_name, email, address_id, activebool, create_date, last_update, active, json_field",
},
{"name": "ordered", "value": "false"},
],
},
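# The 11 column names listed for tableColumnToMatchSet line up with the
# columnCount of 11 used by tableColumnCountToEqual above and echoed in the
# expected columnCount result values below.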
{
"name": "custom_sql_query_count",
"testDefinitionName": "tableCustomSQLQuery",
"parameterValues": [
{
"name": "sqlExpression",
"value": "SELECT CASE WHEN COUNT(*) > 0 THEN 0 ELSE 1 END FROM customer WHERE active = 1",
},
{"name": "strategy", "value": "COUNT"},
{"name": "threshold", "value": "0"},
],
},
{
"name": "custom_sql_query_rows",
"testDefinitionName": "tableCustomSQLQuery",
"parameterValues": [
{
"name": "sqlExpression",
"value": "SELECT * FROM customer WHERE active = 1",
},
{"name": "strategy", "value": "ROWS"},
{"name": "threshold", "value": "10"},
],
},
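# Two tableCustomSQLQuery variants: the COUNT strategy case is expected to
# pass, while the ROWS strategy case returns every active customer
# (resultRowCount 599 in the expected results below), exceeding its threshold
# of 10, so it is expected to fail.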
{
"name": "table_row_count_between",
"testDefinitionName": "tableRowCountToBeBetween",
"parameterValues": [
{"name": "minValue", "value": "100"},
{"name": "maxValue", "value": "1000"},
],
},
{
"name": "table_row_count_equal",
"testDefinitionName": "tableRowCountToEqual",
"parameterValues": [{"name": "value", "value": "599"}],
},
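# 599 is presumably the row count of the dvdrental customer table; the same
# figure appears as the expected rowCount and resultRowCount values below.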
{
"name": "table_row_inserted_count_between_fail",
"testDefinitionName": "tableRowInsertedCountToBeBetween",
"parameterValues": [
{"name": "min", "value": "10"},
{"name": "max", "value": "50"},
{"name": "columnName", "value": "create_date"},
{"name": "rangeType", "value": "DAY"},
{"name": "rangeInterval", "value": "1"},
],
},
{
"name": "table_row_inserted_count_between_success",
"testDefinitionName": "tableRowInsertedCountToBeBetween",
"parameterValues": [
{"name": "min", "value": "590"},
{"name": "max", "value": "600"},
{"name": "columnName", "value": "last_update"},
{"name": "rangeType", "value": "YEAR"},
{"name": "rangeInterval", "value": "12"},
],
},
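# The two tableRowInsertedCountToBeBetween cases differ only in their time
# window: the 1-day create_date window is expected to fail (presumably too few
# recent inserts for the 10-50 range), while the 12-year last_update window is
# expected to capture roughly all 599 rows and pass.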
],
}
),
@@ -109,14 +217,150 @@ def run_data_quality_workflow(
metadata.delete(TestSuite, test_suite.id, recursive=True, hard_delete=True)
def test_all_definition_exists(metadata, run_data_quality_workflow, db_service):
test_definitions_glob = (
os.path.dirname(__file__)
+ "/../../../.."
+ "/openmetadata-service/src/main/resources/json/data/tests/**.json"
)
test_definitions: List[str] = []
for test_definition_file in glob.glob(test_definitions_glob, recursive=False):
test_definitions.append(json.load(open(test_definition_file))["name"])
assert len(test_definitions) > 0
table: Table = metadata.get_by_name(
Table,
f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
nullable=False,
)
tcs: List[TestCase] = metadata.list_entities(
TestCase,
fields=["*"],
params={
"entityLink": entity_link.get_entity_link(
Table, table.fullyQualifiedName.root
)
},
).entities
tcs_dict = {tc.testDefinition.fullyQualifiedName: tc for tc in tcs}
excluded = {
# TODO implement these too
"columnValueLengthsToBeBetween",
"columnValueMaxToBeBetween",
"columnValueMinToBeBetween",
"columnValuesToBeUnique",
"tableDataToBeFresh",
"columnValuesToMatchRegex",
"columnValuesToNotMatchRegex",
"columnValueStdDevToBeBetween",
"columnValuesToBeNotNull",
"columnValueMedianToBeBetween",
"columnValuesSumToBeBetween",
"columnValuesToBeInSet",
"columnValuesMissingCount",
"columnValuesToBeNotInSet",
"columnValueMeanToBeBetween",
"columnValuesToBeBetween",
"tableDiff",
}
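# Definitions listed in `excluded` ship with the server but are not yet
# exercised here; the assertion below forces an entry to be removed from this
# set once a corresponding test case is added.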
missing = set()
for test_definition in test_definitions:
if test_definition in tcs_dict:
assert (
test_definition not in excluded
), f"Remove test from excluded list: {test_definition}"
else:
if test_definition in excluded:
continue
missing.add(test_definition)
assert not missing, f"Missing test cases: {missing}"
@pytest.mark.parametrize(
"test_case_name,expected_status",
[
("first_name_includes_tom_and_jerry_wo_enum", TestCaseStatus.Success),
("first_name_includes_tom_and_jerry", TestCaseStatus.Success),
("first_name_is_tom_or_jerry", TestCaseStatus.Failed),
("id_no_bounds", TestCaseStatus.Success),
(
"first_name_includes_tom_and_jerry_wo_enum",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
passedRows=2,
failedRows=597,
),
),
(
"first_name_includes_tom_and_jerry",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"first_name_is_tom_or_jerry",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
),
(
"id_no_bounds",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"column_values_not_match_regex",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_column_count_between",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="columnCount", value="11")],
),
),
(
"table_column_count_equal",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="columnCount", value="11")],
),
),
(
"table_column_name_exists",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_column_names_match_set",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"custom_sql_query_count",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"custom_sql_query_rows",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Failed,
testResultValue=[TestResultValue(name="resultRowCount", value="599")],
),
),
(
"table_row_count_between",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="rowCount", value="599")],
),
),
(
"table_row_count_equal",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_row_inserted_count_between_fail",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
),
(
"table_row_inserted_count_between_success",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
],
ids=lambda *x: x[0],
)
def test_data_quality(
run_data_quality_workflow, metadata: OpenMetadata, test_case_name, expected_status
@@ -128,7 +372,12 @@ def test_data_quality(
(t for t in test_cases if t.name.root == test_case_name), None
)
assert test_case is not None
assert test_case.testCaseResult.testCaseStatus == expected_status
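# The expected results use a timestamp placeholder of 0; it is swapped for the
# actual run timestamp below so the comparison only checks the status and
# result values.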
assert_equal_pydantic_objects(
expected_status.model_copy(
update={"timestamp": test_case.testCaseResult.timestamp}
),
test_case.testCaseResult,
)
@pytest.fixture()