OpenMetadata/docker/validate_compose.py
2024-05-10 06:44:27 +02:00

74 lines
1.9 KiB
Python

import time
from pprint import pprint
from typing import Tuple
import requests
from metadata.utils.logger import log_ansi_encoded_string
from requests.auth import HTTPBasicAuth
HEADER_JSON = {"Content-Type": "application/json"}
BASIC_AUTH = HTTPBasicAuth("admin", "admin")
REQUESTS_TIMEOUT = 60 * 5
def get_last_run_info() -> Tuple[str, str]:
"""
Make sure we can pick up the latest run info
"""
max_retries = 15
retries = 0
while retries < max_retries:
log_ansi_encoded_string(message="Waiting for DAG Run data...")
time.sleep(5)
res = requests.get(
"http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT
)
res.raise_for_status()
runs = res.json()
dag_runs = runs.get("dag_runs")
if dag_runs[0].get("dag_run_id"):
return dag_runs[0].get("dag_run_id"), "success"
retries += 1
return None, None
def print_last_run_logs() -> None:
"""
Show the logs
"""
logs = requests.get(
"http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe",
auth=BASIC_AUTH,
timeout=REQUESTS_TIMEOUT
).text
pprint(logs)
def main():
max_retries = 15
retries = 0
while retries < max_retries:
dag_run_id, state = get_last_run_info()
if state == "success":
print(f"DAG run: [{dag_run_id}, {state}]")
print_last_run_logs()
break
else:
print(
"Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
)
print(f"DAG run: [{dag_run_id}, {state}]")
print_last_run_logs()
time.sleep(10)
retries += 1
if retries == max_retries:
raise Exception("Max retries exceeded. Sample data ingestion was not successful.")
if __name__ == "__main__":
main()