mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 16:49:03 +00:00
331 lines
9.7 KiB
Python
331 lines
9.7 KiB
Python
![]() |
import logging
|
||
|
import os
|
||
|
import tempfile
|
||
|
from contextlib import contextmanager
|
||
|
from typing import Generator
|
||
|
from unittest.mock import Mock
|
||
|
|
||
|
import pytest
|
||
|
from click.testing import CliRunner
|
||
|
|
||
|
from datahub_actions.cli.actions import actions, pipeline_config_to_pipeline
|
||
|
from datahub_actions.pipeline.pipeline import Pipeline
|
||
|
from datahub_actions.pipeline.pipeline_manager import PipelineManager
|
||
|
|
||
|
|
||
|
@contextmanager
|
||
|
def local_monkeypatch(monkeypatch, target, replacement):
|
||
|
"""Apply monkeypatch temporarily within a context."""
|
||
|
monkeypatch.setattr(target, replacement)
|
||
|
try:
|
||
|
yield
|
||
|
finally:
|
||
|
monkeypatch.undo()
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def capture_logger(
|
||
|
caplog: pytest.LogCaptureFixture,
|
||
|
) -> Generator[pytest.LogCaptureFixture, None, None]:
|
||
|
"""Fixture to capture logger output with the proper level."""
|
||
|
caplog.set_level(logging.INFO)
|
||
|
yield caplog
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def mock_pipeline() -> Mock:
|
||
|
"""Create a mock pipeline object."""
|
||
|
mock = Mock(spec=Pipeline)
|
||
|
mock.name = "test_pipeline"
|
||
|
return mock
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def mock_pipeline_manager() -> Mock:
|
||
|
"""Create a mock pipeline manager."""
|
||
|
mock = Mock(spec=PipelineManager)
|
||
|
mock.start_pipeline = Mock(return_value=None)
|
||
|
mock.stop_all = Mock(return_value=None)
|
||
|
return mock
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def temp_config_file() -> Generator[str, None, None]:
|
||
|
"""Creates a temporary YAML config file for testing."""
|
||
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
|
||
|
f.write(
|
||
|
"""
|
||
|
name: "test_pipeline"
|
||
|
datahub:
|
||
|
server: "https://test.datahub.io"
|
||
|
token: "test-token"
|
||
|
source:
|
||
|
type: "datahub-cloud"
|
||
|
action:
|
||
|
type: "hello_world"
|
||
|
"""
|
||
|
)
|
||
|
config_path = f.name
|
||
|
|
||
|
yield config_path
|
||
|
# Cleanup
|
||
|
os.unlink(config_path)
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def disabled_config_file() -> Generator[str, None, None]:
|
||
|
"""Creates a temporary YAML config file with disabled pipeline."""
|
||
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
|
||
|
f.write(
|
||
|
"""
|
||
|
name: "disabled_pipeline"
|
||
|
enabled: false
|
||
|
datahub:
|
||
|
server: "https://test.datahub.io"
|
||
|
token: "test-token"
|
||
|
source:
|
||
|
type: "datahub-cloud"
|
||
|
action:
|
||
|
type: "hello_world"
|
||
|
"""
|
||
|
)
|
||
|
config_path = f.name
|
||
|
|
||
|
yield config_path
|
||
|
# Cleanup
|
||
|
os.unlink(config_path)
|
||
|
|
||
|
|
||
|
@pytest.fixture
|
||
|
def invalid_config_file() -> Generator[str, None, None]:
|
||
|
"""Creates a temporary YAML config file with unbound variables."""
|
||
|
with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f:
|
||
|
f.write(
|
||
|
"""
|
||
|
name: "invalid_pipeline"
|
||
|
datahub:
|
||
|
server: "https://test.datahub.io"
|
||
|
token: ${UNDEFINED_VAR}
|
||
|
source:
|
||
|
type: "datahub-cloud"
|
||
|
action:
|
||
|
type: "hello_world"
|
||
|
"""
|
||
|
)
|
||
|
config_path = f.name
|
||
|
|
||
|
yield config_path
|
||
|
# Cleanup
|
||
|
os.unlink(config_path)
|
||
|
|
||
|
|
||
|
def test_version_command() -> None:
|
||
|
"""Test the version command outputs version information."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(actions, ["version"])
|
||
|
assert result.exit_code == 0
|
||
|
assert "DataHub Actions version:" in result.output
|
||
|
assert "Python version:" in result.output
|
||
|
|
||
|
|
||
|
def test_disabled_pipeline_exits(
|
||
|
disabled_config_file: str, capture_logger: pytest.LogCaptureFixture
|
||
|
) -> None:
|
||
|
"""Test that disabled pipelines cause program exit."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(actions, ["run", "-c", disabled_config_file])
|
||
|
|
||
|
assert result.exit_code == 1
|
||
|
assert any(
|
||
|
"Skipping pipeline disabled_pipeline as it is not enabled" in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_invalid_config_single(invalid_config_file: str) -> None:
|
||
|
"""Test handling of single config with unbound variables."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(actions, ["run", "-c", invalid_config_file])
|
||
|
assert result.exit_code != 0
|
||
|
assert "Unbound variable(s) provided in config YAML" in str(result.exception)
|
||
|
|
||
|
|
||
|
def test_all_configs_invalid_or_disabled(
|
||
|
invalid_config_file: str,
|
||
|
disabled_config_file: str,
|
||
|
) -> None:
|
||
|
"""Test that program exits when all configs are invalid or disabled."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(
|
||
|
actions, ["run", "-c", invalid_config_file, "-c", disabled_config_file]
|
||
|
)
|
||
|
assert result.exit_code == 1
|
||
|
assert (
|
||
|
"Failed to load action configuration. Unbound variable(s) provided in config YAML."
|
||
|
in str(result.exception)
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_all_configs_multiple_disabled(
|
||
|
disabled_config_file: str,
|
||
|
capture_logger: pytest.LogCaptureFixture,
|
||
|
) -> None:
|
||
|
"""Test that program exits when all configs are invalid or disabled."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(
|
||
|
actions, ["run", "-c", disabled_config_file, "-c", disabled_config_file]
|
||
|
)
|
||
|
assert result.exit_code == 1
|
||
|
assert any(
|
||
|
"No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required environment variables are set."
|
||
|
in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_mixed_valid_and_invalid_configs(
|
||
|
temp_config_file: str,
|
||
|
disabled_config_file: str,
|
||
|
capture_logger: pytest.LogCaptureFixture,
|
||
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
mock_pipeline: Mock,
|
||
|
mock_pipeline_manager: Mock,
|
||
|
) -> None:
|
||
|
"""Test handling mix of valid and invalid configs."""
|
||
|
|
||
|
def mock_create_pipeline(config: dict) -> Pipeline:
|
||
|
"""Mock implementation of pipeline creation."""
|
||
|
return mock_pipeline
|
||
|
|
||
|
sleep_count = 0
|
||
|
|
||
|
def mock_sleep(seconds: int) -> None:
|
||
|
"""Mock sleep that raises KeyboardInterrupt after first call."""
|
||
|
nonlocal sleep_count
|
||
|
sleep_count += 1
|
||
|
if sleep_count > 1: # Allow one sleep to ensure logs are captured
|
||
|
raise KeyboardInterrupt()
|
||
|
|
||
|
runner = CliRunner()
|
||
|
|
||
|
# Use local_monkeypatch for tighter control
|
||
|
|
||
|
with local_monkeypatch(
|
||
|
monkeypatch,
|
||
|
"datahub_actions.pipeline.pipeline.Pipeline.create",
|
||
|
mock_create_pipeline,
|
||
|
), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep), local_monkeypatch(
|
||
|
monkeypatch,
|
||
|
"datahub_actions.cli.actions.pipeline_manager",
|
||
|
mock_pipeline_manager,
|
||
|
):
|
||
|
result = runner.invoke(
|
||
|
actions, ["run", "-c", temp_config_file, "-c", disabled_config_file]
|
||
|
)
|
||
|
|
||
|
assert result.exit_code == 1
|
||
|
assert any(
|
||
|
"Skipping pipeline disabled_pipeline as it is not enabled" in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
assert any(
|
||
|
"Action Pipeline with name 'test_pipeline' is now running." in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_debug_mode_with_valid_config(
|
||
|
temp_config_file: str,
|
||
|
capture_logger: pytest.LogCaptureFixture,
|
||
|
monkeypatch: pytest.MonkeyPatch,
|
||
|
mock_pipeline: Mock,
|
||
|
mock_pipeline_manager: Mock,
|
||
|
) -> None:
|
||
|
"""Test debug mode with valid pipeline config."""
|
||
|
|
||
|
def mock_create_pipeline(config: dict) -> Pipeline:
|
||
|
"""Mock implementation of pipeline creation."""
|
||
|
return mock_pipeline
|
||
|
|
||
|
sleep_count = 0
|
||
|
|
||
|
def mock_sleep(seconds: int) -> None:
|
||
|
"""Mock sleep that raises KeyboardInterrupt after first call."""
|
||
|
nonlocal sleep_count
|
||
|
sleep_count += 1
|
||
|
if sleep_count > 1: # Allow one sleep to ensure logs are captured
|
||
|
raise KeyboardInterrupt()
|
||
|
|
||
|
runner = CliRunner()
|
||
|
|
||
|
# Use local_monkeypatch for tighter control
|
||
|
with local_monkeypatch(
|
||
|
monkeypatch,
|
||
|
"datahub_actions.pipeline.pipeline.Pipeline.create",
|
||
|
mock_create_pipeline,
|
||
|
), local_monkeypatch(monkeypatch, "time.sleep", mock_sleep), local_monkeypatch(
|
||
|
monkeypatch,
|
||
|
"datahub_actions.cli.actions.pipeline_manager",
|
||
|
mock_pipeline_manager,
|
||
|
):
|
||
|
result = runner.invoke(
|
||
|
actions,
|
||
|
["run", "-c", temp_config_file, "--debug"],
|
||
|
)
|
||
|
|
||
|
assert result.exit_code == 1
|
||
|
assert any(
|
||
|
"Action Pipeline with name 'test_pipeline' is now running." in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
|
||
|
|
||
|
def test_pipeline_config_to_pipeline_error() -> None:
|
||
|
"""Test error handling in pipeline creation."""
|
||
|
invalid_config = {
|
||
|
"name": "invalid_pipeline",
|
||
|
# Missing required fields
|
||
|
}
|
||
|
|
||
|
with pytest.raises(Exception) as exc_info:
|
||
|
pipeline_config_to_pipeline(invalid_config)
|
||
|
assert "Failed to instantiate Actions Pipeline" in str(exc_info.value)
|
||
|
|
||
|
|
||
|
def test_multiple_disabled_configs(
|
||
|
disabled_config_file: str,
|
||
|
capture_logger: pytest.LogCaptureFixture,
|
||
|
) -> None:
|
||
|
"""Test handling of multiple disabled configs."""
|
||
|
runner = CliRunner()
|
||
|
result = runner.invoke(
|
||
|
actions, ["run", "-c", disabled_config_file, "-c", disabled_config_file]
|
||
|
)
|
||
|
assert result.exit_code == 1
|
||
|
assert any(
|
||
|
"No valid pipelines were started from 2 config(s). Check that at least one pipeline is enabled and all required environment variables are set."
|
||
|
in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
assert any(
|
||
|
"Skipping pipeline disabled_pipeline as it is not enabled" in record.message
|
||
|
for record in capture_logger.records
|
||
|
)
|
||
|
|
||
|
|
||
|
# Type checking with mypy annotations
|
||
|
def test_type_annotations() -> None:
|
||
|
"""Verify type hints are correct (this is a compile-time check)."""
|
||
|
# These assignments will be checked by mypy
|
||
|
runner: CliRunner = CliRunner()
|
||
|
config_dict: dict = {
|
||
|
"name": "test_pipeline",
|
||
|
"datahub": {"server": "https://test.datahub.io", "token": "test-token"},
|
||
|
"source": {"type": "datahub-cloud"},
|
||
|
"action": {"type": "hello_world"},
|
||
|
}
|
||
|
|
||
|
# These assertions serve as runtime checks
|
||
|
assert isinstance(runner, CliRunner)
|
||
|
assert isinstance(config_dict, dict)
|