| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  | import sys | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  | from typing import List | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | import pytest | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from metadata.generated.schema.entity.services.databaseService import DatabaseService | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( | 
					
						
							|  |  |  |     TestSuiteConfigType, | 
					
						
							|  |  |  |     TestSuitePipeline, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.metadataIngestion.workflow import ( | 
					
						
							|  |  |  |     LogLevels, | 
					
						
							|  |  |  |     OpenMetadataWorkflowConfig, | 
					
						
							|  |  |  |     Processor, | 
					
						
							|  |  |  |     Sink, | 
					
						
							|  |  |  |     Source, | 
					
						
							|  |  |  |     SourceConfig, | 
					
						
							|  |  |  |     WorkflowConfig, | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | from metadata.generated.schema.tests.basic import TestCaseStatus | 
					
						
							|  |  |  | from metadata.generated.schema.tests.testCase import TestCase | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  | from metadata.generated.schema.type.basic import ComponentConfig | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  | from metadata.ingestion.ometa.ometa_api import OpenMetadata | 
					
						
							|  |  |  | from metadata.workflow.data_quality import TestSuiteWorkflow | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if not sys.version_info >= (3, 9): | 
					
						
							|  |  |  |     pytest.skip("requires python 3.9+", allow_module_level=True) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.fixture(scope="module") | 
					
						
							|  |  |  | def run_data_quality_workflow( | 
					
						
							|  |  |  |     ingest_metadata, db_service: DatabaseService, metadata: OpenMetadata | 
					
						
							|  |  |  | ): | 
					
						
							|  |  |  |     workflow_config = OpenMetadataWorkflowConfig( | 
					
						
							|  |  |  |         source=Source( | 
					
						
							|  |  |  |             type=TestSuiteConfigType.TestSuite.value, | 
					
						
							|  |  |  |             serviceName="MyTestSuite", | 
					
						
							|  |  |  |             sourceConfig=SourceConfig( | 
					
						
							|  |  |  |                 config=TestSuitePipeline( | 
					
						
							|  |  |  |                     type=TestSuiteConfigType.TestSuite, | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |                     entityFullyQualifiedName=f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer", | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  |                 ) | 
					
						
							|  |  |  |             ), | 
					
						
							|  |  |  |             serviceConnection=db_service.connection, | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         processor=Processor( | 
					
						
							|  |  |  |             type="orm-test-runner", | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             config=ComponentConfig( | 
					
						
							|  |  |  |                 { | 
					
						
							|  |  |  |                     "testCases": [ | 
					
						
							|  |  |  |                         { | 
					
						
							|  |  |  |                             "name": "first_name_includes_tom_and_jerry_wo_enum", | 
					
						
							|  |  |  |                             "testDefinitionName": "columnValuesToBeInSet", | 
					
						
							|  |  |  |                             "columnName": "first_name", | 
					
						
							|  |  |  |                             "parameterValues": [ | 
					
						
							|  |  |  |                                 {"name": "allowedValues", "value": "['Tom', 'Jerry']"} | 
					
						
							|  |  |  |                             ], | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                         { | 
					
						
							|  |  |  |                             "name": "first_name_includes_tom_and_jerry", | 
					
						
							|  |  |  |                             "testDefinitionName": "columnValuesToBeInSet", | 
					
						
							|  |  |  |                             "columnName": "first_name", | 
					
						
							|  |  |  |                             "parameterValues": [ | 
					
						
							|  |  |  |                                 {"name": "allowedValues", "value": "['Tom', 'Jerry']"}, | 
					
						
							|  |  |  |                                 {"name": "matchEnum", "value": ""}, | 
					
						
							|  |  |  |                             ], | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                         { | 
					
						
							|  |  |  |                             "name": "first_name_is_tom_or_jerry", | 
					
						
							|  |  |  |                             "testDefinitionName": "columnValuesToBeInSet", | 
					
						
							|  |  |  |                             "columnName": "first_name", | 
					
						
							|  |  |  |                             "parameterValues": [ | 
					
						
							|  |  |  |                                 {"name": "allowedValues", "value": "['Tom', 'Jerry']"}, | 
					
						
							|  |  |  |                                 {"name": "matchEnum", "value": "True"}, | 
					
						
							|  |  |  |                             ], | 
					
						
							|  |  |  |                         }, | 
					
						
							|  |  |  |                     ], | 
					
						
							|  |  |  |                 } | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  |             ), | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         sink=Sink( | 
					
						
							|  |  |  |             type="metadata-rest", | 
					
						
							|  |  |  |             config={}, | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |         workflowConfig=WorkflowConfig( | 
					
						
							|  |  |  |             loggerLevel=LogLevels.DEBUG, openMetadataServerConfig=metadata.config | 
					
						
							|  |  |  |         ), | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     test_suite_procesor = TestSuiteWorkflow.create(workflow_config) | 
					
						
							|  |  |  |     test_suite_procesor.execute() | 
					
						
							|  |  |  |     test_suite_procesor.raise_from_status() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | @pytest.mark.parametrize( | 
					
						
							|  |  |  |     "test_case_name,expected_status", | 
					
						
							|  |  |  |     [ | 
					
						
							|  |  |  |         ("first_name_includes_tom_and_jerry_wo_enum", TestCaseStatus.Success), | 
					
						
							|  |  |  |         ("first_name_includes_tom_and_jerry", TestCaseStatus.Success), | 
					
						
							|  |  |  |         ("first_name_is_tom_or_jerry", TestCaseStatus.Failed), | 
					
						
							|  |  |  |     ], | 
					
						
							|  |  |  | ) | 
					
						
							|  |  |  | def test_data_quality( | 
					
						
							|  |  |  |     run_data_quality_workflow, metadata: OpenMetadata, test_case_name, expected_status | 
					
						
							|  |  |  | ): | 
					
						
							|  |  |  |     test_cases: List[TestCase] = metadata.list_entities( | 
					
						
							|  |  |  |         TestCase, fields=["*"], skip_on_failure=True | 
					
						
							|  |  |  |     ).entities | 
					
						
							|  |  |  |     test_case: TestCase = next( | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |         (t for t in test_cases if t.name.root == test_case_name), None | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  |     ) | 
					
						
							|  |  |  |     assert test_case is not None | 
					
						
							|  |  |  |     assert test_case.testCaseResult.testCaseStatus == expected_status |