Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

94 lines
3.5 KiB
Python
Raw Normal View History

import os
from subprocess import CalledProcessError
import pytest
from sqlalchemy import create_engine
from testcontainers.mysql import MySqlContainer
from _openmetadata_testutils.helpers.docker import try_bind
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
@pytest.fixture(scope="module")
def mysql_container(tmp_path_factory):
"""Start a PostgreSQL container with the dvdrental database."""
test_db_tar_path = os.path.join(
os.path.dirname(__file__), "data", "mysql", "test_db-1.0.7.tar.gz"
)
container = MySqlContainer(dbname="employees")
with (
try_bind(container, 3306, 3307) if not os.getenv("CI") else container
) as container:
docker_container = container.get_wrapped_container()
docker_container.exec_run(["mkdir", "-p", "/data"])
docker_container.put_archive("/data", open(test_db_tar_path, "rb"))
for command in (
[
"sh",
"-c",
f"cd /data/test_db && mysql -uroot -p{container.password} < employees.sql",
],
[
"sh",
"-c",
f'mysql -uroot -p{container.password} -e \'GRANT SELECT ON employees.* TO "test"@"%";\'',
],
):
res = docker_container.exec_run(command)
if res[0] != 0:
raise CalledProcessError(
returncode=res[0], cmd=res, output=res[1].decode("utf-8")
)
engine = create_engine(container.get_connection_url())
engine.execute(
"ALTER TABLE employees ADD COLUMN last_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
)
engine.execute(
"UPDATE employees SET last_update = hire_date + INTERVAL FLOOR(1 + RAND() * 500000) SECOND"
)
engine.dispose()
assert_dangling_connections(container, 1)
yield container
# TODO: We are still leaving some connections open. Should be fixed in the future.
assert_dangling_connections(container, 9)
def assert_dangling_connections(container, max_connections):
engine = create_engine(container.get_connection_url())
with engine.connect() as conn:
result = conn.execute("SHOW PROCESSLIST")
processes = result.fetchall()
# Count all connections except system processes (Daemon, Binlog Dump)
# Note: We include Sleep connections as they are still open connections
active_connections = len(
[p for p in processes if p[1] not in ["Daemon", "Binlog Dump"]]
)
assert (
active_connections <= max_connections
), f"Found {active_connections} open connections to MySQL"
@pytest.fixture(scope="module")
def create_service_request(mysql_container, tmp_path_factory):
return CreateDatabaseServiceRequest.model_validate(
{
"name": "docker_test_" + tmp_path_factory.mktemp("mysql").name,
"serviceType": DatabaseServiceType.Mysql.value,
"connection": {
"config": {
"username": mysql_container.username,
"authType": {"password": mysql_container.password},
"hostPort": "localhost:"
+ mysql_container.get_exposed_port(mysql_container.port),
"databaseSchema": mysql_container.dbname,
}
},
}
)