mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	* fix: properly close connection on sampler ingestion * fix: dangling connection test * style: ran python linting * fix: revert to 9
		
			
				
	
	
		
			94 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			94 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os
 | 
						|
from subprocess import CalledProcessError
 | 
						|
 | 
						|
import pytest
 | 
						|
from sqlalchemy import create_engine
 | 
						|
from testcontainers.mysql import MySqlContainer
 | 
						|
 | 
						|
from _openmetadata_testutils.helpers.docker import try_bind
 | 
						|
from metadata.generated.schema.api.services.createDatabaseService import (
 | 
						|
    CreateDatabaseServiceRequest,
 | 
						|
)
 | 
						|
from metadata.generated.schema.entity.services.databaseService import (
 | 
						|
    DatabaseServiceType,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="module")
 | 
						|
def mysql_container(tmp_path_factory):
 | 
						|
    """Start a PostgreSQL container with the dvdrental database."""
 | 
						|
    test_db_tar_path = os.path.join(
 | 
						|
        os.path.dirname(__file__), "data", "mysql", "test_db-1.0.7.tar.gz"
 | 
						|
    )
 | 
						|
    container = MySqlContainer(dbname="employees")
 | 
						|
    with (
 | 
						|
        try_bind(container, 3306, 3307) if not os.getenv("CI") else container
 | 
						|
    ) as container:
 | 
						|
        docker_container = container.get_wrapped_container()
 | 
						|
        docker_container.exec_run(["mkdir", "-p", "/data"])
 | 
						|
        docker_container.put_archive("/data", open(test_db_tar_path, "rb"))
 | 
						|
        for command in (
 | 
						|
            [
 | 
						|
                "sh",
 | 
						|
                "-c",
 | 
						|
                f"cd /data/test_db && mysql -uroot -p{container.password}  < employees.sql",
 | 
						|
            ],
 | 
						|
            [
 | 
						|
                "sh",
 | 
						|
                "-c",
 | 
						|
                f'mysql -uroot -p{container.password} -e \'GRANT SELECT ON employees.* TO "test"@"%";\'',
 | 
						|
            ],
 | 
						|
        ):
 | 
						|
            res = docker_container.exec_run(command)
 | 
						|
            if res[0] != 0:
 | 
						|
                raise CalledProcessError(
 | 
						|
                    returncode=res[0], cmd=res, output=res[1].decode("utf-8")
 | 
						|
                )
 | 
						|
        engine = create_engine(container.get_connection_url())
 | 
						|
        engine.execute(
 | 
						|
            "ALTER TABLE employees ADD COLUMN last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
 | 
						|
        )
 | 
						|
        engine.execute(
 | 
						|
            "UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND"
 | 
						|
        )
 | 
						|
        engine.dispose()
 | 
						|
        assert_dangling_connections(container, 1)
 | 
						|
        yield container
 | 
						|
        # Needs to be handled for Test Cases https://github.com/open-metadata/OpenMetadata/issues/21187
 | 
						|
        assert_dangling_connections(container, 9)
 | 
						|
 | 
						|
 | 
						|
def assert_dangling_connections(container, max_connections):
 | 
						|
    engine = create_engine(container.get_connection_url())
 | 
						|
    with engine.connect() as conn:
 | 
						|
        result = conn.execute("SHOW PROCESSLIST")
 | 
						|
    processes = result.fetchall()
 | 
						|
    # Count all connections except system processes (Daemon, Binlog Dump)
 | 
						|
    # Note: We include Sleep connections as they are still open connections
 | 
						|
    active_connections = len(
 | 
						|
        [p for p in processes if p[1] not in ["Daemon", "Binlog Dump"]]
 | 
						|
    )
 | 
						|
 | 
						|
    assert (
 | 
						|
        active_connections <= max_connections
 | 
						|
    ), f"Found {active_connections} open connections to MySQL"
 | 
						|
 | 
						|
 | 
						|
@pytest.fixture(scope="module")
 | 
						|
def create_service_request(mysql_container, tmp_path_factory):
 | 
						|
    return CreateDatabaseServiceRequest.model_validate(
 | 
						|
        {
 | 
						|
            "name": "docker_test_" + tmp_path_factory.mktemp("mysql").name,
 | 
						|
            "serviceType": DatabaseServiceType.Mysql.value,
 | 
						|
            "connection": {
 | 
						|
                "config": {
 | 
						|
                    "username": mysql_container.username,
 | 
						|
                    "authType": {"password": mysql_container.password},
 | 
						|
                    "hostPort": "localhost:"
 | 
						|
                    + mysql_container.get_exposed_port(mysql_container.port),
 | 
						|
                    "databaseSchema": mysql_container.dbname,
 | 
						|
                }
 | 
						|
            },
 | 
						|
        }
 | 
						|
    )
 |