mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-03 23:07:53 +00:00
288 lines
10 KiB
Python
288 lines
10 KiB
Python
from unittest.mock import ANY, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from datahub.ingestion.source.sql.mssql.source import SQLServerConfig, SQLServerSource
|
|
|
|
|
|
@pytest.fixture
|
|
def mssql_source():
|
|
config = SQLServerConfig(
|
|
host_port="localhost:1433",
|
|
username="test",
|
|
password="test",
|
|
database="test_db",
|
|
temporary_tables_pattern=["^temp_"],
|
|
include_descriptions=False, # Disable description loading to avoid DB connections
|
|
)
|
|
|
|
# Mock the parent class's __init__ to avoid DB connections
|
|
with patch("datahub.ingestion.source.sql.sql_common.SQLAlchemySource.__init__"):
|
|
source = SQLServerSource(config, MagicMock())
|
|
source.discovered_datasets = {"test_db.dbo.regular_table"}
|
|
source.report = MagicMock()
|
|
return source
|
|
|
|
|
|
def test_is_temp_table(mssql_source):
|
|
# Test tables matching temporary table patterns
|
|
assert mssql_source.is_temp_table("test_db.dbo.temp_table") is True
|
|
|
|
# Test tables starting with # (handled by startswith check in is_temp_table)
|
|
assert mssql_source.is_temp_table("test_db.dbo.#some_table") is True
|
|
|
|
# Test tables that are not in discovered_datasets
|
|
assert mssql_source.is_temp_table("test_db.dbo.unknown_table") is True
|
|
|
|
# Test regular tables that should return False
|
|
assert mssql_source.is_temp_table("test_db.dbo.regular_table") is False
|
|
|
|
# Test invalid table name format
|
|
assert mssql_source.is_temp_table("invalid_table_name") is False
|
|
|
|
|
|
def test_detect_rds_environment_on_premises(mssql_source):
|
|
"""Test environment detection for on-premises SQL Server"""
|
|
mock_conn = MagicMock()
|
|
# Mock successful query execution (on-premises)
|
|
mock_conn.execute.return_value = True
|
|
|
|
result = mssql_source._detect_rds_environment(mock_conn)
|
|
|
|
assert result is False
|
|
mock_conn.execute.assert_called_once_with("SELECT TOP 1 * FROM msdb.dbo.sysjobs")
|
|
|
|
|
|
def test_detect_rds_environment_rds(mssql_source):
|
|
"""Test environment detection for RDS/managed SQL Server"""
|
|
mock_conn = MagicMock()
|
|
# Mock failed query execution (RDS)
|
|
mock_conn.execute.side_effect = Exception("Access denied")
|
|
|
|
result = mssql_source._detect_rds_environment(mock_conn)
|
|
|
|
assert result is True
|
|
mock_conn.execute.assert_called_once_with("SELECT TOP 1 * FROM msdb.dbo.sysjobs")
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_managed_environment_success(mock_logger, mssql_source):
|
|
"""Test successful job retrieval in managed environment using stored procedures"""
|
|
mock_conn = MagicMock()
|
|
mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}}
|
|
|
|
# Mock managed environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=True
|
|
), patch.object(
|
|
mssql_source, "_get_jobs_via_stored_procedures", return_value=mock_jobs
|
|
):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == mock_jobs
|
|
mock_logger.info.assert_called_with(
|
|
"Successfully retrieved jobs using stored procedures (managed environment)"
|
|
)
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_on_premises_success(mock_logger, mssql_source):
|
|
"""Test successful job retrieval in on-premises environment using direct query"""
|
|
mock_conn = MagicMock()
|
|
mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}}
|
|
|
|
# Mock on-premises environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=False
|
|
), patch.object(mssql_source, "_get_jobs_via_direct_query", return_value=mock_jobs):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == mock_jobs
|
|
mock_logger.info.assert_called_with(
|
|
"Successfully retrieved jobs using direct query (on-premises environment)"
|
|
)
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_managed_fallback_success(mock_logger, mssql_source):
|
|
"""Test managed environment with stored procedure failure but direct query success"""
|
|
mock_conn = MagicMock()
|
|
mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}}
|
|
|
|
# Mock managed environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=True
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_stored_procedures",
|
|
side_effect=Exception("SP failed"),
|
|
), patch.object(mssql_source, "_get_jobs_via_direct_query", return_value=mock_jobs):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == mock_jobs
|
|
mock_logger.warning.assert_called_with(
|
|
"Failed to retrieve jobs via stored procedures in managed environment: SP failed"
|
|
)
|
|
mock_logger.info.assert_called_with(
|
|
"Successfully retrieved jobs using direct query fallback in managed environment"
|
|
)
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_on_premises_fallback_success(mock_logger, mssql_source):
|
|
"""Test on-premises environment with direct query failure but stored procedure success"""
|
|
mock_conn = MagicMock()
|
|
mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}}
|
|
|
|
# Mock on-premises environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=False
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_direct_query",
|
|
side_effect=Exception("Direct query failed"),
|
|
), patch.object(
|
|
mssql_source, "_get_jobs_via_stored_procedures", return_value=mock_jobs
|
|
):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == mock_jobs
|
|
mock_logger.warning.assert_called_with(
|
|
"Failed to retrieve jobs via direct query in on-premises environment: Direct query failed"
|
|
)
|
|
mock_logger.info.assert_called_with(
|
|
"Successfully retrieved jobs using stored procedures fallback in on-premises environment"
|
|
)
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_managed_both_methods_fail(mock_logger, mssql_source):
|
|
"""Test managed environment where both methods fail"""
|
|
mock_conn = MagicMock()
|
|
|
|
# Mock managed environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=True
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_stored_procedures",
|
|
side_effect=Exception("SP failed"),
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_direct_query",
|
|
side_effect=Exception("Direct failed"),
|
|
):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == {}
|
|
mssql_source.report.failure.assert_called_once_with(
|
|
message="Failed to retrieve jobs in managed environment",
|
|
title="SQL Server Jobs Extraction",
|
|
context="Both stored procedures and direct query methods failed",
|
|
exc=ANY,
|
|
)
|
|
|
|
|
|
@patch("datahub.ingestion.source.sql.mssql.source.logger")
|
|
def test_get_jobs_on_premises_both_methods_fail(mock_logger, mssql_source):
|
|
"""Test on-premises environment where both methods fail"""
|
|
mock_conn = MagicMock()
|
|
|
|
# Mock on-premises environment detection
|
|
with patch.object(
|
|
mssql_source, "_detect_rds_environment", return_value=False
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_direct_query",
|
|
side_effect=Exception("Direct failed"),
|
|
), patch.object(
|
|
mssql_source,
|
|
"_get_jobs_via_stored_procedures",
|
|
side_effect=Exception("SP failed"),
|
|
):
|
|
result = mssql_source._get_jobs(mock_conn, "test_db")
|
|
|
|
assert result == {}
|
|
mssql_source.report.failure.assert_called_once_with(
|
|
message="Failed to retrieve jobs in on-premises environment",
|
|
title="SQL Server Jobs Extraction",
|
|
context="Both direct query and stored procedures methods failed",
|
|
exc=ANY,
|
|
)
|
|
|
|
|
|
def test_stored_procedure_vs_direct_query_compatibility(mssql_source):
|
|
"""Test that both methods return compatible data structures"""
|
|
mock_conn = MagicMock()
|
|
|
|
# Mock data that both methods should be able to process
|
|
mock_job_data = {
|
|
"job_id": "12345678-1234-1234-1234-123456789012",
|
|
"name": "TestJob",
|
|
"description": "Test job description",
|
|
"date_created": "2023-01-01 10:00:00",
|
|
"date_modified": "2023-01-02 11:00:00",
|
|
"enabled": 1,
|
|
"step_id": 1,
|
|
"step_name": "Test Step",
|
|
"subsystem": "TSQL",
|
|
"command": "SELECT 1",
|
|
"database_name": "test_db",
|
|
}
|
|
|
|
# Test stored procedure method
|
|
with patch.object(mock_conn, "execute") as mock_execute:
|
|
# Mock sp_help_job response
|
|
mock_job_result = MagicMock()
|
|
mock_job_result.__iter__.return_value = [mock_job_data]
|
|
|
|
# Mock sp_help_jobstep response
|
|
mock_step_result = MagicMock()
|
|
mock_step_result.__iter__.return_value = [mock_job_data]
|
|
|
|
mock_execute.side_effect = [mock_job_result, mock_step_result]
|
|
|
|
sp_result = mssql_source._get_jobs_via_stored_procedures(mock_conn, "test_db")
|
|
|
|
# Test direct query method
|
|
with patch.object(mock_conn, "execute") as mock_execute:
|
|
mock_query_result = MagicMock()
|
|
mock_query_result.__iter__.return_value = [mock_job_data]
|
|
mock_execute.return_value = mock_query_result
|
|
|
|
direct_result = mssql_source._get_jobs_via_direct_query(mock_conn, "test_db")
|
|
|
|
# Verify both methods return data with same structure
|
|
assert len(sp_result) > 0, "Stored procedure method should return data"
|
|
assert len(direct_result) > 0, "Direct query method should return data"
|
|
|
|
# Get first job from each method
|
|
sp_job = list(sp_result.values())[0]
|
|
direct_job = list(direct_result.values())[0]
|
|
|
|
# Get first step from each method
|
|
sp_step = list(sp_job.values())[0]
|
|
direct_step = list(direct_job.values())[0]
|
|
|
|
# Verify both contain critical fields
|
|
required_fields = [
|
|
"job_id",
|
|
"job_name",
|
|
"description",
|
|
"date_created",
|
|
"date_modified",
|
|
"step_id",
|
|
"step_name",
|
|
"subsystem",
|
|
"command",
|
|
"database_name",
|
|
]
|
|
|
|
for field in required_fields:
|
|
assert field in sp_step, f"Stored procedure result missing field: {field}"
|
|
assert field in direct_step, f"Direct query result missing field: {field}"
|
|
|
|
# Verify database_name is properly set
|
|
assert sp_step["database_name"] == "test_db"
|
|
assert direct_step["database_name"] == "test_db"
|