mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-10 18:49:00 +00:00
177 lines
6.1 KiB
Python
177 lines
6.1 KiB
Python
import pytest
|
|
from dirty_equals import IsPositiveInt
|
|
from sqlalchemy import create_engine
|
|
|
|
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
|
|
from metadata.data_quality.api.models import TestCaseDefinition
|
|
from metadata.generated.schema.entity.data.table import Table
|
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
|
|
TestSuiteConfigType,
|
|
)
|
|
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
|
|
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
|
|
from metadata.generated.schema.type.basic import Timestamp
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.workflow.data_quality import TestSuiteWorkflow
|
|
from metadata.workflow.metadata import MetadataWorkflow
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def prepare_data(create_test_data, trino_container):
|
|
"""Prepare test data by creating tables to compare"""
|
|
# Create test tables in Trino
|
|
engine = create_engine(trino_container.get_connection_url())
|
|
with engine.connect() as conn:
|
|
# Create a copy of the userdata table with modified data
|
|
conn.execute(
|
|
"""
|
|
CREATE TABLE minio.my_schema.changed_userdata AS
|
|
SELECT
|
|
registration_dttm,
|
|
id,
|
|
CASE WHEN MOD(id, 2) = 0 THEN 'John' ELSE first_name END as first_name,
|
|
last_name,
|
|
email,
|
|
gender,
|
|
ip_address,
|
|
cc,
|
|
country,
|
|
birthdate,
|
|
salary,
|
|
title,
|
|
comments
|
|
FROM minio.my_schema.userdata
|
|
WHERE MOD(id, 13) != 0
|
|
"""
|
|
).fetchall()
|
|
|
|
|
|
@pytest.mark.skip("Skipping while investigating CI failure. Test passes locally.")
|
|
@pytest.mark.parametrize(
|
|
"test_case_definition,expected_result",
|
|
[
|
|
(
|
|
TestCaseDefinition(
|
|
name="compare_same_tables",
|
|
testDefinitionName="tableDiff",
|
|
computePassedFailedRowCount=True,
|
|
parameterValues=[
|
|
TestCaseParameterValue(name="keyColumns", value="['id']"),
|
|
TestCaseParameterValue(
|
|
name="table2",
|
|
value="{db_service}.minio.my_schema.userdata",
|
|
),
|
|
TestCaseParameterValue(
|
|
name="useColumns",
|
|
value="['id', 'first_name', 'last_name', 'email', 'country', 'salary', 'title']",
|
|
),
|
|
],
|
|
),
|
|
TestCaseResult.model_construct(
|
|
timestamp=Timestamp.model_construct(root=IsPositiveInt),
|
|
testCaseStatus=TestCaseStatus.Success,
|
|
failedRows=0,
|
|
passedRows=1000,
|
|
),
|
|
),
|
|
(
|
|
TestCaseDefinition(
|
|
name="compare_modified_tables",
|
|
testDefinitionName="tableDiff",
|
|
computePassedFailedRowCount=True,
|
|
parameterValues=[
|
|
TestCaseParameterValue(name="keyColumns", value="['id']"),
|
|
TestCaseParameterValue(
|
|
name="table2",
|
|
value="{db_service}.minio.my_schema.changed_userdata",
|
|
),
|
|
TestCaseParameterValue(
|
|
name="useColumns",
|
|
value="['id', 'first_name', 'last_name', 'email', 'country', 'salary', 'title']",
|
|
),
|
|
],
|
|
),
|
|
TestCaseResult.model_construct(
|
|
timestamp=Timestamp.model_construct(root=IsPositiveInt),
|
|
testCaseStatus=TestCaseStatus.Failed,
|
|
failedRows=535,
|
|
passedRows=465,
|
|
),
|
|
),
|
|
],
|
|
ids=lambda x: getattr(x, "name", None),
|
|
)
|
|
def test_table_diff(
|
|
test_case_definition,
|
|
expected_result,
|
|
prepare_data,
|
|
db_service: DatabaseService,
|
|
ingestion_config,
|
|
metadata: OpenMetadata,
|
|
sink_config,
|
|
workflow_config,
|
|
run_workflow,
|
|
cleanup_fqns,
|
|
):
|
|
"""Test table diff functionality with Trino tables"""
|
|
|
|
# Run metadata ingestion first
|
|
run_workflow(MetadataWorkflow, ingestion_config)
|
|
|
|
# Get the source table
|
|
table1: Table = metadata.get_by_name(
|
|
Table,
|
|
f"{db_service.fullyQualifiedName.root}.minio.my_schema.userdata",
|
|
nullable=False,
|
|
)
|
|
|
|
# Update table2 FQN with actual service name
|
|
for param in test_case_definition.parameterValues:
|
|
if param.name == "table2":
|
|
param.value = param.value.format(
|
|
db_service=db_service.fullyQualifiedName.root
|
|
)
|
|
|
|
# Configure test suite workflow
|
|
test_suite_config = {
|
|
"source": {
|
|
"type": "trino",
|
|
"serviceName": "MyTestSuite",
|
|
"sourceConfig": {
|
|
"config": {
|
|
"type": TestSuiteConfigType.TestSuite,
|
|
"entityFullyQualifiedName": table1.fullyQualifiedName.root,
|
|
"serviceConnections": [
|
|
{
|
|
"serviceName": db_service.name.root,
|
|
"serviceConnection": db_service.connection.model_dump(),
|
|
}
|
|
],
|
|
},
|
|
},
|
|
},
|
|
"processor": {
|
|
"type": "orm-test-runner",
|
|
"config": {"testCases": [test_case_definition.model_dump()]},
|
|
},
|
|
"sink": sink_config,
|
|
"workflowConfig": workflow_config,
|
|
}
|
|
|
|
# Run test suite workflow
|
|
run_workflow(TestSuiteWorkflow, test_suite_config)
|
|
|
|
# Verify results
|
|
test_case_entity = metadata.get_by_name(
|
|
TestCase,
|
|
f"{table1.fullyQualifiedName.root}.{test_case_definition.name}",
|
|
fields=["*"],
|
|
nullable=False,
|
|
)
|
|
cleanup_fqns(
|
|
TestCase, f"{table1.fullyQualifiedName.root}.{test_case_definition.name}"
|
|
)
|
|
|
|
assert_equal_pydantic_objects(expected_result, test_case_entity.testCaseResult)
|