mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-04 15:45:42 +00:00

* Make pytest use code from src rather than from installed package * Fix test_amundsen: missing None * Update pytest configuration to use importlib mode * Fix custom_basemodel_validation to check model_fields on type(values) to prevent noisy warnings * Refactor referencedByQueries validation to use field_validator as per deprecation warning * Update ColumnJson to use model_rebuild as a replacement for forward reference updates as per deprecation warning * Move superset test to integration test as they are using testcontainers * Update coverage source path * Fix wrong import. * Add install_dev_env target to Makefile for development dependencies * Add test-unit as extra in setup.py * Modify dependencies in dev environment. * Ignore all airflow tests * Remove coverage in unit_ingestion_dev_env. Revert coverage source to prevent broken CI. * Add nox for running unit test * Fix PowerBI integration test to use pathlib for resource paths and not os.getcwd to prevent failures when not executed from the right path * Move test_helpers.py to unit test, as it is not an integration test. * Remove utils empty folder in integration tests * Refactor testcontainers configuration to avoid pitfalls with max_tries setting * Add nox unit testing basic setup * Add format check session * Refactor nox-unit and add plugins tests * Add GHA for py-nox-ci * Add comment to GHA * Restore conftest.py file * Clarify comment * Simplify function * Fix matrix strategy and nox mismatch * Improve python version strategy with nox and GHA --------- Co-authored-by: Pere Menal <pere.menal@getcollate.io>
272 lines
8.4 KiB
Python
272 lines
8.4 KiB
Python
import logging
|
|
import sys
|
|
from typing import List, Tuple, Type
|
|
|
|
import pytest
|
|
|
|
from _openmetadata_testutils.ometa import int_admin_ometa
|
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
|
from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
|
|
AutoClassificationConfigType,
|
|
)
|
|
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
|
|
DatabaseMetadataConfigType,
|
|
)
|
|
from metadata.generated.schema.metadataIngestion.workflow import LogLevels
|
|
from metadata.ingestion.api.common import Entity
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.workflow.ingestion import IngestionWorkflow
|
|
|
|
# These suites rely on test-containers, which is unsupported on Python 3.8,
# so pytest must skip collecting them entirely on older interpreters.
if sys.version_info < (3, 9):
    collect_ignore = ["trino", "kafka", "datalake"]
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
def configure_logging():
    """Silence noisy third-party loggers for the whole test session."""
    for noisy_logger in ("sqlfluff", "pytds"):
        logging.getLogger(noisy_logger).setLevel(logging.CRITICAL)
|
|
|
|
|
|
@pytest.fixture(scope="session")
def metadata():
    """Session-wide OpenMetadata client authenticated as admin."""
    client = int_admin_ometa()
    return client
|
|
|
|
|
|
def pytest_pycollect_makeitem(collector, name, obj):
    """Prevent pytest from collecting pydantic models and enums as test classes.

    Returns an empty item list when *obj* derives from ``BaseModel`` or
    ``Enum`` (matched by base-class name), and ``None`` otherwise so pytest
    falls back to its default collection logic.
    """
    try:
        ancestor_names = {base.__name__ for base in obj.mro()}
    except (AttributeError, TypeError):
        # Not a class (no usable mro) — defer to pytest's default handling.
        return None
    if ancestor_names & {"BaseModel", "Enum"}:
        return []
    return None
|
|
|
|
|
|
# TODO: Will be addressed when cleaning up integration tests.
|
|
# Setting the max tries for testcontainers here has pitfalls,
|
|
# the main one being that it cannot be changed through the recommended
|
|
# way of using environment variables. The main problem is that
|
|
# waiting_utils.py uses testcontainers_config.timeout as a default
|
|
# value for the timeout. Therefore, if we want to effectively change
|
|
# this value, we must do so before the module is imported,
|
|
# which is a potential source of issues.
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=sys.version_info >= (3, 9))
def config_testcontatiners():
    """Cap testcontainers' wait retries for the whole session.

    NOTE(review): the fixture name contains a typo ("testcontatiners");
    kept as-is since other modules may request it by name — confirm before
    renaming. Import is deferred so Python 3.8 runs never load testcontainers.
    """
    from testcontainers.core.config import testcontainers_config as tc_config

    tc_config.max_tries = 10
|
|
|
|
|
|
@pytest.fixture(scope="session")
def sink_config(metadata):
    """Default REST sink configuration shared by all ingestion workflows."""
    rest_sink = {"type": "metadata-rest"}
    rest_sink["config"] = {}
    return rest_sink
|
|
|
|
|
|
@pytest.fixture(scope="session")
def workflow_config(metadata):
    """Workflow-level settings pointing at the test OpenMetadata server."""
    server_config = metadata.config.model_dump()
    return {
        "loggerLevel": LogLevels.DEBUG.value,
        "openMetadataServerConfig": server_config,
    }
|
|
|
|
|
|
@pytest.fixture(scope="module")
def profiler_config(db_service, workflow_config, sink_config):
    """Full profiler workflow definition for the module's database service."""
    profiler_source_config = {
        "type": "Profiler",
        "timeoutSeconds": 600,
        "threadCount": 1,  # easier for debugging
    }
    return {
        "source": {
            "type": db_service.connection.config.type.value.lower(),
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {"config": profiler_source_config},
        },
        "processor": {"type": "orm-profiler", "config": {}},
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
|
|
|
|
|
|
@pytest.fixture(scope="module")
def classifier_config(db_service, workflow_config, sink_config):
    """Auto-classification workflow definition for the module's database service."""
    classification_source_config = {
        "type": AutoClassificationConfigType.AutoClassification.value,
        "storeSampleData": True,
        "enableAutoClassification": True,
    }
    return {
        "source": {
            "type": db_service.connection.config.type.value.lower(),
            "serviceName": db_service.fullyQualifiedName.root,
            "sourceConfig": {"config": classification_source_config},
        },
        "processor": {"type": "orm-profiler", "config": {}},
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
|
|
|
|
|
|
@pytest.fixture(scope="module")
def run_workflow():
    """Factory that creates, executes and (optionally) validates a workflow.

    The returned callable builds a workflow of the given type from ``config``,
    runs it, and — unless ``raise_from_status`` is False — prints its status
    and raises on failure before handing the workflow back to the caller.
    """

    def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):
        workflow: IngestionWorkflow = workflow_type.create(config)
        workflow.execute()
        if not raise_from_status:
            return workflow
        workflow.print_status()
        workflow.raise_from_status()
        return workflow

    return _run
|
|
|
|
|
|
@pytest.fixture(scope="module")
def db_service(metadata, create_service_request, unmask_password):
    """Create the module's database service, yield it with its password
    unmasked, and hard-delete it (recursively) on teardown."""
    entity = metadata.create_or_update(data=create_service_request)
    service_fqn = entity.fullyQualifiedName.root
    yield unmask_password(entity)
    # Re-fetch before deleting: the test itself may already have removed it.
    remaining = metadata.get_by_name(DatabaseService, service_fqn)
    if remaining:
        metadata.delete(
            DatabaseService, remaining.id, recursive=True, hard_delete=True
        )
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def unmask_password(create_service_request):
|
|
"""Unmask the db passwrod returned by the metadata service.
|
|
You can override this at the test_module level to implement custom password handling.
|
|
|
|
Example:
|
|
@pytest.fixture(scope="module")
|
|
def unmask_password(my_container1, my_container2):
|
|
def patch_password(service: DatabaseService):
|
|
if service.connection.config.authType.password == "my_password":
|
|
... # do something else
|
|
return service
|
|
return patch_password
|
|
"""
|
|
|
|
def patch_password(service: DatabaseService):
|
|
if hasattr(service.connection.config, "authType"):
|
|
service.connection.config.authType.password = (
|
|
create_service_request.connection.config.authType.password
|
|
)
|
|
return service
|
|
service.connection.config.password = (
|
|
create_service_request.connection.config.password
|
|
)
|
|
return service
|
|
|
|
return patch_password
|
|
|
|
|
|
@pytest.fixture(scope="module")
def create_service_request():
    """Must be overridden by each test module to build its service request.

    Example:
        def create_service_request(scope="module"):
            return CreateDatabaseServiceRequest(
                name="my_service",
                serviceType=DatabaseServiceType.MyService,
                connection=DatabaseConnection(
                    config=MyServiceConnection(
                        username="my_user",
                        password="my_password",
                        host="localhost",
                        port="5432",
                    )
                ),
            )
    """
    raise NotImplementedError("Implement in the test module")
|
|
|
|
|
|
@pytest.fixture(scope="module")
def monkeymodule():
    """Module-scoped MonkeyPatch (the built-in ``monkeypatch`` is function-scoped)."""
    with pytest.MonkeyPatch.context() as patcher:
        yield patcher
|
|
|
|
|
|
@pytest.fixture(scope="module")
def patch_passwords_for_db_services(db_service, unmask_password, monkeymodule):
    """Patch the password for all db services returned by the metadata service.

    Usage:

        def test_my_test(db_service, patch_passwords_for_db_services):
            ...

    OR

        @pytest.usefixtures("patch_passwords_for_db_services")
        def test_my_test(db_service):
            ...
    """

    def override_password(getter):
        # Wrap a client getter so the module's service comes back unmasked.
        def inner(*args, **kwargs):
            entity = getter(*args, **kwargs)
            if not isinstance(entity, DatabaseService):
                return entity
            if entity.fullyQualifiedName.root != db_service.fullyQualifiedName.root:
                return entity
            return unmask_password(entity)

        return inner

    patch_targets = (
        (
            "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
            OpenMetadata.get_by_name,
        ),
        (
            "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
            OpenMetadata.get_by_id,
        ),
    )
    for target, original_getter in patch_targets:
        monkeymodule.setattr(target, override_password(original_getter))
|
|
|
|
|
|
@pytest.fixture
def cleanup_fqns(metadata):
    """Yield a registrar for (entity_type, fqn) pairs; each registered
    entity is hard-deleted (recursively) after the test, if it still exists."""
    registered: List[Tuple[Type[Entity], str]] = []

    def register(entity_type: Type[Entity], fqn: str):
        registered.append((entity_type, fqn))

    yield register
    for entity_type, fqn in registered:
        entity = metadata.get_by_name(entity_type, fqn, fields=["*"])
        if entity:
            metadata.delete(entity_type, entity.id, recursive=True, hard_delete=True)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def ingestion_config(db_service, metadata, workflow_config, sink_config):
    """Metadata-ingestion workflow definition for the module's database service."""
    source = {
        "type": db_service.connection.config.type.value.lower(),
        "serviceName": db_service.fullyQualifiedName.root,
        "sourceConfig": {
            "config": {"type": DatabaseMetadataConfigType.DatabaseMetadata.value}
        },
        "serviceConnection": db_service.connection.model_dump(),
    }
    return {
        "source": source,
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
|