# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Datalake ingestion integration tests"""
import os
from copy import deepcopy
import pytest
from metadata.generated.schema.entity.data.table import (
PartitionIntervalTypes,
ProfileSampleType,
TableProfilerConfig,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.sampler.models import PartitionProfilerConfig
from metadata.workflow.classification import AutoClassificationWorkflow
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow

from ..containers import MinioContainerConfigs, get_minio_container

BUCKET_NAME = "my-bucket"
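
# Base metadata ingestion config. The security credentials and region below are
# placeholders; the `ingestion_config` fixture swaps them for the real MinIO
# container credentials and endpoint before any workflow runs.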
INGESTION_CONFIG = {
"source": {
"type": "datalake",
"serviceName": "datalake_for_integration_tests",
"serviceConnection": {
"config": {
"type": "Datalake",
"configSource": {
"securityConfig": {
"awsAccessKeyId": "fake_access_key",
"awsSecretAccessKey": "fake_secret_key",
"awsRegion": "us-weat-1",
}
},
"bucketName": f"{BUCKET_NAME}",
}
},
"sourceConfig": {"config": {"type": "DatabaseMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
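
# Data quality workflow config targeting the ingested `users/users.csv` file.
# It defines two set-membership tests on `first_name` and a uniqueness test
# that checks passed/failed row counts come back as proper ints from Pandas.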
DATA_QUALITY_CONFIG = {
"source": {
"type": "testsuite",
"serviceName": "datalake_for_integration_tests",
"serviceConnection": {
"config": {
"type": "Datalake",
"configSource": {
"securityConfig": {
"awsAccessKeyId": "fake_access_key",
"awsSecretAccessKey": "fake_secret_key",
"awsRegion": "us-weat-1",
}
},
"bucketName": f"{BUCKET_NAME}",
}
},
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv"',
}
},
},
"processor": {
"type": "orm-test-runner",
"config": {
"testCases": [
{
"name": "first_name_includes_john",
"testDefinitionName": "columnValuesToBeInSet",
"columnName": "first_name",
"parameterValues": [
{
"name": "allowedValues",
"value": "['John']",
}
],
},
{
"name": "first_name_is_john",
"testDefinitionName": "columnValuesToBeInSet",
"columnName": "first_name",
"computePassedFailedRowCount": True,
"parameterValues": [
{
"name": "allowedValues",
"value": "['John']",
},
{
"name": "matchEnum",
"value": "True",
},
],
},
# Helps us ensure that the passedRows and failedRows are proper ints, even when coming from Pandas
{
"name": "first_name_is_unique",
"testDefinitionName": "columnValuesToBeUnique",
"columnName": "first_name",
"computePassedFailedRowCount": True,
},
]
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
},
},
# Helps us validate we are properly encoding the names of Ingestion Pipelines when sending status updates
"ingestionPipelineFQN": f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid',
}


@pytest.fixture(scope="session")
def ingestion_fqn():
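    """FQN of the test suite ingestion pipeline, matching DATA_QUALITY_CONFIG."""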
return f'datalake_for_integration_tests.default.{BUCKET_NAME}."users/users.csv".testSuite.uuid'


@pytest.fixture(scope="session")
def minio_container():
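    """Spin up a shared MinIO container for the whole test session."""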
with get_minio_container(MinioContainerConfigs()) as container:
yield container


@pytest.fixture(scope="class", autouse=True)
def setup_s3(minio_container) -> None:
    # Create the S3 bucket in MinIO and upload the test files from resources/
client = minio_container.get_client()
if client.bucket_exists(BUCKET_NAME):
return
client.make_bucket(BUCKET_NAME)
current_dir = os.path.dirname(__file__)
resources_dir = os.path.join(current_dir, "resources")
resources_paths = [
os.path.join(path, filename)
for path, _, files in os.walk(resources_dir)
for filename in files
]
for path in resources_paths:
key = os.path.relpath(path, resources_dir)
client.fput_object(BUCKET_NAME, key, path)


@pytest.fixture(scope="class")
def ingestion_config(minio_container):
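    """INGESTION_CONFIG wired to the running MinIO container's credentials and endpoint."""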
ingestion_config = deepcopy(INGESTION_CONFIG)
ingestion_config["source"]["serviceConnection"]["config"]["configSource"].update(
{
"securityConfig": {
"awsAccessKeyId": minio_container.access_key,
"awsSecretAccessKey": minio_container.secret_key,
"awsRegion": "us-west-1",
"endPointURL": f"http://localhost:{minio_container.get_exposed_port(minio_container.port)}",
}
}
)
return ingestion_config


@pytest.fixture(scope="class")
def run_ingestion(metadata, ingestion_config):
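    """Run metadata ingestion once per test class; hard-delete the service on teardown."""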
ingestion_workflow = MetadataWorkflow.create(ingestion_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()
yield
db_service = metadata.get_by_name(
entity=DatabaseService, fqn="datalake_for_integration_tests"
)
metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)


@pytest.fixture(scope="class")
def run_test_suite_workflow(run_ingestion, ingestion_config):
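    """Run the data quality tests against the ingested users.csv file."""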
workflow_config = deepcopy(DATA_QUALITY_CONFIG)
workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
{
"serviceName": ingestion_config["source"]["serviceName"],
"serviceConnection": ingestion_config["source"]["serviceConnection"],
}
]
ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()


@pytest.fixture(scope="class")
def run_sampled_test_suite_workflow(metadata, run_ingestion, ingestion_config):
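    """Run the data quality tests on a 50% profile sample, then restore 100%."""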
metadata.create_or_update_table_profiler_config(
fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
table_profiler_config=TableProfilerConfig(
profileSampleType=ProfileSampleType.PERCENTAGE,
profileSample=50.0,
partitioning=None,
),
)
workflow_config = deepcopy(DATA_QUALITY_CONFIG)
workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
{
"serviceName": ingestion_config["source"]["serviceName"],
"serviceConnection": ingestion_config["source"]["serviceConnection"],
}
]
ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()
metadata.create_or_update_table_profiler_config(
fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
table_profiler_config=TableProfilerConfig(
profileSampleType=ProfileSampleType.PERCENTAGE,
profileSample=100.0,
),
)


@pytest.fixture(scope="class")
def run_partitioned_test_suite_workflow(metadata, run_ingestion, ingestion_config):
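    """Run the data quality tests on a single column-value partition, then remove the partitioning."""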
metadata.create_or_update_table_profiler_config(
fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
table_profiler_config=TableProfilerConfig(
partitioning=PartitionProfilerConfig(
enablePartitioning=True,
partitionIntervalType=PartitionIntervalTypes.COLUMN_VALUE,
partitionValues=["Los Angeles"],
partitionColumnName="city",
)
),
)
workflow_config = deepcopy(DATA_QUALITY_CONFIG)
workflow_config["source"]["sourceConfig"]["config"]["serviceConnections"] = [
{
"serviceName": ingestion_config["source"]["serviceName"],
"serviceConnection": ingestion_config["source"]["serviceConnection"],
}
]
ingestion_workflow = TestSuiteWorkflow.create(workflow_config)
ingestion_workflow.execute()
ingestion_workflow.raise_from_status()
ingestion_workflow.stop()
metadata.create_or_update_table_profiler_config(
fqn='datalake_for_integration_tests.default.my-bucket."users/users.csv"',
table_profiler_config=TableProfilerConfig(partitioning=None),
)


@pytest.fixture(scope="class")
def profiler_workflow_config(ingestion_config, workflow_config):
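    """Reshape the metadata ingestion config into a ProfilerWorkflow config."""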
ingestion_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
ingestion_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
ingestion_config["workflowConfig"] = workflow_config
return ingestion_config


@pytest.fixture(scope="class")
def auto_classification_workflow_config(ingestion_config, workflow_config):
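    """Auto classification config: store sample data, leave classification disabled."""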
ingestion_config["source"]["sourceConfig"]["config"].update(
{
"type": "AutoClassification",
"storeSampleData": True,
"enableAutoClassification": False,
}
)
ingestion_config["processor"] = {
"type": "orm-profiler",
"config": {},
}
ingestion_config["workflowConfig"] = workflow_config
return ingestion_config


@pytest.fixture()
def run_profiler(run_ingestion, run_workflow, profiler_workflow_config):
"""Test profiler ingestion"""
run_workflow(ProfilerWorkflow, profiler_workflow_config)


@pytest.fixture()
def run_auto_classification(
run_ingestion, run_workflow, auto_classification_workflow_config
):
"""Test profiler ingestion"""
run_workflow(AutoClassificationWorkflow, auto_classification_workflow_config)