| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  Copyright 2025 Collate | 
					
						
							|  |  |  | #  Licensed under the Collate Community License, Version 1.0 (the "License"); | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  | #  you may not use this file except in compliance with the License. | 
					
						
							|  |  |  | #  You may obtain a copy of the License at | 
					
						
							| 
									
										
										
										
											2025-04-03 10:39:47 +05:30
										 |  |  | #  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  | #  Unless required by applicable law or agreed to in writing, software | 
					
						
							|  |  |  | #  distributed under the License is distributed on an "AS IS" BASIS, | 
					
						
							|  |  |  | #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
					
						
							|  |  |  | #  See the License for the specific language governing permissions and | 
					
						
							|  |  |  | #  limitations under the License. | 
					
						
							|  |  |  | """
 | 
					
						
							|  |  |  | test data quality | 
					
						
							|  |  |  | """
 | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  | from typing import List | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import pytest | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-02 17:17:21 +01:00
										 |  |  | from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import ( | 
					
						
							|  |  |  |     IngestionPipeline, | 
					
						
							|  |  |  |     PipelineState, | 
					
						
							|  |  |  | ) | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  | from metadata.generated.schema.tests.basic import TestCaseStatus | 
					
						
							|  |  |  | from metadata.generated.schema.tests.testCase import TestCase | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class TestDataQuality: | 
					
						
							|  |  |  |     @pytest.mark.parametrize( | 
					
						
							|  |  |  |         "test_case_name,expected_status", | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             ("first_name_includes_john", TestCaseStatus.Success), | 
					
						
							|  |  |  |             ("first_name_is_john", TestCaseStatus.Failed), | 
					
						
							|  |  |  |         ], | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     def test_data_quality( | 
					
						
							| 
									
										
										
										
											2024-12-02 17:17:21 +01:00
										 |  |  |         self, | 
					
						
							|  |  |  |         run_test_suite_workflow, | 
					
						
							|  |  |  |         metadata, | 
					
						
							|  |  |  |         test_case_name, | 
					
						
							|  |  |  |         expected_status, | 
					
						
							|  |  |  |         ingestion_fqn, | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  |     ): | 
					
						
							|  |  |  |         test_cases: List[TestCase] = metadata.list_entities( | 
					
						
							|  |  |  |             TestCase, fields=["*"], skip_on_failure=True | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  |         test_case: TestCase = next( | 
					
						
							| 
									
										
										
										
											2024-06-05 21:18:37 +02:00
										 |  |  |             (t for t in test_cases if t.name.root == test_case_name), None | 
					
						
							| 
									
										
										
										
											2024-05-28 09:30:30 +02:00
										 |  |  |         ) | 
					
						
							|  |  |  |         assert test_case is not None | 
					
						
							|  |  |  |         assert test_case.testCaseResult.testCaseStatus == expected_status | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-12-02 17:17:21 +01:00
										 |  |  |         # Check the ingestion pipeline is properly created | 
					
						
							|  |  |  |         ingestion_pipeline: IngestionPipeline = metadata.get_by_name( | 
					
						
							|  |  |  |             entity=IngestionPipeline, fqn=ingestion_fqn, fields=["pipelineStatuses"] | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert ingestion_pipeline | 
					
						
							|  |  |  |         assert ingestion_pipeline.pipelineStatuses | 
					
						
							|  |  |  |         assert ( | 
					
						
							|  |  |  |             ingestion_pipeline.pipelineStatuses.pipelineState == PipelineState.success | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-27 08:50:54 +01:00
										 |  |  |     @pytest.mark.parametrize( | 
					
						
							|  |  |  |         "test_case_name,failed_rows", | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             ("first_name_includes_john", None), | 
					
						
							|  |  |  |             ("first_name_is_john", 1), | 
					
						
							|  |  |  |         ], | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     def test_data_quality_with_sample( | 
					
						
							|  |  |  |         self, run_sampled_test_suite_workflow, metadata, test_case_name, failed_rows | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         test_cases: List[TestCase] = metadata.list_entities( | 
					
						
							|  |  |  |             TestCase, fields=["*"], skip_on_failure=True | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  |         test_case: TestCase = next( | 
					
						
							|  |  |  |             (t for t in test_cases if t.name.root == test_case_name), None | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert test_case is not None | 
					
						
							|  |  |  |         if failed_rows: | 
					
						
							|  |  |  |             assert test_case.testCaseResult.failedRows == pytest.approx( | 
					
						
							|  |  |  |                 failed_rows, abs=1 | 
					
						
							|  |  |  |             ) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @pytest.mark.parametrize( | 
					
						
							|  |  |  |         "test_case_name,expected_status,failed_rows", | 
					
						
							|  |  |  |         [ | 
					
						
							|  |  |  |             ("first_name_includes_john", TestCaseStatus.Success, None), | 
					
						
							|  |  |  |             ("first_name_is_john", TestCaseStatus.Failed, 1), | 
					
						
							|  |  |  |         ], | 
					
						
							|  |  |  |     ) | 
					
						
							|  |  |  |     def test_data_quality_with_partition( | 
					
						
							|  |  |  |         self, | 
					
						
							|  |  |  |         run_partitioned_test_suite_workflow, | 
					
						
							|  |  |  |         metadata, | 
					
						
							|  |  |  |         test_case_name, | 
					
						
							|  |  |  |         expected_status, | 
					
						
							|  |  |  |         failed_rows, | 
					
						
							|  |  |  |     ): | 
					
						
							|  |  |  |         test_cases: List[TestCase] = metadata.list_entities( | 
					
						
							|  |  |  |             TestCase, fields=["*"], skip_on_failure=True | 
					
						
							|  |  |  |         ).entities | 
					
						
							|  |  |  |         test_case: TestCase = next( | 
					
						
							|  |  |  |             (t for t in test_cases if t.name.root == test_case_name), None | 
					
						
							|  |  |  |         ) | 
					
						
							|  |  |  |         assert test_case is not None | 
					
						
							|  |  |  |         assert test_case.testCaseResult.testCaseStatus == expected_status | 
					
						
							|  |  |  |         if failed_rows: | 
					
						
							|  |  |  |             assert test_case.testCaseResult.failedRows == failed_rows |