from unittest.mock import ANY, MagicMock, patch import pytest from datahub.ingestion.source.sql.mssql.source import SQLServerConfig, SQLServerSource @pytest.fixture def mssql_source(): config = SQLServerConfig( host_port="localhost:1433", username="test", password="test", database="test_db", temporary_tables_pattern=["^temp_"], include_descriptions=False, # Disable description loading to avoid DB connections ) # Mock the parent class's __init__ to avoid DB connections with patch("datahub.ingestion.source.sql.sql_common.SQLAlchemySource.__init__"): source = SQLServerSource(config, MagicMock()) source.discovered_datasets = {"test_db.dbo.regular_table"} source.report = MagicMock() return source def test_is_temp_table(mssql_source): # Test tables matching temporary table patterns assert mssql_source.is_temp_table("test_db.dbo.temp_table") is True # Test tables starting with # (handled by startswith check in is_temp_table) assert mssql_source.is_temp_table("test_db.dbo.#some_table") is True # Test tables that are not in discovered_datasets assert mssql_source.is_temp_table("test_db.dbo.unknown_table") is True # Test regular tables that should return False assert mssql_source.is_temp_table("test_db.dbo.regular_table") is False # Test invalid table name format assert mssql_source.is_temp_table("invalid_table_name") is False def test_detect_rds_environment_on_premises(mssql_source): """Test environment detection for on-premises SQL Server""" mock_conn = MagicMock() # Mock successful query execution (on-premises) mock_conn.execute.return_value = True result = mssql_source._detect_rds_environment(mock_conn) assert result is False mock_conn.execute.assert_called_once_with("SELECT TOP 1 * FROM msdb.dbo.sysjobs") def test_detect_rds_environment_rds(mssql_source): """Test environment detection for RDS/managed SQL Server""" mock_conn = MagicMock() # Mock failed query execution (RDS) mock_conn.execute.side_effect = Exception("Access denied") result = mssql_source._detect_rds_environment(mock_conn) assert result is True mock_conn.execute.assert_called_once_with("SELECT TOP 1 * FROM msdb.dbo.sysjobs") @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_managed_environment_success(mock_logger, mssql_source): """Test successful job retrieval in managed environment using stored procedures""" mock_conn = MagicMock() mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}} # Mock managed environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=True ), patch.object( mssql_source, "_get_jobs_via_stored_procedures", return_value=mock_jobs ): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == mock_jobs mock_logger.info.assert_called_with( "Successfully retrieved jobs using stored procedures (managed environment)" ) @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_on_premises_success(mock_logger, mssql_source): """Test successful job retrieval in on-premises environment using direct query""" mock_conn = MagicMock() mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}} # Mock on-premises environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=False ), patch.object(mssql_source, "_get_jobs_via_direct_query", return_value=mock_jobs): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == mock_jobs mock_logger.info.assert_called_with( "Successfully retrieved jobs using direct query (on-premises environment)" ) @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_managed_fallback_success(mock_logger, mssql_source): """Test managed environment with stored procedure failure but direct query success""" mock_conn = MagicMock() mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}} # Mock managed environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=True ), patch.object( mssql_source, "_get_jobs_via_stored_procedures", side_effect=Exception("SP failed"), ), patch.object(mssql_source, "_get_jobs_via_direct_query", return_value=mock_jobs): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == mock_jobs mock_logger.warning.assert_called_with( "Failed to retrieve jobs via stored procedures in managed environment: SP failed" ) mock_logger.info.assert_called_with( "Successfully retrieved jobs using direct query fallback in managed environment" ) @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_on_premises_fallback_success(mock_logger, mssql_source): """Test on-premises environment with direct query failure but stored procedure success""" mock_conn = MagicMock() mock_jobs = {"TestJob": {1: {"job_name": "TestJob", "step_name": "Step1"}}} # Mock on-premises environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=False ), patch.object( mssql_source, "_get_jobs_via_direct_query", side_effect=Exception("Direct query failed"), ), patch.object( mssql_source, "_get_jobs_via_stored_procedures", return_value=mock_jobs ): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == mock_jobs mock_logger.warning.assert_called_with( "Failed to retrieve jobs via direct query in on-premises environment: Direct query failed" ) mock_logger.info.assert_called_with( "Successfully retrieved jobs using stored procedures fallback in on-premises environment" ) @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_managed_both_methods_fail(mock_logger, mssql_source): """Test managed environment where both methods fail""" mock_conn = MagicMock() # Mock managed environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=True ), patch.object( mssql_source, "_get_jobs_via_stored_procedures", side_effect=Exception("SP failed"), ), patch.object( mssql_source, "_get_jobs_via_direct_query", side_effect=Exception("Direct failed"), ): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == {} mssql_source.report.failure.assert_called_once_with( message="Failed to retrieve jobs in managed environment", title="SQL Server Jobs Extraction", context="Both stored procedures and direct query methods failed", exc=ANY, ) @patch("datahub.ingestion.source.sql.mssql.source.logger") def test_get_jobs_on_premises_both_methods_fail(mock_logger, mssql_source): """Test on-premises environment where both methods fail""" mock_conn = MagicMock() # Mock on-premises environment detection with patch.object( mssql_source, "_detect_rds_environment", return_value=False ), patch.object( mssql_source, "_get_jobs_via_direct_query", side_effect=Exception("Direct failed"), ), patch.object( mssql_source, "_get_jobs_via_stored_procedures", side_effect=Exception("SP failed"), ): result = mssql_source._get_jobs(mock_conn, "test_db") assert result == {} mssql_source.report.failure.assert_called_once_with( message="Failed to retrieve jobs in on-premises environment", title="SQL Server Jobs Extraction", context="Both direct query and stored procedures methods failed", exc=ANY, ) def test_stored_procedure_vs_direct_query_compatibility(mssql_source): """Test that both methods return compatible data structures""" mock_conn = MagicMock() # Mock data that both methods should be able to process mock_job_data = { "job_id": "12345678-1234-1234-1234-123456789012", "name": "TestJob", "description": "Test job description", "date_created": "2023-01-01 10:00:00", "date_modified": "2023-01-02 11:00:00", "enabled": 1, "step_id": 1, "step_name": "Test Step", "subsystem": "TSQL", "command": "SELECT 1", "database_name": "test_db", } # Test stored procedure method with patch.object(mock_conn, "execute") as mock_execute: # Mock sp_help_job response mock_job_result = MagicMock() mock_job_result.__iter__.return_value = [mock_job_data] # Mock sp_help_jobstep response mock_step_result = MagicMock() mock_step_result.__iter__.return_value = [mock_job_data] mock_execute.side_effect = [mock_job_result, mock_step_result] sp_result = mssql_source._get_jobs_via_stored_procedures(mock_conn, "test_db") # Test direct query method with patch.object(mock_conn, "execute") as mock_execute: mock_query_result = MagicMock() mock_query_result.__iter__.return_value = [mock_job_data] mock_execute.return_value = mock_query_result direct_result = mssql_source._get_jobs_via_direct_query(mock_conn, "test_db") # Verify both methods return data with same structure assert len(sp_result) > 0, "Stored procedure method should return data" assert len(direct_result) > 0, "Direct query method should return data" # Get first job from each method sp_job = list(sp_result.values())[0] direct_job = list(direct_result.values())[0] # Get first step from each method sp_step = list(sp_job.values())[0] direct_step = list(direct_job.values())[0] # Verify both contain critical fields required_fields = [ "job_id", "job_name", "description", "date_created", "date_modified", "step_id", "step_name", "subsystem", "command", "database_name", ] for field in required_fields: assert field in sp_step, f"Stored procedure result missing field: {field}" assert field in direct_step, f"Direct query result missing field: {field}" # Verify database_name is properly set assert sp_step["database_name"] == "test_db" assert direct_step["database_name"] == "test_db"