test: assert dangling db connections (#20458)

added dangling connection assertions for mysql integration test
This commit is contained in:
Imri Paran 2025-04-02 08:38:17 +02:00 committed by GitHub
parent f7d9720486
commit 663839bd85
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 69 additions and 1 deletions

View File

@ -6,6 +6,9 @@ import pytest
from _openmetadata_testutils.ometa import int_admin_ometa
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceAutoClassificationPipeline import (
AutoClassificationConfigType,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
DatabaseMetadataConfigType,
)
@ -86,6 +89,29 @@ def profiler_config(db_service, workflow_config, sink_config):
}
@pytest.fixture(scope="module")
def classifier_config(db_service, workflow_config, sink_config):
return {
"source": {
"type": db_service.connection.config.type.value.lower(),
"serviceName": db_service.fullyQualifiedName.root,
"sourceConfig": {
"config": {
"type": AutoClassificationConfigType.AutoClassification.value,
"storeSampleData": True,
"enableAutoClassification": True,
}
},
},
"processor": {
"type": "orm-profiler",
"config": {},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
@pytest.fixture(scope="module")
def run_workflow():
def _run(workflow_type: Type[IngestionWorkflow], config, raise_from_status=True):

View File

@ -51,7 +51,27 @@ def mysql_container(tmp_path_factory):
engine.execute(
"UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND"
)
engine.dispose()
assert_dangling_connections(container, 1)
yield container
# TODO: We are still leaving some connections open. Should be fixed in the future.
assert_dangling_connections(container, 9)
def assert_dangling_connections(container, max_connections):
engine = create_engine(container.get_connection_url())
with engine.connect() as conn:
result = conn.execute("SHOW PROCESSLIST")
processes = result.fetchall()
# Count all connections except system processes (Daemon, Binlog Dump)
# Note: We include Sleep connections as they are still open connections
active_connections = len(
[p for p in processes if p[1] not in ["Daemon", "Binlog Dump"]]
)
assert (
active_connections <= max_connections
), f"Found {active_connections} open connections to MySQL"
@pytest.fixture(scope="module")

View File

@ -0,0 +1,18 @@
import sys
import pytest
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.classification import AutoClassificationWorkflow
from metadata.workflow.metadata import MetadataWorkflow
if not sys.version_info >= (3, 9):
pytest.skip("requires python 3.9+", allow_module_level=True)
def test_classifier(
patch_passwords_for_db_services, run_workflow, ingestion_config, classifier_config
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)
run_workflow(AutoClassificationWorkflow, classifier_config)

View File

@ -11,7 +11,11 @@ if not sys.version_info >= (3, 9):
def test_profiler(
patch_passwords_for_db_services, run_workflow, ingestion_config, profiler_config
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
profiler_config,
mysql_container,
):
search_cache.clear()
run_workflow(MetadataWorkflow, ingestion_config)