Imri Paran be82086e25
MINOR: add column case sensitivity parameter (#18115)
* fix(data-quality): table diff

- added handling for case-insensitive columns
- added handling for different numeric types (int/float/Decimal)
- added handling of boolean test case parameters

* add migrations for table diff

* add migrations for table diff

* removed cross type diff for now. it appears to be flaky

* fixed migrations

* use casefold() instead of lower()

* - implemented utils.get_test_case_param_value
- fixed params for case sensitive column

* handle bool test case parameters

* format

* testing

* format

* list -> List

* list -> List

* - change caseSensitiveColumns default to fase
- added migration to stay backward compatible

* - removed migration files
- updated logging message for table diff migration

* changed bool test case parameters default to always be false

* format

* docs: data diff

- added the caseSensitiveColumns parameter

requires: https://github.com/open-metadata/OpenMetadata/pull/18115

* fixed test_get_bool_test_case_param
2024-10-15 16:29:43 +02:00

144 lines
4.8 KiB
Python

from typing import cast
import pytest
from testcontainers.mysql import MySqlContainer
from _openmetadata_testutils.postgres.conftest import postgres_container, try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
BasicAuth,
)
from metadata.generated.schema.entity.services.connections.database.postgresConnection import (
PostgresConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseConnection,
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.metadata import MetadataWorkflow
__all__ = [
"postgres_container",
]
@pytest.fixture(scope="module")
def mysql_container():
with try_bind(MySqlContainer("mysql:8"), 3306, 3307) as container:
yield container
@pytest.fixture(scope="module")
def ingest_mysql_service(
mysql_container: MySqlContainer, metadata: OpenMetadata, tmp_path_factory
):
workflow_config = {
"source": {
"type": "mysql",
"serviceName": "integration_test_mysql_"
+ tmp_path_factory.mktemp("mysql").name.split("/")[-1],
"serviceConnection": {
"config": {
"type": "Mysql",
"username": mysql_container.username,
"authType": {
"password": mysql_container.password,
},
"hostPort": "localhost:" + mysql_container.get_exposed_port(3306),
"databaseSchema": mysql_container.dbname,
}
},
"sourceConfig": {
"config": {
"type": "DatabaseMetadata",
},
},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": LogLevels.DEBUG.value,
"openMetadataServerConfig": metadata.config.model_dump(),
},
}
metadata_ingestion = MetadataWorkflow.create(workflow_config)
metadata_ingestion.execute()
metadata_ingestion.raise_from_status()
metadata_ingestion.stop()
db_service: DatabaseService = metadata.get_by_name(
DatabaseService, workflow_config["source"]["serviceName"]
)
db_service.connection.config.authType.password = CustomSecretStr(
mysql_container.password
)
yield db_service
metadata.delete(DatabaseService, db_service.id, recursive=True, hard_delete=True)
@pytest.fixture(scope="module")
def create_service_request(tmp_path_factory, postgres_container):
return CreateDatabaseServiceRequest(
name="docker_test_" + tmp_path_factory.mktemp("postgres").name,
serviceType=DatabaseServiceType.Postgres,
connection=DatabaseConnection(
config=PostgresConnection(
username=postgres_container.username,
authType=BasicAuth(password=postgres_container.password),
hostPort="localhost:"
+ postgres_container.get_exposed_port(postgres_container.port),
database="dvdrental",
)
),
)
@pytest.fixture(scope="module")
def postgres_service(db_service):
return db_service
@pytest.fixture()
def ingest_postgres_metadata(
postgres_service, metadata: OpenMetadata, sink_config, workflow_config, run_workflow
):
workflow_config = {
"source": {
"type": postgres_service.connection.config.type.value.lower(),
"serviceName": postgres_service.fullyQualifiedName.root,
"serviceConnection": postgres_service.connection.model_copy(
update={
"config": postgres_service.connection.config.model_copy(
update={
"ingestAllDatabases": True,
}
)
}
),
"sourceConfig": {
"config": {
"schemaFilterPattern": {"excludes": ["information_schema"]},
}
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
run_workflow(MetadataWorkflow, workflow_config)
@pytest.fixture(scope="module")
def patch_password(postgres_container):
def inner(service: DatabaseService):
service.connection.config = cast(PostgresConnection, service.connection.config)
service.connection.config.authType.password = type(
service.connection.config.authType.password
)(postgres_container.password)
return service
return inner