mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			108 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#  Copyright 2025 Collate
 | 
						|
#  Licensed under the Collate Community License, Version 1.0 (the "License");
 | 
						|
#  you may not use this file except in compliance with the License.
 | 
						|
#  You may obtain a copy of the License at
 | 
						|
#  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
 | 
						|
#  Unless required by applicable law or agreed to in writing, software
 | 
						|
#  distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
#  See the License for the specific language governing permissions and
 | 
						|
#  limitations under the License.
 | 
						|
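"""
Tests for the data quality test-suite workflows: test case statuses,
failed-row counts and the resulting ingestion pipeline state.
"""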
 | 
						|
from typing import List
 | 
						|
 | 
						|
import pytest
 | 
						|
 | 
						|
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
 | 
						|
    IngestionPipeline,
 | 
						|
    PipelineState,
 | 
						|
)
 | 
						|
from metadata.generated.schema.tests.basic import TestCaseStatus
 | 
						|
from metadata.generated.schema.tests.testCase import TestCase
 | 
						|
 | 
						|
 | 
						|
class TestDataQuality:
    """Checks on the results produced by the data quality test-suite workflows.

    Each test requests a workflow fixture (``run_test_suite_workflow``,
    ``run_sampled_test_suite_workflow`` or ``run_partitioned_test_suite_workflow``)
    whose value is never read in the test body. It is presumably requested so
    that the workflow has run before the assertions (confirm in conftest). The
    results are then read back through the ``metadata`` fixture.
    """

    @staticmethod
    def _get_test_case(metadata, test_case_name: str) -> TestCase:
        """Return the test case named ``test_case_name``, failing the test if absent.

        Args:
            metadata: the ``metadata`` fixture; only its ``list_entities`` is used.
            test_case_name: plain (non-FQN) name of the test case to look up.

        Returns:
            The matching ``TestCase``, listed with ``fields=["*"]``.
        """
        # NOTE(review): only the entities returned by a single ``list_entities``
        # call are searched; confirm its default page size covers every test
        # case the fixtures create.
        test_cases: List[TestCase] = metadata.list_entities(
            TestCase, fields=["*"], skip_on_failure=True
        ).entities
        test_case = next(
            (t for t in test_cases if t.name.root == test_case_name), None
        )
        assert test_case is not None, (
            f"Test case {test_case_name!r} not found; available: "
            f"{[t.name.root for t in test_cases]}"
        )
        return test_case

    @pytest.mark.parametrize(
        "test_case_name,expected_status",
        [
            ("first_name_includes_john", TestCaseStatus.Success),
            ("first_name_is_john", TestCaseStatus.Failed),
        ],
    )
    def test_data_quality(
        self,
        run_test_suite_workflow,
        metadata,
        test_case_name,
        expected_status,
        ingestion_fqn,
    ):
        """Check each test case's status and that the pipeline ended in success.

        The pipeline checked is the ingestion pipeline identified by
        ``ingestion_fqn``.
        """
        test_case = self._get_test_case(metadata, test_case_name)
        assert (
            test_case.testCaseResult is not None
        ), f"Test case {test_case_name!r} has no testCaseResult"
        assert test_case.testCaseResult.testCaseStatus == expected_status

        # Check the ingestion pipeline is properly created
        ingestion_pipeline: IngestionPipeline = metadata.get_by_name(
            entity=IngestionPipeline, fqn=ingestion_fqn, fields=["pipelineStatuses"]
        )
        assert ingestion_pipeline
        assert ingestion_pipeline.pipelineStatuses
        assert (
            ingestion_pipeline.pipelineStatuses.pipelineState == PipelineState.success
        )

    @pytest.mark.parametrize(
        "test_case_name,failed_rows",
        [
            ("first_name_includes_john", None),
            ("first_name_is_john", 1),
        ],
    )
    def test_data_quality_with_sample(
        self, run_sampled_test_suite_workflow, metadata, test_case_name, failed_rows
    ):
        """Check failed-row counts after the sampled workflow, within +/- 1 row.

        A ``failed_rows`` of ``None`` means only the existence of the test case
        is checked.
        """
        test_case = self._get_test_case(metadata, test_case_name)
        # Compare against None rather than relying on truthiness, so that an
        # expected count of 0 is still asserted instead of silently skipped.
        if failed_rows is not None:
            assert (
                test_case.testCaseResult is not None
            ), f"Test case {test_case_name!r} has no testCaseResult"
            # The tolerance of one row is presumably there because sampling makes
            # the count approximate.
            assert test_case.testCaseResult.failedRows == pytest.approx(
                failed_rows, abs=1
            )

    @pytest.mark.parametrize(
        "test_case_name,expected_status,failed_rows",
        [
            ("first_name_includes_john", TestCaseStatus.Success, None),
            ("first_name_is_john", TestCaseStatus.Failed, 1),
        ],
    )
    def test_data_quality_with_partition(
        self,
        run_partitioned_test_suite_workflow,
        metadata,
        test_case_name,
        expected_status,
        failed_rows,
    ):
        """Check status and exact failed-row count after the partitioned workflow.

        A ``failed_rows`` of ``None`` skips the failed-row comparison.
        """
        test_case = self._get_test_case(metadata, test_case_name)
        assert (
            test_case.testCaseResult is not None
        ), f"Test case {test_case_name!r} has no testCaseResult"
        assert test_case.testCaseResult.testCaseStatus == expected_status
        # Compare against None rather than relying on truthiness, so that an
        # expected count of 0 is still asserted instead of silently skipped.
        if failed_rows is not None:
            assert test_case.testCaseResult.failedRows == failed_rows