datahub/metadata-ingestion/tests/unit/test_mysql_rds_iam.py

147 lines
5.2 KiB
Python

from unittest.mock import patch
import pytest
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.sql.mysql import MySQLAuthMode, MySQLConfig, MySQLSource
class TestMySQLRDSIAMConfig:
def test_config_without_rds_iam(self):
config_dict = {
"host_port": "localhost:3306",
"username": "testuser",
"password": "testpass",
"database": "testdb",
}
config = MySQLConfig.parse_obj(config_dict)
assert config.auth_mode == MySQLAuthMode.PASSWORD
assert config.aws_config is not None # aws_config always has default value
assert config.aws_config.aws_region is None # but region is not set
def test_config_with_rds_iam_valid(self):
config_dict = {
"host_port": "test.rds.amazonaws.com:3306",
"username": "testuser",
"database": "testdb",
"auth_mode": "AWS_IAM",
"aws_config": {"aws_region": "us-west-2"},
}
config = MySQLConfig.parse_obj(config_dict)
assert config.auth_mode == MySQLAuthMode.AWS_IAM
assert config.aws_config is not None
assert config.aws_config.aws_region == "us-west-2"
def test_config_with_rds_iam_without_explicit_aws_config(self):
"""Test that RDS IAM works with default aws_config (boto3 will infer region from env)."""
config_dict = {
"host_port": "test.rds.amazonaws.com:3306",
"username": "testuser",
"database": "testdb",
"auth_mode": "AWS_IAM",
}
config = MySQLConfig.parse_obj(config_dict)
assert config.auth_mode == MySQLAuthMode.AWS_IAM
assert config.aws_config is not None
assert config.aws_config.aws_region is None # Will be inferred by boto3
class TestMySQLSourceRDSIAM:
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
def test_init_without_rds_iam(self, mock_token_manager):
config_dict = {
"host_port": "localhost:3306",
"username": "testuser",
"password": "testpass",
"database": "testdb",
}
config = MySQLConfig.parse_obj(config_dict)
ctx = PipelineContext(run_id="test-run")
source = MySQLSource(config, ctx)
assert source._rds_iam_token_manager is None
mock_token_manager.assert_not_called()
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
def test_init_with_rds_iam(self, mock_token_manager):
config_dict = {
"host_port": "test.rds.amazonaws.com:3306",
"username": "testuser",
"database": "testdb",
"auth_mode": "AWS_IAM",
"aws_config": {"aws_region": "us-west-2"},
}
config = MySQLConfig.parse_obj(config_dict)
ctx = PipelineContext(run_id="test-run")
source = MySQLSource(config, ctx)
assert source._rds_iam_token_manager is not None
mock_token_manager.assert_called_once_with(
endpoint="test.rds.amazonaws.com",
username="testuser",
port=3306,
aws_config=config.aws_config,
)
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
def test_init_with_rds_iam_custom_port(self, mock_token_manager):
config_dict = {
"host_port": "test.rds.amazonaws.com:3307",
"username": "testuser",
"database": "testdb",
"auth_mode": "AWS_IAM",
"aws_config": {"aws_region": "us-west-2"},
}
config = MySQLConfig.parse_obj(config_dict)
ctx = PipelineContext(run_id="test-run")
MySQLSource(config, ctx)
mock_token_manager.assert_called_once_with(
endpoint="test.rds.amazonaws.com",
username="testuser",
port=3307,
aws_config=config.aws_config,
)
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
def test_init_with_rds_iam_no_username(self, mock_token_manager):
config_dict = {
"host_port": "test.rds.amazonaws.com:3306",
"database": "testdb",
"auth_mode": "AWS_IAM",
"aws_config": {"aws_region": "us-west-2"},
}
config = MySQLConfig.parse_obj(config_dict)
ctx = PipelineContext(run_id="test-run")
with pytest.raises(ValueError, match="username is required"):
MySQLSource(config, ctx)
@patch("datahub.ingestion.source.sql.mysql.RDSIAMTokenManager")
def test_init_with_rds_iam_invalid_port(self, mock_token_manager):
config_dict = {
"host_port": "test.rds.amazonaws.com:invalid",
"username": "testuser",
"database": "testdb",
"auth_mode": "AWS_IAM",
"aws_config": {"aws_region": "us-west-2"},
}
config = MySQLConfig.parse_obj(config_dict)
ctx = PipelineContext(run_id="test-run")
MySQLSource(config, ctx)
# Should use default port 3306 (silently, like make_sqlalchemy_uri)
mock_token_manager.assert_called_once_with(
endpoint="test.rds.amazonaws.com",
username="testuser",
port=3306,
aws_config=config.aws_config,
)