mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-19 14:24:01 +00:00
175 lines
6.2 KiB
Python
175 lines
6.2 KiB
Python
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.api.common import PipelineContext
|
|
from datahub.ingestion.source.sql.postgres import (
|
|
PostgresAuthMode,
|
|
PostgresConfig,
|
|
PostgresSource,
|
|
)
|
|
|
|
|
|
class TestPostgresRDSIAMConfig:
|
|
def test_config_without_rds_iam(self):
|
|
config_dict = {
|
|
"host_port": "localhost:5432",
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
"database": "testdb",
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == PostgresAuthMode.PASSWORD
|
|
assert config.aws_config is not None # aws_config always has default value
|
|
assert config.aws_config.aws_region is None # but region is not set
|
|
|
|
def test_config_with_rds_iam_valid(self):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5432",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == PostgresAuthMode.AWS_IAM
|
|
assert config.aws_config is not None
|
|
assert config.aws_config.aws_region == "us-west-2"
|
|
|
|
def test_config_with_rds_iam_without_explicit_aws_config(self):
|
|
"""Test that RDS IAM works with default aws_config (boto3 will infer region from env)."""
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5432",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
|
|
assert config.auth_mode == PostgresAuthMode.AWS_IAM
|
|
assert config.aws_config is not None
|
|
assert config.aws_config.aws_region is None # Will be inferred by boto3
|
|
|
|
|
|
class TestPostgresSourceRDSIAM:
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_without_rds_iam(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "localhost:5432",
|
|
"username": "testuser",
|
|
"password": "testpass",
|
|
"database": "testdb",
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
source = PostgresSource(config, ctx)
|
|
|
|
assert source._rds_iam_token_manager is None
|
|
mock_token_manager.assert_not_called()
|
|
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5432",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
source = PostgresSource(config, ctx)
|
|
|
|
assert source._rds_iam_token_manager is not None
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=5432,
|
|
aws_config=config.aws_config,
|
|
)
|
|
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_custom_port(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5433",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
PostgresSource(config, ctx)
|
|
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=5433,
|
|
aws_config=config.aws_config,
|
|
)
|
|
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_no_username(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5432",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
with pytest.raises(ValueError, match="username is required"):
|
|
PostgresSource(config, ctx)
|
|
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_invalid_port(self, mock_token_manager):
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:invalid",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
PostgresSource(config, ctx)
|
|
|
|
# Should use default port 5432 (silently, like make_sqlalchemy_uri)
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=5432,
|
|
aws_config=config.aws_config,
|
|
)
|
|
|
|
@patch("datahub.ingestion.source.sql.postgres.RDSIAMTokenManager")
|
|
def test_init_with_rds_iam_stores_hostname_and_port(self, mock_token_manager):
|
|
"""Test that hostname and port are passed to token manager."""
|
|
config_dict = {
|
|
"host_port": "test.rds.amazonaws.com:5433",
|
|
"username": "testuser",
|
|
"database": "testdb",
|
|
"auth_mode": "AWS_IAM",
|
|
"aws_config": {"aws_region": "us-west-2"},
|
|
}
|
|
config = PostgresConfig.parse_obj(config_dict)
|
|
ctx = PipelineContext(run_id="test-run")
|
|
|
|
source = PostgresSource(config, ctx)
|
|
|
|
# Verify token manager was created with correct parameters
|
|
assert source._rds_iam_token_manager is not None
|
|
mock_token_manager.assert_called_once_with(
|
|
endpoint="test.rds.amazonaws.com",
|
|
username="testuser",
|
|
port=5433,
|
|
aws_config=config.aws_config,
|
|
)
|