2024-07-16 02:42:18 +05:30
|
|
|
import logging
|
|
|
|
import time
|
|
|
|
from base64 import b64encode
|
|
|
|
|
|
|
|
import pytest
|
|
|
|
import requests
|
|
|
|
from freezegun import freeze_time
|
|
|
|
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
2025-05-19 08:39:53 +02:00
|
|
|
from datahub.testing import mce_helpers
|
|
|
|
from tests.test_helpers import fs_helpers
|
2024-07-16 02:42:18 +05:30
|
|
|
from tests.test_helpers.docker_helpers import cleanup_image, wait_for_port
|
|
|
|
|
|
|
|
pytestmark = pytest.mark.integration_batch_2
|
|
|
|
|
|
|
|
FROZEN_TIME = "2024-07-12 12:00:00"
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class GrafanaClient:
|
|
|
|
def __init__(self, url, admin_user, admin_password):
|
|
|
|
self.url = url
|
|
|
|
self.auth = (admin_user, admin_password)
|
|
|
|
self.headers = {
|
|
|
|
"Authorization": f"Basic {b64encode(f'{admin_user}:{admin_password}'.encode()).decode()}",
|
|
|
|
"Content-Type": "application/json",
|
|
|
|
}
|
|
|
|
|
|
|
|
def create_service_account(self, name, role):
|
|
|
|
service_account_payload = {"name": name, "role": role, "isDisabled": False}
|
|
|
|
try:
|
|
|
|
response = requests.post(
|
|
|
|
f"{self.url}/api/serviceaccounts",
|
|
|
|
headers=self.headers,
|
|
|
|
json=service_account_payload,
|
|
|
|
)
|
|
|
|
response.raise_for_status()
|
|
|
|
service_account = response.json()
|
|
|
|
return service_account
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
logging.error(f"Error creating service account: {e}")
|
|
|
|
return None
|
|
|
|
|
|
|
|
def create_api_key(self, service_account_id, key_name, role):
|
|
|
|
api_key_payload = {"name": key_name, "role": role}
|
|
|
|
try:
|
|
|
|
response = requests.post(
|
|
|
|
f"{self.url}/api/serviceaccounts/{service_account_id}/tokens",
|
|
|
|
headers=self.headers,
|
|
|
|
json=api_key_payload,
|
|
|
|
)
|
|
|
|
response.raise_for_status()
|
|
|
|
api_key = response.json()
|
|
|
|
return api_key["key"]
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
|
|
logging.error(f"Error creating API key: {e}")
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def test_resources_dir(pytestconfig):
|
|
|
|
return pytestconfig.rootpath / "tests/integration/grafana"
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def test_api_key():
|
|
|
|
# Example usage:
|
|
|
|
url = "http://localhost:3000"
|
|
|
|
admin_user = "admin"
|
|
|
|
admin_password = "admin"
|
|
|
|
|
|
|
|
grafana_client = GrafanaClient(url, admin_user, admin_password)
|
|
|
|
|
|
|
|
# Step 1: Create the service account
|
|
|
|
service_account = grafana_client.create_service_account(
|
|
|
|
name="example-service-account", role="Viewer"
|
|
|
|
)
|
|
|
|
if service_account:
|
|
|
|
print(f"Service Account Created: {service_account}")
|
|
|
|
|
|
|
|
# Step 2: Create the API key for the service account
|
|
|
|
api_key = grafana_client.create_api_key(
|
|
|
|
service_account_id=service_account["id"],
|
|
|
|
key_name="example-api-key",
|
|
|
|
role="Admin",
|
|
|
|
)
|
|
|
|
if api_key:
|
|
|
|
print("Service Account API Key:", api_key)
|
|
|
|
return api_key
|
|
|
|
else:
|
|
|
|
print("Failed to create API key for the service account")
|
|
|
|
else:
|
|
|
|
print("Failed to create service account")
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
|
|
def loaded_grafana(docker_compose_runner, test_resources_dir):
|
|
|
|
with docker_compose_runner(
|
|
|
|
test_resources_dir / "docker-compose.yml", "grafana"
|
|
|
|
) as docker_services:
|
|
|
|
wait_for_port(
|
|
|
|
docker_services,
|
|
|
|
container_name="grafana",
|
|
|
|
container_port=3000,
|
|
|
|
timeout=300,
|
|
|
|
)
|
|
|
|
yield docker_services
|
|
|
|
|
|
|
|
# The Grafana image can be large, so we remove it after the test.
|
|
|
|
cleanup_image("grafana/grafana")
|
|
|
|
|
|
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
|
|
def test_grafana_dashboard(loaded_grafana, pytestconfig, tmp_path, test_resources_dir):
|
|
|
|
# Wait for Grafana to be up and running
|
|
|
|
url = "http://localhost:3000/api/health"
|
|
|
|
for i in range(30):
|
|
|
|
logging.info("waiting for Grafana to start...")
|
|
|
|
time.sleep(5)
|
|
|
|
resp = requests.get(url)
|
|
|
|
if resp.status_code == 200:
|
2025-01-18 15:06:20 +05:30
|
|
|
logging.info(f"Grafana started after waiting {i * 5} seconds")
|
2024-07-16 02:42:18 +05:30
|
|
|
break
|
|
|
|
else:
|
|
|
|
pytest.fail("Grafana did not start in time")
|
|
|
|
|
|
|
|
# Check if the default dashboard is loaded
|
|
|
|
dashboard_url = "http://localhost:3000/api/dashboards/uid/default"
|
|
|
|
resp = requests.get(dashboard_url, auth=("admin", "admin"))
|
|
|
|
assert resp.status_code == 200, "Failed to load default dashboard"
|
|
|
|
dashboard = resp.json()
|
|
|
|
|
2025-01-18 15:06:20 +05:30
|
|
|
assert dashboard["dashboard"]["title"] == "Default Dashboard", (
|
|
|
|
"Default dashboard title mismatch"
|
|
|
|
)
|
|
|
|
assert any(panel["type"] == "text" for panel in dashboard["dashboard"]["panels"]), (
|
|
|
|
"Default dashboard missing text panel"
|
|
|
|
)
|
2024-07-16 02:42:18 +05:30
|
|
|
|
|
|
|
# Verify the output. (You can add further checks here if needed)
|
|
|
|
logging.info("Default dashboard verified successfully")
|
|
|
|
|
|
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
|
|
|
|
def test_grafana_ingest(
|
|
|
|
loaded_grafana, pytestconfig, tmp_path, test_resources_dir, test_api_key
|
|
|
|
):
|
|
|
|
# Wait for Grafana to be up and running
|
|
|
|
url = "http://localhost:3000/api/health"
|
|
|
|
for i in range(30):
|
|
|
|
logging.info("waiting for Grafana to start...")
|
|
|
|
time.sleep(5)
|
|
|
|
resp = requests.get(url)
|
|
|
|
if resp.status_code == 200:
|
2025-01-18 15:06:20 +05:30
|
|
|
logging.info(f"Grafana started after waiting {i * 5} seconds")
|
2024-07-16 02:42:18 +05:30
|
|
|
break
|
|
|
|
else:
|
|
|
|
pytest.fail("Grafana did not start in time")
|
|
|
|
|
|
|
|
# Run the metadata ingestion pipeline.
|
|
|
|
with fs_helpers.isolated_filesystem(tmp_path):
|
|
|
|
# Run grafana ingestion run.
|
|
|
|
pipeline = Pipeline.create(
|
|
|
|
{
|
|
|
|
"run_id": "grafana-test-simple",
|
|
|
|
"source": {
|
|
|
|
"type": "grafana",
|
|
|
|
"config": {
|
|
|
|
"url": "http://localhost:3000",
|
|
|
|
"service_account_token": test_api_key,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
"sink": {
|
|
|
|
"type": "file",
|
|
|
|
"config": {"filename": "./grafana_mcps.json"},
|
|
|
|
},
|
|
|
|
}
|
|
|
|
)
|
|
|
|
pipeline.run()
|
|
|
|
pipeline.raise_from_status()
|
|
|
|
|
|
|
|
# Verify the output.
|
|
|
|
mce_helpers.check_golden_file(
|
|
|
|
pytestconfig,
|
|
|
|
output_path="grafana_mcps.json",
|
|
|
|
golden_path=test_resources_dir / "grafana_mcps_golden.json",
|
|
|
|
ignore_paths=[
|
|
|
|
r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['last_event_time'\]",
|
|
|
|
],
|
|
|
|
)
|