mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-24 23:34:51 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			454 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			454 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Test cases for data quality workflow
 | |
| import sys
 | |
| from dataclasses import dataclass
 | |
| from typing import List
 | |
| 
 | |
| import pytest
 | |
| 
 | |
| from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
 | |
| from metadata.data_quality.api.models import TestCaseDefinition
 | |
| from metadata.generated.schema.entity.services.databaseService import DatabaseService
 | |
| from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
 | |
|     ServiceConnections,
 | |
|     TestSuiteConfigType,
 | |
|     TestSuitePipeline,
 | |
| )
 | |
| from metadata.generated.schema.metadataIngestion.workflow import (
 | |
|     OpenMetadataWorkflowConfig,
 | |
|     Processor,
 | |
|     Sink,
 | |
|     Source,
 | |
|     SourceConfig,
 | |
|     WorkflowConfig,
 | |
| )
 | |
| from metadata.generated.schema.tests.basic import (
 | |
|     TestCaseResult,
 | |
|     TestCaseStatus,
 | |
|     TestResultValue,
 | |
| )
 | |
| from metadata.generated.schema.tests.testCase import TestCase
 | |
| from metadata.generated.schema.tests.testSuite import TestSuite
 | |
| from metadata.generated.schema.type.basic import ComponentConfig
 | |
| from metadata.ingestion.api.status import TruncatedStackTraceError
 | |
| from metadata.ingestion.ometa.ometa_api import OpenMetadata
 | |
| from metadata.workflow.data_quality import TestSuiteWorkflow
 | |
| from metadata.workflow.metadata import MetadataWorkflow
 | |
| 
 | |
# Skip the whole module on interpreters older than Python 3.9.
if sys.version_info < (3, 9):
    pytest.skip("requires python 3.9+", allow_module_level=True)
 | |
| 
 | |
| 
 | |
@pytest.fixture(scope="module")
def run_data_quality_workflow(
    run_workflow,
    ingestion_config,
    db_service: DatabaseService,
    metadata: OpenMetadata,
    sink_config,
    workflow_config,
):
    """Run the "MyTestSuite" test suite workflow once per module and clean it up.

    Setup runs ``MetadataWorkflow`` with ``ingestion_config`` through the
    ``run_workflow`` fixture, then builds and executes a ``TestSuiteWorkflow``
    targeting ``<db_service FQN>.dvdrental.public.customer`` with the test
    case definitions listed below.

    Teardown looks up the ``TestSuite`` named "MyTestSuite" and, if it exists,
    hard-deletes it recursively. The teardown sits in a ``finally`` clause so
    that it also runs when the workflow itself fails during setup; otherwise a
    failed run would leave the suite behind.
    """
    run_workflow(MetadataWorkflow, ingestion_config)

    # Test case definitions handed to the "orm-test-runner" processor.
    test_case_definitions = [
        # --- column-level test cases ---
        {
            "name": "first_name_includes_tom_and_jerry_wo_enum",
            "testDefinitionName": "columnValuesToBeInSet",
            "columnName": "first_name",
            "parameterValues": [
                {"name": "allowedValues", "value": "['Tom', 'Jerry']"}
            ],
            "computePassedFailedRowCount": True,
        },
        {
            "name": "first_name_includes_tom_and_jerry",
            "testDefinitionName": "columnValuesToBeInSet",
            "columnName": "first_name",
            "parameterValues": [
                {"name": "allowedValues", "value": "['Tom', 'Jerry']"},
                {"name": "matchEnum", "value": "false"},
            ],
        },
        {
            "name": "first_name_is_tom_or_jerry",
            "testDefinitionName": "columnValuesToBeInSet",
            "columnName": "first_name",
            "parameterValues": [
                {"name": "allowedValues", "value": "['Tom', 'Jerry']"},
                {"name": "matchEnum", "value": "true"},
            ],
        },
        {
            "name": "id_no_bounds",
            "testDefinitionName": "columnValuesToBeBetween",
            "columnName": "customer_id",
            "parameterValues": [],
        },
        {
            "name": "column_values_not_match_regex",
            "testDefinitionName": "columnValuesToNotMatchRegex",
            "columnName": "email",
            "parameterValues": [
                {"name": "forbiddenRegex", "value": ".*@example\\.com$"}
            ],
        },
        # --- table-level test cases ---
        {
            "name": "table_column_count_between",
            "testDefinitionName": "tableColumnCountToBeBetween",
            "parameterValues": [
                {"name": "minColValue", "value": "8"},
                {"name": "maxColValue", "value": "12"},
            ],
        },
        {
            "name": "table_column_count_equal",
            "testDefinitionName": "tableColumnCountToEqual",
            "parameterValues": [{"name": "columnCount", "value": "11"}],
        },
        {
            "name": "table_column_name_exists",
            "testDefinitionName": "tableColumnNameToExist",
            "parameterValues": [
                {"name": "columnName", "value": "customer_id"}
            ],
        },
        {
            "name": "table_column_names_match_set",
            "testDefinitionName": "tableColumnToMatchSet",
            "parameterValues": [
                {
                    "name": "columnNames",
                    "value": "customer_id, store_id, first_name, last_name, email, address_id, activebool, create_date, last_update, active, json_field",
                },
                {"name": "ordered", "value": "false"},
            ],
        },
        {
            "name": "custom_sql_query_count",
            "testDefinitionName": "tableCustomSQLQuery",
            "parameterValues": [
                {
                    "name": "sqlExpression",
                    "value": "SELECT CASE WHEN COUNT(*) > 0 THEN 0 ELSE 1 END FROM customer WHERE active = 1",
                },
                {"name": "strategy", "value": "COUNT"},
                {"name": "threshold", "value": "0"},
            ],
        },
        {
            "name": "custom_sql_query_rows",
            "testDefinitionName": "tableCustomSQLQuery",
            "parameterValues": [
                {
                    "name": "sqlExpression",
                    "value": "SELECT * FROM customer WHERE active = 1",
                },
                {"name": "strategy", "value": "ROWS"},
                {"name": "threshold", "value": "10"},
            ],
        },
        {
            "name": "table_row_count_between",
            "testDefinitionName": "tableRowCountToBeBetween",
            "parameterValues": [
                {"name": "minValue", "value": "100"},
                {"name": "maxValue", "value": "1000"},
            ],
        },
        {
            "name": "table_row_count_equal",
            "testDefinitionName": "tableRowCountToEqual",
            "parameterValues": [{"name": "value", "value": "599"}],
        },
        {
            "name": "table_row_inserted_count_between_fail",
            "testDefinitionName": "tableRowInsertedCountToBeBetween",
            "parameterValues": [
                {"name": "min", "value": "10"},
                {"name": "max", "value": "50"},
                {"name": "columnName", "value": "create_date"},
                {"name": "rangeType", "value": "DAY"},
                {"name": "rangeInterval", "value": "1"},
            ],
        },
        {
            "name": "table_row_inserted_count_between_success",
            "testDefinitionName": "tableRowInsertedCountToBeBetween",
            "parameterValues": [
                {"name": "min", "value": "590"},
                {"name": "max", "value": "600"},
                {"name": "columnName", "value": "last_update"},
                {"name": "rangeType", "value": "YEAR"},
                {"name": "rangeInterval", "value": "50"},
            ],
        },
    ]

    # Source: the customer table of the ingested service, with the service
    # connection passed explicitly alongside the pipeline config.
    source = Source(
        type="postgres",
        serviceName="MyTestSuite",
        sourceConfig=SourceConfig(
            config=TestSuitePipeline(
                type=TestSuiteConfigType.TestSuite,
                entityFullyQualifiedName=f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
                serviceConnections=[
                    ServiceConnections(
                        serviceName=db_service.name.root,
                        serviceConnection=db_service.connection,
                    )
                ],
            )
        ),
    )
    processor = Processor(
        type="orm-test-runner",
        config=ComponentConfig({"testCases": test_case_definitions}),
    )
    test_suite_config = OpenMetadataWorkflowConfig(
        source=source,
        processor=processor,
        sink=Sink.model_validate(sink_config),
        workflowConfig=WorkflowConfig.model_validate(workflow_config),
    )

    try:
        test_suite_processor = TestSuiteWorkflow.create(test_suite_config)
        test_suite_processor.execute()
        test_suite_processor.raise_from_status()
        yield
    finally:
        # Always remove the suite, including when the workflow raised above.
        test_suite: TestSuite = metadata.get_by_name(
            TestSuite, "MyTestSuite", nullable=True
        )
        if test_suite:
            metadata.delete(
                TestSuite, test_suite.id, recursive=True, hard_delete=True
            )
 | |
| 
 | |
| 
 | |
@pytest.mark.parametrize(
    "test_case_name,expected_status",
    [
        (
            "first_name_includes_tom_and_jerry_wo_enum",
            TestCaseResult(
                timestamp=0,
                testCaseStatus=TestCaseStatus.Success,
                passedRows=2,
                failedRows=597,
            ),
        ),
        (
            "first_name_includes_tom_and_jerry",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "first_name_is_tom_or_jerry",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
        ),
        (
            "id_no_bounds",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "column_values_not_match_regex",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "table_column_count_between",
            TestCaseResult(
                timestamp=0,
                testCaseStatus=TestCaseStatus.Success,
                testResultValue=[TestResultValue(name="columnCount", value="11")],
            ),
        ),
        (
            "table_column_count_equal",
            TestCaseResult(
                timestamp=0,
                testCaseStatus=TestCaseStatus.Success,
                testResultValue=[TestResultValue(name="columnCount", value="11")],
            ),
        ),
        (
            "table_column_name_exists",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "table_column_names_match_set",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "custom_sql_query_count",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "custom_sql_query_rows",
            TestCaseResult(
                timestamp=0,
                testCaseStatus=TestCaseStatus.Failed,
                # NOTE(review): every other case in this table passes
                # `testResultValue` (singular) with `TestResultValue` objects;
                # this one passes `testResultValues` with a plain dict.
                # Confirm whether this value is compared at all and whether
                # "599" is the row count the query actually yields.
                testResultValues=[{"name": "resultRowCount", "value": "599"}],
            ),
        ),
        (
            "table_row_count_between",
            TestCaseResult(
                timestamp=0,
                testCaseStatus=TestCaseStatus.Success,
                testResultValue=[TestResultValue(name="rowCount", value="599")],
            ),
        ),
        (
            "table_row_count_equal",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
        (
            "table_row_inserted_count_between_fail",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
        ),
        (
            "table_row_inserted_count_between_success",
            TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
        ),
    ],
    ids=lambda *x: x[0],
)
def test_data_quality(
    run_data_quality_workflow,
    metadata: OpenMetadata,
    test_case_name,
    expected_status,
    db_service,
):
    """Compare the result stored on a named test case with the expected one.

    The test case is looked up among all ``TestCase`` entities by its name
    and by an entity FQN containing "dvdrental.public.customer". The
    placeholder ``timestamp=0`` of ``expected_status`` is replaced with the
    timestamp of the stored result before the two objects are compared.
    """
    listing = metadata.list_entities(TestCase, fields=["*"], skip_on_failure=True)
    candidates: List[TestCase] = listing.entities

    # Pick the first candidate matching both the name and the customer table.
    test_case = None
    for candidate in candidates:
        if candidate.name.root != test_case_name:
            continue
        if "dvdrental.public.customer" in candidate.entityFQN:
            test_case = candidate
            break
    assert test_case is not None

    # The real timestamp is unknown up front, so copy it from the stored result.
    expected_result = expected_status.model_copy(
        update={"timestamp": test_case.testCaseResult.timestamp}
    )
    assert_equal_pydantic_objects(expected_result, test_case.testCaseResult)
 | |
| 
 | |
| 
 | |
@pytest.fixture()
def get_incompatible_column_type_config(workflow_config, sink_config):
    """Provide a factory that builds a test suite workflow config as a dict.

    The returned callable takes the FQN of the entity under test and a
    ``TestCaseDefinition``. It produces a config whose processor runs that
    test case first, followed by a fixed ``compatible_test`` case on the
    ``customer_id`` column.
    """

    def build_config(entity_fqn: str, incompatible_test_case: TestCaseDefinition):
        source_section = {
            "type": "postgres",
            "serviceName": "MyTestSuite",
            "sourceConfig": {
                "config": {
                    "type": "TestSuite",
                    "entityFullyQualifiedName": entity_fqn,
                }
            },
        }
        # The caller-supplied case goes first, the fixed one second.
        test_cases = [
            incompatible_test_case.model_dump(),
            {
                "name": "compatible_test",
                "testDefinitionName": "columnValueMaxToBeBetween",
                "columnName": "customer_id",
                "parameterValues": [
                    {"name": "minValueForMaxInCol", "value": "0"},
                    {"name": "maxValueForMaxInCol", "value": "10"},
                ],
            },
        ]
        processor_section = {
            "type": "orm-test-runner",
            "config": {"testCases": test_cases},
        }
        return {
            "source": source_section,
            "processor": processor_section,
            "sink": sink_config,
            "workflowConfig": workflow_config,
        }

    return build_config
 | |
| 
 | |
| 
 | |
@dataclass
class IncompatibleTypeParameter:
    """One scenario pairing a test case definition with its expected failure."""

    # FQN of the entity under test; may carry a "{database_service}" placeholder.
    entity_fqn: str
    # Test case definition handed to the test suite workflow.
    test_case: TestCaseDefinition
    # Failure the workflow is expected to report for `test_case`.
    expected_failure: TruncatedStackTraceError
 | |
| 
 | |
| 
 | |
@pytest.fixture(
    params=[
        IncompatibleTypeParameter(
            entity_fqn="{database_service}.dvdrental.public.customer",
            test_case=TestCaseDefinition(
                name="string_max_between",
                testDefinitionName="columnValueMaxToBeBetween",
                columnName="first_name",
                parameterValues=[
                    {"name": "minValueForMaxInCol", "value": "0"},
                    {"name": "maxValueForMaxInCol", "value": "10"},
                ],
            ),
            expected_failure=TruncatedStackTraceError(
                name="Incompatible Column for Test Case",
                error="Test case string_max_between of type columnValueMaxToBeBetween "
                "is not compatible with column first_name of type VARCHAR",
            ),
        ),
        IncompatibleTypeParameter(
            entity_fqn="{database_service}.dvdrental.public.customer",
            test_case=TestCaseDefinition(
                name="unique_json_column",
                testDefinitionName="columnValuesToBeUnique",
                columnName="json_field",
            ),
            expected_failure=TruncatedStackTraceError(
                name="Incompatible Column for Test Case",
                error="Test case unique_json_column of type columnValuesToBeUnique "
                "is not compatible with column json_field of type JSON",
            ),
        ),
    ],
    ids=lambda x: x.test_case.name,
)
def parameters(request, db_service):
    """Return the current scenario with its entity FQN placeholder resolved.

    The ``{database_service}`` placeholder in ``entity_fqn`` is filled with
    the fully qualified name of ``db_service``.

    A copy of the scenario is returned rather than the object from ``params``.
    That object is created once at import time and shared by every use of the
    fixture, so formatting it in place would permanently overwrite the
    placeholder template.
    """
    # Local import: the module only imports `dataclass` from `dataclasses`.
    from dataclasses import replace

    template: IncompatibleTypeParameter = request.param
    return replace(
        template,
        entity_fqn=template.entity_fqn.format(
            database_service=db_service.fullyQualifiedName.root
        ),
    )
 | |
| 
 | |
| 
 | |
def test_incompatible_column_type(
    parameters: IncompatibleTypeParameter,
    patch_passwords_for_db_services,
    run_workflow,
    ingestion_config,
    get_incompatible_column_type_config,
    metadata: OpenMetadata,
    db_service,
    cleanup_fqns,
):
    """Check that an incompatible test case fails while a compatible one is recorded.

    The test suite workflow runs with ``raise_from_status=False`` so its step
    statuses can be inspected. The first failure of step 0 must equal the
    scenario's expected failure, and the FQN of ``compatible_test`` must
    appear in the records of step 1.
    """
    run_workflow(MetadataWorkflow, ingestion_config)

    suite_config = get_incompatible_column_type_config(
        parameters.entity_fqn, parameters.test_case
    )
    test_suite_processor = run_workflow(
        TestSuiteWorkflow,
        suite_config,
        raise_from_status=False,
    )

    # Hand the incompatible test case's FQN to the cleanup fixture.
    incompatible_case_fqn = f"{parameters.entity_fqn}.{parameters.test_case.columnName}.{parameters.test_case.name}"
    cleanup_fqns(TestCase, incompatible_case_fqn)

    expected_failure = parameters.expected_failure
    reported_failure = test_suite_processor.steps[0].get_status().failures[0]
    assert_equal_pydantic_objects(expected_failure, reported_failure)

    compatible_case_fqn = f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer.customer_id.compatible_test"
    recorded = test_suite_processor.steps[1].get_status().records
    assert compatible_case_fqn in recorded, "Test case compatible_test should pass"
 | 
