mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-24 09:50:01 +00:00

* fix: Allow non-numeric numbers to be sent via JSON; replace NaN values with None in SQAProfilerInterface. Replace NaN values with None in the SQAProfilerInterface class to maintain database parity. NaN values will be cast to null in OpenMetadata. This change ensures that data handling processes account for this conversion. * fix: histogram overflow error * test: Add unit test for Null and Null Ratio metrics * chore: Address comments * chore: Address comments * fix: checkstyle and message * fix: failing tests, as null count works as expected
224 lines
6.5 KiB
Python
224 lines
6.5 KiB
Python
import logging
|
|
import sys
|
|
from typing import List, Tuple, Type
|
|
|
|
import pytest
|
|
|
|
from _openmetadata_testutils.ometa import int_admin_ometa
|
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
|
|
from metadata.ingestion.api.common import Entity
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.workflow.ingestion import IngestionWorkflow
|
|
|
|
# On interpreters older than 3.9, list "trino" in ``collect_ignore`` so pytest
# leaves that path out of collection (assumes this module is loaded as a
# conftest, which is where pytest reads ``collect_ignore`` from).
if sys.version_info < (3, 9):
    collect_ignore = ["trino"]
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
def configure_logging():
    """Raise the ``sqlfluff`` and ``pytds`` loggers to CRITICAL.

    Session-scoped and autouse, so it is applied once without tests having
    to request it.
    """
    for noisy_logger in ("sqlfluff", "pytds"):
        logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)
|
|
|
|
|
|
@pytest.fixture(scope="session")
def metadata():
    """Provide the object returned by ``int_admin_ometa()`` for the session.

    Other fixtures in this module use it as the OpenMetadata client
    (``create_or_update``, ``get_by_name``, ``delete``, ``config``).
    """
    ometa_client = int_admin_ometa()
    return ometa_client
|
|
|
|
|
|
def pytest_pycollect_makeitem(collector, name, obj):
    """Keep pytest from collecting classes derived from ``BaseModel`` or ``Enum``.

    Args:
        collector: The pytest collector invoking the hook (unused).
        name: Name of the object inside its module (unused).
        obj: The candidate object being considered for collection.

    Returns:
        An empty list when any class in ``obj``'s MRO is named ``BaseModel``
        or ``Enum``; ``None`` otherwise, including for objects that have no
        usable ``mro()``.
    """
    excluded_bases = ("BaseModel", "Enum")
    try:
        ancestor_names = [klass.__name__ for klass in obj.mro()]
    except (AttributeError, TypeError):
        # Not a class (no ``mro``), or ``mro`` cannot be called this way.
        return None
    if any(base_name in ancestor_names for base_name in excluded_bases):
        return []
    return None
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=sys.version_info >= (3, 9))
def config_testcontatiners():
    """Set the testcontainers ``max_tries`` setting to 10 for the session.

    The fixture is autouse only on Python 3.9 or newer.
    """
    # Imported inside the fixture so that merely loading this module does not
    # require testcontainers.
    from testcontainers.core.config import testcontainers_config as tc_settings

    tc_settings.max_tries = 10
|
|
|
|
|
|
@pytest.fixture(scope="session")
def sink_config(metadata):
    """Return the workflow ``sink`` section: a ``metadata-rest`` sink with empty config.

    ``metadata`` is requested as a fixture but its value is not read here.
    """
    rest_sink = {"type": "metadata-rest", "config": {}}
    return rest_sink
|
|
|
|
|
|
@pytest.fixture(scope="session")
def workflow_config(metadata):
    """Return the ``workflowConfig`` section used by the test workflows.

    Uses DEBUG as the logger level and the ``model_dump()`` of
    ``metadata.config`` as the server connection settings.
    """
    server_settings = metadata.config.model_dump()
    debug_level = LogLevels.DEBUG.value
    return {
        "loggerLevel": debug_level,
        "openMetadataServerConfig": server_settings,
    }
|
|
|
|
|
|
@pytest.fixture(scope="module")
def profiler_config(db_service, workflow_config, sink_config):
    """Build the profiler workflow configuration for ``db_service``.

    The source type is the lower-cased connection type of the service, the
    source config enables sample data generation with a 30 second timeout,
    and the processor is ``orm-profiler`` with an empty config.
    """
    connection_type = db_service.connection.config.type.value.lower()
    service_name = db_service.fullyQualifiedName.root

    profiler_options = {
        "type": "Profiler",
        "generateSampleData": True,
        "timeoutSeconds": 30,
    }
    source_section = {
        "type": connection_type,
        "serviceName": service_name,
        "sourceConfig": {"config": profiler_options},
    }
    processor_section = {"type": "orm-profiler", "config": {}}

    return {
        "source": source_section,
        "processor": processor_section,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
|
|
|
|
|
|
@pytest.fixture(scope="module")
def run_workflow():
    """Return a helper that creates and executes a workflow from a config.

    The helper takes the workflow class, its config and an optional
    ``raise_from_status`` flag (default ``True``). It calls
    ``workflow_type.create(config)``, executes the result, calls
    ``raise_from_status()`` on it when the flag is truthy, and returns the
    workflow instance.
    """

    def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):
        instance: IngestionWorkflow = workflow_type.create(config)
        instance.execute()
        if not raise_from_status:
            return instance
        instance.raise_from_status()
        return instance

    return _run
|
|
|
|
|
|
@pytest.fixture(scope="module")
def db_service(metadata, create_service_request, unmask_password):
    """Create (or update) the database service and yield it with its password unmasked.

    On teardown the service is looked up again by fully qualified name and,
    if it still exists, hard-deleted recursively.
    """
    created_service = metadata.create_or_update(data=create_service_request)
    service_fqn = created_service.fullyQualifiedName.root

    yield unmask_password(created_service)

    # Teardown: look the service up again rather than reusing the entity
    # created during setup.
    remaining = metadata.get_by_name(DatabaseService, service_fqn)
    if not remaining:
        return
    metadata.delete(DatabaseService, remaining.id, recursive=True, hard_delete=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def unmask_password(create_service_request):
    """Return a callable that unmasks the db password returned by the metadata service.

    The default implementation copies ``authType.password`` from the
    ``create_service_request`` connection config onto the given service and
    returns that same service object.

    You can override this at the test-module level to implement custom
    password handling.

    Example:
        @pytest.fixture(scope="module")
        def unmask_password(my_container1, my_container2):
            def patch_password(service: DatabaseService):
                if service.connection.config.authType.password == "my_password":
                    ...  # do something else
                return service
            return patch_password
    """

    def patch_password(service: DatabaseService):
        requested_auth = create_service_request.connection.config.authType
        service.connection.config.authType.password = requested_auth.password
        return service

    return patch_password
|
|
|
|
|
|
@pytest.fixture(scope="module")
def create_service_request():
    """Placeholder fixture: override it in the test module to build the service request.

    Example:
        @pytest.fixture(scope="module")
        def create_service_request():
            return CreateDatabaseServiceRequest(
                name="my_service",
                serviceType=DatabaseServiceType.MyService,
                connection=DatabaseConnection(
                    config=MyServiceConnection(
                        username="my_user",
                        password="my_password",
                        host="localhost",
                        port="5432",
                    )
                ),
            )

    Raises:
        NotImplementedError: Always, when the fixture is not overridden.
    """
    raise NotImplementedError("Implement in the test module")
|
|
|
|
|
|
@pytest.fixture(scope="module")
def monkeymodule():
    """Yield a module-scoped ``MonkeyPatch`` from ``pytest.MonkeyPatch.context()``.

    The context manager is left once the fixture is torn down.
    """
    with pytest.MonkeyPatch.context() as module_patcher:
        yield module_patcher
|
|
|
|
|
|
@pytest.fixture(scope="module")
def patch_passwords_for_db_services(db_service, unmask_password, monkeymodule):
    """Unmask the password of ``db_service`` whenever it is fetched from the metadata service.

    ``OpenMetadata.get_by_name`` and ``OpenMetadata.get_by_id`` are wrapped
    through ``monkeymodule``. When a wrapped call returns a
    ``DatabaseService`` whose fully qualified name equals that of
    ``db_service``, the result is passed through ``unmask_password``; any
    other result is returned untouched.

    Usage:

        def test_my_test(db_service, patch_passwords_for_db_services):
            ...

    OR

        @pytest.mark.usefixtures("patch_passwords_for_db_services")
        def test_my_test(db_service):
            ...
    """

    def override_password(getter):
        def inner(*args, **kwargs):
            fetched = getter(*args, **kwargs)
            is_target_service = (
                isinstance(fetched, DatabaseService)
                and fetched.fullyQualifiedName.root
                == db_service.fullyQualifiedName.root
            )
            return unmask_password(fetched) if is_target_service else fetched

        return inner

    # Both original getters are captured before any attribute is replaced.
    patch_targets = (
        (
            "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
            OpenMetadata.get_by_name,
        ),
        (
            "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
            OpenMetadata.get_by_id,
        ),
    )
    for target_path, original_getter in patch_targets:
        monkeymodule.setattr(target_path, override_password(original_getter))
|
|
|
|
|
|
@pytest.fixture
def cleanup_fqns(metadata):
    """Yield a registrar for entities that must be deleted after the test.

    Call the yielded function with an entity type and a fully qualified
    name. On teardown each registered pair is looked up by name (with all
    fields) and, when found, hard-deleted recursively.
    """
    registered: List[Tuple[Type[Entity], str]] = []

    def inner(entity_type: Type[Entity], fqn: str):
        registered.append((entity_type, fqn))

    yield inner

    for entity_type, entity_fqn in registered:
        found = metadata.get_by_name(entity_type, entity_fqn, fields=["*"])
        if not found:
            continue
        metadata.delete(entity_type, found.id, recursive=True, hard_delete=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def ingestion_config(db_service, metadata, workflow_config, sink_config):
    """Build the ``DatabaseMetadata`` ingestion workflow configuration for ``db_service``.

    The source section carries the lower-cased connection type, the service
    fully qualified name and the ``model_dump()`` of the service connection.
    ``metadata`` is requested as a fixture but its value is not read here.
    """
    connection = db_service.connection
    source_section = {
        "type": connection.config.type.value.lower(),
        "serviceName": db_service.fullyQualifiedName.root,
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
        "serviceConnection": connection.model_dump(),
    }
    return {
        "source": source_section,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
|