mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-14 10:19:51 +00:00
147 lines
5.2 KiB
Python
147 lines
5.2 KiB
Python
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.source.sql.mysql import MySQLAuthMode, MySQLConfig, MySQLSource
|
|
|
|
|
|
class TestMySQLRDSIAMConfig:
|
|
def test_config_without_rds_iam(self):
|
|
config_dict = {
|
|
"host_port": "localhost:3306",
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
"database": "testdb",
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == MySQLAuthMode.PASSWORD
|
|
assert config.aws_config is not None # aws_config always has default value
|
|
assert config.aws_config.aws_region is None # but region is not set
|
|
|
|
def test_config_with_rds_iam_valid(self):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:3306",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == MySQLAuthMode.AWS_IAM
|
|
assert config.aws_config is not None
|
|
assert config.aws_config.aws_region == "us-west-2"
|
|
|
|
def test_config_with_rds_iam_without_explicit_aws_config(self):
|
|
"""Test that RDS IAM works with default aws_config (boto3 will infer region from env)."""
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:3306",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == MySQLAuthMode.AWS_IAM
|
|
assert config.aws_config is not None
|
|
assert config.aws_config.aws_region is None # Will be inferred by boto3
|
|
|
|
|
|
class TestMySQLSourceRDSIAM:
|
|
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
|
|
def test_init_without_rds_iam(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "localhost:3306",
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
"database": "testdb",
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
source = MySQLSource(config, ctx)
|
|
|
|
assert source._rds_iam_token_manager is None
|
|
mock_token_manager.assert_not_called()
|
|
|
|
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:3306",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
source = MySQLSource(config, ctx)
|
|
|
|
assert source._rds_iam_token_manager is not None
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=3306,
|
|
aws_config=config.aws_config,
|
|
)
|
|
|
|
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_custom_port(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:3307",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
MySQLSource(config, ctx)
|
|
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=3307,
|
|
aws_config=config.aws_config,
|
|
)
|
|
|
|
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_no_username(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:3306",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
with pytest.raises(ValueError, match="username is required"):
|
|
MySQLSource(config, ctx)
|
|
|
|
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_invalid_port(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:invalid",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = MySQLConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
MySQLSource(config, ctx)
|
|
|
|
# Should use default port 3306 (silently, like make_sqlalchemy_uri)
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=3306,
|
|
aws_config=config.aws_config,
|
|
)
|