mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00
115 lines
3.2 KiB
Python
115 lines
3.2 KiB
Python
import subprocess
|
|
|
|
import pytest
|
|
from freezegun import freeze_time
|
|
|
|
from datahub.ingestion.source.sql.mysql import MySQLSource
|
|
from datahub.testing import mce_helpers
|
|
from tests.test_helpers import test_connection_helpers
|
|
from tests.test_helpers.click_helpers import run_datahub_cmd
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
# Timestamp passed to freezegun's ``freeze_time`` decorator on the tests in this module.
FROZEN_TIME = "2020-04-14 07:00:00"

# Port handed to ``wait_for_port`` and ``is_mysql_up`` while waiting for the container.
MYSQL_PORT = 3306
|
|
|
|
|
|
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Return the MySQL integration-test directory under the pytest root path.

    The other fixtures and tests in this module look up the docker-compose
    file, the ingestion configs and the golden files relative to this path.
    """
    mysql_resources = pytestconfig.rootpath / "tests/integration/mysql"
    return mysql_resources
|
|
|
|
|
|
def is_mysql_up(container_name: str, port: int) -> bool:
    """Cheaply check whether mysqld in a Docker container reports it is ready.

    Reads the container's logs (stdout and stderr merged) via ``docker logs``
    and looks for a line that contains both mysqld's "ready for connections"
    message and the given port number.

    Args:
        container_name: Name of the Docker container whose logs are inspected.
        port: Port number that must appear on the same log line as the
            readiness message.

    Returns:
        True if such a log line exists; False otherwise, including when the
        ``docker`` executable cannot be launched.
    """
    ready_marker = "/usr/sbin/mysqld: ready for connections."
    port_text = str(port)

    try:
        # Argument list instead of a shell pipeline: nothing in container_name
        # or port is interpreted by a shell, and no external grep is needed.
        logs = subprocess.run(
            ["docker", "logs", container_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # same effect as the shell's `2>&1`
            text=True,
            errors="replace",  # odd bytes in the logs must not break the check
            check=False,
        )
    except OSError:
        # docker is missing or not executable: treat as "not up" so callers
        # that poll this function simply keep waiting.
        return False

    # The exit status of `docker logs` is deliberately ignored; only the
    # presence of a matching line decides the result.
    return any(
        ready_marker in line and port_text in line
        for line in logs.stdout.splitlines()
    )
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mysql_runner(docker_compose_runner, pytestconfig, test_resources_dir):
    """Bring up the MySQL docker-compose services and yield them once ready.

    The compose file is ``docker-compose.yml`` inside ``test_resources_dir``.
    Before yielding, ``wait_for_port`` is called for the ``testmysql``
    container on ``MYSQL_PORT`` with ``is_mysql_up`` as the readiness checker
    and ``timeout=120`` (presumably seconds; confirm against ``wait_for_port``).
    """
    compose_file = test_resources_dir / "docker-compose.yml"

    def _mysql_ready():
        # Readiness probe handed to wait_for_port.
        return is_mysql_up("testmysql", MYSQL_PORT)

    with docker_compose_runner(compose_file, "mysql") as docker_services:
        wait_for_port(
            docker_services,
            "testmysql",
            MYSQL_PORT,
            timeout=120,
            checker=_mysql_ready,
        )
        yield docker_services
|
|
|
|
|
|
@pytest.mark.parametrize(
    "config_file,golden_file",
    [
        ("mysql_to_file_with_db.yml", "mysql_mces_with_db_golden.json"),
        ("mysql_to_file_no_db.yml", "mysql_mces_no_db_golden.json"),
        ("mysql_profile_table_level_only.yml", "mysql_table_level_only.json"),
        (
            "mysql_profile_table_row_count_estimate_only.yml",
            "mysql_table_row_count_estimate_only.json",
        ),
    ],
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_ingest_no_db(
    mysql_runner,
    pytestconfig,
    test_resources_dir,
    tmp_path,
    mock_time,
    config_file,
    golden_file,
):
    """Run ``datahub ingest`` with each config and compare against its golden file.

    Each parametrized case names a config file and a golden file, both resolved
    relative to ``test_resources_dir``. The output is read from
    ``tmp_path / "mysql_mces.json"``, so the configs are presumably set up to
    write there (verify against the YAML files).
    """
    # Execute the ingestion pipeline using the absolute path of the config.
    recipe_path = (test_resources_dir / config_file).resolve()
    run_datahub_cmd(["ingest", "-c", str(recipe_path)], tmp_path=tmp_path)

    # Compare what the pipeline produced with the expected golden output.
    produced_file = tmp_path / "mysql_mces.json"
    expected_file = test_resources_dir / golden_file
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=produced_file,
        golden_path=expected_file,
    )
|
|
|
|
|
|
@pytest.mark.parametrize(
    "config_dict, is_success",
    [
        (
            {
                "host_port": "localhost:53307",
                "database": "northwind",
                "username": "root",
                "password": "example",
            },
            True,
        ),
        (
            {
                "host_port": "localhost:5330",
                "database": "wrong_db",
                "username": "wrong_user",
                "password": "wrong_pass",
            },
            False,
        ),
    ],
)
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_mysql_test_connection(mysql_runner, config_dict, is_success):
    """Check the test-connection report of ``MySQLSource`` for each config.

    When ``is_success`` is true the report must show basic connectivity
    success; otherwise it must show a basic connectivity failure checked
    against the text "Connection refused".
    """
    report = test_connection_helpers.run_test_connection(MySQLSource, config_dict)

    if not is_success:
        # Expected-failure case: handle it first and stop.
        test_connection_helpers.assert_basic_connectivity_failure(
            report, "Connection refused"
        )
        return

    test_connection_helpers.assert_basic_connectivity_success(report)
|