diff --git a/ingestion/tests/integration/conftest.py b/ingestion/tests/integration/conftest.py
index a838715059a..4cfbd55c1a2 100644
--- a/ingestion/tests/integration/conftest.py
+++ b/ingestion/tests/integration/conftest.py
@@ -6,6 +6,9 @@ import pytest
 
 from _openmetadata_testutils.ometa import int_admin_ometa
 from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
+    AutoClassificationConfigType,
+)
 from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
     DatabaseMetadataConfigType,
 )
@@ -86,6 +89,30 @@ def profiler_config(db_service, workflow_config, sink_config):
     }
 
 
+@pytest.fixture(scope="module")
+def classifier_config(db_service, workflow_config, sink_config):
+    """Build the auto classification workflow config dict for ``db_service``."""
+    return {
+        "source": {
+            "type": db_service.connection.config.type.value.lower(),
+            "serviceName": db_service.fullyQualifiedName.root,
+            "sourceConfig": {
+                "config": {
+                    "type": AutoClassificationConfigType.AutoClassification.value,
+                    "storeSampleData": True,
+                    "enableAutoClassification": True,
+                }
+            },
+        },
+        "processor": {
+            "type": "orm-profiler",
+            "config": {},
+        },
+        "sink": sink_config,
+        "workflowConfig": workflow_config,
+    }
+
+
 @pytest.fixture(scope="module")
 def run_workflow():
     def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):
diff --git a/ingestion/tests/integration/mysql/conftest.py b/ingestion/tests/integration/mysql/conftest.py
index 1147984f0b2..5c1b7d2bf97 100644
--- a/ingestion/tests/integration/mysql/conftest.py
+++ b/ingestion/tests/integration/mysql/conftest.py
@@ -51,7 +51,37 @@ def mysql_container(tmp_path_factory):
         engine.execute(
             "UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND"
         )
+        engine.dispose()
+        assert_dangling_connections(container, 1)
         yield container
+        # TODO: We are still leaving some connections open. Should be fixed in the future.
+        assert_dangling_connections(container, 9)
+
+
+def assert_dangling_connections(container, max_connections):
+    """Assert that MySQL reports at most ``max_connections`` open connections.
+
+    Reads ``SHOW PROCESSLIST`` through a short-lived engine and counts every
+    row whose Command is not a server-internal one (Daemon, Binlog Dump).
+    Sleep connections are counted because they are still open connections.
+    """
+    # SHOW PROCESSLIST columns: Id, User, Host, db, Command, Time, State, Info
+    command_column = 4
+    system_commands = ("Daemon", "Binlog Dump")
+    engine = create_engine(container.get_connection_url())
+    try:
+        with engine.connect() as conn:
+            processes = conn.execute("SHOW PROCESSLIST").fetchall()
+    finally:
+        # Release the pooled connection so the check does not leak one itself.
+        engine.dispose()
+    active_connections = sum(
+        1 for process in processes if process[command_column] not in system_commands
+    )
+
+    assert (
+        active_connections <= max_connections
+    ), f"Found {active_connections} open connections to MySQL"
 
 
 @pytest.fixture(scope="module")
diff --git a/ingestion/tests/integration/mysql/test_classifier.py b/ingestion/tests/integration/mysql/test_classifier.py
new file mode 100644
index 00000000000..324b447a6e8
--- /dev/null
+++ b/ingestion/tests/integration/mysql/test_classifier.py
@@ -0,0 +1,19 @@
+import sys
+
+import pytest
+
+from metadata.ingestion.lineage.sql_lineage import search_cache
+from metadata.workflow.classification import AutoClassificationWorkflow
+from metadata.workflow.metadata import MetadataWorkflow
+
+if not sys.version_info >= (3, 9):
+    pytest.skip("requires python 3.9+", allow_module_level=True)
+
+
+def test_classifier(
+    patch_passwords_for_db_services, run_workflow, ingestion_config, classifier_config
+):
+    """Run metadata ingestion, then the auto classification workflow."""
+    search_cache.clear()
+    run_workflow(MetadataWorkflow, ingestion_config)
+    run_workflow(AutoClassificationWorkflow, classifier_config)
diff --git a/ingestion/tests/integration/mysql/test_profiler.py b/ingestion/tests/integration/mysql/test_profiler.py
index 5aa854b0601..4f2aa44f6d3 100644
--- a/ingestion/tests/integration/mysql/test_profiler.py
+++ b/ingestion/tests/integration/mysql/test_profiler.py
@@ -11,7 +11,11 @@ if not sys.version_info >= (3, 9):
 
 
 def test_profiler(
-    patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config
+    patch_passwords_for_db_services,
+    run_workflow,
+    ingestion_config,
+    profiler_config,
+    mysql_container,
 ):
     search_cache.clear()
     run_workflow(MetadataWorkflow, ingestion_config)