2025-07-24 17:05:09 +01:00

247 lines
8.9 KiB
Python

import logging
import time
from base64 import b64encode
import pytest
import pytest_docker.plugin
import requests
from freezegun import freeze_time
from datahub.ingestion.run.pipeline import Pipeline
from datahub.testing import mce_helpers
from tests.test_helpers import fs_helpers
from tests.test_helpers.docker_helpers import cleanup_image
pytestmark = pytest.mark.integration_batch_2
FROZEN_TIME = "2024-07-12 12:00:00"
logger = logging.getLogger(__name__)
class GrafanaClient:
def __init__(self, url, admin_user, admin_password):
self.url = url
self.auth = (admin_user, admin_password)
self.headers = {
"Authorization": f"Basic {b64encode(f'{admin_user}:{admin_password}'.encode()).decode()}",
"Content-Type": "application/json",
}
def create_service_account(self, name, role):
service_account_payload = {"name": name, "role": role, "isDisabled": False}
try:
response = requests.post(
f"{self.url}/api/serviceaccounts",
headers=self.headers,
json=service_account_payload,
)
response.raise_for_status()
service_account = response.json()
return service_account
except requests.exceptions.RequestException as e:
logging.error(f"Error creating service account: {e}")
return None
def create_api_key(self, service_account_id, key_name, role):
api_key_payload = {"name": key_name, "role": role}
try:
response = requests.post(
f"{self.url}/api/serviceaccounts/{service_account_id}/tokens",
headers=self.headers,
json=api_key_payload,
)
response.raise_for_status()
api_key = response.json()
return api_key["key"]
except requests.exceptions.RequestException as e:
logging.error(f"Error creating API key: {e}")
return None
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
return pytestconfig.rootpath / "tests/integration/grafana"
@pytest.fixture(scope="module")
def test_api_key(loaded_grafana):
# Get the actual mapped port from Docker services
grafana_port = loaded_grafana.port_for("grafana", 3000)
url = f"http://localhost:{grafana_port}"
admin_user = "admin"
admin_password = "admin"
grafana_client = GrafanaClient(url, admin_user, admin_password)
service_account = grafana_client.create_service_account(
name="example-service-account", role="Admin"
)
if service_account:
api_key = grafana_client.create_api_key(
service_account_id=service_account["id"],
key_name="example-api-key",
role="Admin",
)
if api_key:
return api_key
else:
pytest.fail("Failed to create API key for the service account")
else:
pytest.fail("Failed to create service account")
@pytest.fixture(scope="module")
def loaded_grafana(docker_compose_runner, test_resources_dir):
with docker_compose_runner(
test_resources_dir / "docker-compose.yml", "grafana"
) as docker_services:
# Docker Compose now waits for health check to pass before considering service ready
# Verify we can access the API endpoints as an additional safety check
verify_grafana_api_ready(docker_services)
yield docker_services
cleanup_image("grafana/grafana")
def verify_grafana_api_ready(docker_services: pytest_docker.plugin.Services) -> None:
"""Robust verification that Grafana API is fully accessible after health check passes"""
import requests
grafana_port = docker_services.port_for("grafana", 3000)
base_url = f"http://localhost:{grafana_port}"
# Wait for API endpoints to be fully ready (health check might pass but API still initializing)
max_attempts = 30
for attempt in range(max_attempts):
try:
# Test both basic API access and service account creation capability
api_url = f"{base_url}/api/search"
resp = requests.get(api_url, auth=("admin", "admin"), timeout=10)
if resp.status_code == 200:
# Also verify service account API is ready (needed for test_api_key fixture)
# Service accounts might not be available in all Grafana versions
sa_url = f"{base_url}/api/serviceaccounts"
sa_resp = requests.get(sa_url, auth=("admin", "admin"), timeout=10)
if sa_resp.status_code == 200:
logging.info(
f"Grafana API endpoints fully ready with service accounts (attempt {attempt + 1})"
)
return
elif sa_resp.status_code == 404:
# Service accounts API not available - this is okay for older Grafana versions
logging.info(
f"Grafana API ready, service accounts not available (attempt {attempt + 1})"
)
return
else:
logging.debug(
f"Service account API not ready yet: {sa_resp.status_code}"
)
else:
logging.debug(f"Basic API not ready yet: {resp.status_code}")
except Exception as e:
logging.debug(f"API readiness check failed (attempt {attempt + 1}): {e}")
if attempt < max_attempts - 1:
time.sleep(2)
logging.warning(f"Grafana API may not be fully ready after {max_attempts} attempts")
# Don't fail here - let the test proceed and provide better error info if needed
@freeze_time(FROZEN_TIME)
def test_grafana_basic_ingest(
loaded_grafana, pytestconfig, tmp_path, test_resources_dir, test_api_key
):
"""Test ingestion with lineage enabled"""
with fs_helpers.isolated_filesystem(tmp_path):
pipeline = Pipeline.create(
{
"run_id": "grafana-test",
"source": {
"type": "grafana",
"config": {
"url": "http://localhost:3000",
"service_account_token": test_api_key,
"ingest_tags": False,
"ingest_owners": False,
},
},
"sink": {
"type": "file",
"config": {"filename": "./grafana_basic_mcps.json"},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path="grafana_basic_mcps.json",
golden_path=test_resources_dir / "grafana_basic_mcps_golden.json",
ignore_paths=[
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]",
],
)
@freeze_time(FROZEN_TIME)
def test_grafana_ingest(
loaded_grafana, pytestconfig, tmp_path, test_resources_dir, test_api_key
):
"""Test ingestion with lineage enabled"""
with fs_helpers.isolated_filesystem(tmp_path):
pipeline = Pipeline.create(
{
"run_id": "grafana-test",
"source": {
"type": "grafana",
"config": {
"url": "http://localhost:3000",
"service_account_token": test_api_key,
"ingest_tags": True,
"ingest_owners": True,
"connection_to_platform_map": {
"test-postgres": {
"platform": "postgres",
"database": "grafana",
"platform_instance": "local",
"env": "PROD",
},
"test-prometheus": {
"platform": "prometheus",
"platform_instance": "local",
"env": "PROD",
},
},
"platform_instance": "local-grafana",
"env": "PROD",
},
},
"sink": {
"type": "file",
"config": {"filename": "./grafana_mcps.json"},
},
}
)
pipeline.run()
pipeline.raise_from_status()
mce_helpers.check_golden_file(
pytestconfig,
output_path="grafana_mcps.json",
golden_path=test_resources_dir / "grafana_mcps_golden.json",
ignore_paths=[
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]",
r"root\[\d+\]\['aspect'\]\['json'\]\['lastModified'\]",
],
)