mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	* MINOR - Update Auto Classification defaults for sample data & classification * fix tests
		
			
				
	
	
		
			328 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			328 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#  Copyright 2025 Collate
 | 
						|
#  Licensed under the Collate Community License, Version 1.0 (the "License");
 | 
						|
#  you may not use this file except in compliance with the License.
 | 
						|
#  You may obtain a copy of the License at
 | 
						|
#  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
 | 
						|
#  Unless required by applicable law or agreed to in writing, software
 | 
						|
#  distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
#  See the License for the specific language governing permissions and
 | 
						|
#  limitations under the License.
 | 
						|
 | 
						|
"""Datalake ingestion integration tests"""
 | 
						|
 | 
						|
import os
 | 
						|
from copy import deepcopy
 | 
						|
 | 
						|
import pytest
 | 
						|
 | 
						|
from metadata.generated.schema.entity.data.table import (
 | 
						|
    PartitionIntervalTypes,
 | 
						|
    ProfileSampleType,
 | 
						|
    TableProfilerConfig,
 | 
						|
)
 | 
						|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
 | 
						|
from metadata.sampler.models import PartitionProfilerConfig
 | 
						|
from metadata.workflow.classification import AutoClassificationWorkflow
 | 
						|
from metadata.workflow.data_quality import TestSuiteWorkflow
 | 
						|
from metadata.workflow.metadata import MetadataWorkflow
 | 
						|
from metadata.workflow.profiler import ProfilerWorkflow
 | 
						|
 | 
						|
from ..containers import MinioContainerConfigs, get_minio_container
 | 
						|
 | 
						|
BUCKET_NAME = "my-bucket"
 | 
						|
 | 
						|
INGESTION_CONFIG = {
 | 
						|
    "source": {
 | 
						|
        "type": "datalake",
 | 
						|
        "serviceName": "datalake_for_integration_tests",
 | 
						|
        "serviceConnection": {
 | 
						|
            "config": {
 | 
						|
                "type": "Datalake",
 | 
						|
                "configSource": {
 | 
						|
                    "securityConfig": {
 | 
						|
                        "awsAccessKeyId": "fake_access_key",
 | 
						|
                        "awsSecretAccessKey": "fake_secret_key",
 | 
						|
                        "awsRegion": "us-weat-1",
 | 
						|
                    }
 | 
						|
                },
 | 
						|
                "bucketName": f"{BUCKET_NAME}",
 | 
						|
            }
 | 
						|
        },
 | 
						|
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
 | 
						|
    },
 | 
						|
    "sink": {"type": "metadata-rest", "config": {}},
 | 
						|
    "workflowConfig": {
 | 
						|
        "openMetadataServerConfig": {
 | 
						|
            "hostPort": "http://localhost:8585/api",
 | 
						|
            "authProvider": "openmetadata",
 | 
						|
            "securityConfig": {
 | 
						|
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
 | 
						|
            },
 | 
						|
        }
 | 
						|
    },
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
DATA_QUALITY_CONFIG = {
 | 
						|
    "source": {
 | 
						|
        "type": "testsuite",
 | 
						|
        "serviceName": "datalake_for_integration_tests",
 | 
						|
        "serviceConnection": {
 | 
						|
            "config": {
 | 
						|
                "type": "Datalake",
 | 
						|
                "configSource": {
 | 
						|
                    "securityConfig": {
 | 
						|
                        "awsAccessKeyId": "fake_access_key",
 | 
						|
                        "awsSecretAccessKey": "fake_secret_key",
 | 
						|
                        "awsRegion": "us-weat-1",
 | 
						|
                    }
 | 
						|
                },
 | 
						|
                "bucketName": f"{BUCKET_NAME}",
 | 
						|
            }
 | 
						|
        },
 | 
						|
        "sourceConfig": {
 | 
						|
            "config": {
 | 
						|
                "type": "TestSuite",
 | 
						|
                "entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"',
 | 
						|
            }
 | 
						|
        },
 | 
						|
    },
 | 
						|
    "processor": {
 | 
						|
        "type": "orm-test-runner",
 | 
						|
        "config": {
 | 
						|
            "testCases": [
 | 
						|
                {
 | 
						|
                    "name": "first_name_includes_john",
 | 
						|
                    "testDefinitionName": "columnValuesToBeInSet",
 | 
						|
                    "columnName": "first_name",
 | 
						|
                    "parameterValues": [
 | 
						|
                        {
 | 
						|
                            "name": "allowedValues",
 | 
						|
                            "value": "['John']",
 | 
						|
                        }
 | 
						|
                    ],
 | 
						|
                },
 | 
						|
                {
 | 
						|
                    "name": "first_name_is_john",
 | 
						|
                    "testDefinitionName": "columnValuesToBeInSet",
 | 
						|
                    "columnName": "first_name",
 | 
						|
                    "computePassedFailedRowCount": True,
 | 
						|
                    "parameterValues": [
 | 
						|
                        {
 | 
						|
                            "name": "allowedValues",
 | 
						|
                            "value": "['John']",
 | 
						|
                        },
 | 
						|
                        {
 | 
						|
                            "name": "matchEnum",
 | 
						|
                            "value": "True",
 | 
						|
                        },
 | 
						|
                    ],
 | 
						|
                },
 | 
						|
                # Helps us ensure that the passedRows and failedRows are proper ints, even when coming from Pandas
 | 
						|
                {
 | 
						|
                    "name": "first_name_is_unique",
 | 
						|
                    "testDefinitionName": "columnValuesToBeUnique",
 | 
						|
                    "columnName": "first_name",
 | 
						|
                    "computePassedFailedRowCount": True,
 | 
						|
                },
 | 
						|
            ]
 | 
						|
        },
 | 
						|
    },
 | 
						|
    "sink": {"type": "metadata-rest", "config": {}},
 | 
						|
    "workflowConfig": {
 | 
						|
        "loggerLevel": "DEBUG",
 | 
						|
        "openMetadataServerConfig": {
 | 
						|
            "hostPort": "http://localhost:8585/api",
 | 
						|
            "authProvider": "openmetadata",
 | 
						|
            "securityConfig": {
 | 
						|
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
 | 
						|
            },
 | 
						|
        },
 | 
						|
    },
 | 
						|
    # Helps us validate we are properly encoding the names of Ingestion Pipelines when sending status updates
 | 
						|
    "ingestionPipelineFQN": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid',
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="session")
 | 
						|
def ingestion_fqn():
 | 
						|
    return f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid'
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="session")
 | 
						|
def minio_container():
 | 
						|
    with get_minio_container(MinioContainerConfigs()) as container:
 | 
						|
        yield container
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class", autouse=True)
 | 
						|
def setup_s3(minio_container) -> None:
 | 
						|
    # Mock our S3 bucket and ingest a file
 | 
						|
    client = minio_container.get_client()
 | 
						|
    if client.bucket_exists(BUCKET_NAME):
 | 
						|
        return
 | 
						|
    client.make_bucket(BUCKET_NAME)
 | 
						|
    current_dir = os.path.dirname(__file__)
 | 
						|
    resources_dir = os.path.join(current_dir, "resources")
 | 
						|
 | 
						|
    resources_paths = [
 | 
						|
        os.path.join(path, filename)
 | 
						|
        for path, _, files in os.walk(resources_dir)
 | 
						|
        for filename in files
 | 
						|
    ]
 | 
						|
    for path in resources_paths:
 | 
						|
        key = os.path.relpath(path, resources_dir)
 | 
						|
        client.fput_object(BUCKET_NAME, key, path)
 | 
						|
    return
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def ingestion_config(minio_container):
 | 
						|
    ingestion_config = deepcopy(INGESTION_CONFIG)
 | 
						|
    ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update(
 | 
						|
        {
 | 
						|
            "securityConfig": {
 | 
						|
                "awsAccessKeyId": minio_container.access_key,
 | 
						|
                "awsSecretAccessKey": minio_container.secret_key,
 | 
						|
                "awsRegion": "us-west-1",
 | 
						|
                "endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}",
 | 
						|
            }
 | 
						|
        }
 | 
						|
    )
 | 
						|
    return ingestion_config
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def run_ingestion(metadata, ingestion_config):
 | 
						|
    ingestion_workflow = MetadataWorkflow.create(ingestion_config)
 | 
						|
    ingestion_workflow.execute()
 | 
						|
    ingestion_workflow.raise_from_status()
 | 
						|
    ingestion_workflow.stop()
 | 
						|
    yield
 | 
						|
    db_service = metadata.get_by_name(
 | 
						|
        entity=DatabaseService, fqn="datalake_for_integration_tests"
 | 
						|
    )
 | 
						|
    metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def run_test_suite_workflow(run_ingestion, ingestion_config):
 | 
						|
    workflow_config = deepcopy(DATA_QUALITY_CONFIG)
 | 
						|
    workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
 | 
						|
        {
 | 
						|
            "serviceName": ingestion_config["source"]["serviceName"],
 | 
						|
            "serviceConnection": ingestion_config["source"]["serviceConnection"],
 | 
						|
        }
 | 
						|
    ]
 | 
						|
    ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
 | 
						|
    ingestion_workflow.execute()
 | 
						|
    ingestion_workflow.raise_from_status()
 | 
						|
    ingestion_workflow.stop()
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config):
 | 
						|
    metadata.create_or_update_table_profiler_config(
 | 
						|
        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
 | 
						|
        table_profiler_config=TableProfilerConfig(
 | 
						|
            profileSampleType=ProfileSampleType.PERCENTAGE,
 | 
						|
            profileSample=50.0,
 | 
						|
            partitioning=None,
 | 
						|
        ),
 | 
						|
    )
 | 
						|
    workflow_config = deepcopy(DATA_QUALITY_CONFIG)
 | 
						|
    workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
 | 
						|
        {
 | 
						|
            "serviceName": ingestion_config["source"]["serviceName"],
 | 
						|
            "serviceConnection": ingestion_config["source"]["serviceConnection"],
 | 
						|
        }
 | 
						|
    ]
 | 
						|
    ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
 | 
						|
    ingestion_workflow.execute()
 | 
						|
    ingestion_workflow.raise_from_status()
 | 
						|
    ingestion_workflow.stop()
 | 
						|
    metadata.create_or_update_table_profiler_config(
 | 
						|
        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
 | 
						|
        table_profiler_config=TableProfilerConfig(
 | 
						|
            profileSampleType=ProfileSampleType.PERCENTAGE,
 | 
						|
            profileSample=100.0,
 | 
						|
        ),
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_config):
 | 
						|
    metadata.create_or_update_table_profiler_config(
 | 
						|
        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
 | 
						|
        table_profiler_config=TableProfilerConfig(
 | 
						|
            partitioning=PartitionProfilerConfig(
 | 
						|
                enablePartitioning=True,
 | 
						|
                partitionIntervalType=PartitionIntervalTypes.COLUMN_VALUE,
 | 
						|
                partitionValues=["Los Angeles"],
 | 
						|
                partitionColumnName="city",
 | 
						|
            )
 | 
						|
        ),
 | 
						|
    )
 | 
						|
    workflow_config = deepcopy(DATA_QUALITY_CONFIG)
 | 
						|
    workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
 | 
						|
        {
 | 
						|
            "serviceName": ingestion_config["source"]["serviceName"],
 | 
						|
            "serviceConnection": ingestion_config["source"]["serviceConnection"],
 | 
						|
        }
 | 
						|
    ]
 | 
						|
    ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
 | 
						|
    ingestion_workflow.execute()
 | 
						|
    ingestion_workflow.raise_from_status()
 | 
						|
    ingestion_workflow.stop()
 | 
						|
    metadata.create_or_update_table_profiler_config(
 | 
						|
        fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
 | 
						|
        table_profiler_config=TableProfilerConfig(partitioning=None),
 | 
						|
    )
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def profiler_workflow_config(ingestion_config, workflow_config):
 | 
						|
    ingestion_config["source"]["sourceConfig"]["config"].update(
 | 
						|
        {
 | 
						|
            "type": "Profiler",
 | 
						|
        }
 | 
						|
    )
 | 
						|
    ingestion_config["processor"] = {
 | 
						|
        "type": "orm-profiler",
 | 
						|
        "config": {},
 | 
						|
    }
 | 
						|
    ingestion_config["workflowConfig"] = workflow_config
 | 
						|
    return ingestion_config
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="class")
 | 
						|
def auto_classification_workflow_config(ingestion_config, workflow_config):
 | 
						|
    ingestion_config["source"]["sourceConfig"]["config"].update(
 | 
						|
        {
 | 
						|
            "type": "AutoClassification",
 | 
						|
            "storeSampleData": True,
 | 
						|
            "enableAutoClassification": False,
 | 
						|
        }
 | 
						|
    )
 | 
						|
    ingestion_config["processor"] = {
 | 
						|
        "type": "orm-profiler",
 | 
						|
        "config": {},
 | 
						|
    }
 | 
						|
    ingestion_config["workflowConfig"] = workflow_config
 | 
						|
    return ingestion_config
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture()
 | 
						|
def run_profiler(run_ingestion, run_workflow, profiler_workflow_config):
 | 
						|
    """Test profiler ingestion"""
 | 
						|
    run_workflow(ProfilerWorkflow, profiler_workflow_config)
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture()
 | 
						|
def run_auto_classification(
 | 
						|
    run_ingestion, run_workflow, auto_classification_workflow_config
 | 
						|
):
 | 
						|
    """Test profiler ingestion"""
 | 
						|
    run_workflow(AutoClassificationWorkflow, auto_classification_workflow_config)
 |