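"""
Wait for the `sample_data` DAG in the local Airflow instance to run
successfully and print the logs of its latest run.

Targets the local docker deployment: the Airflow REST API on
localhost:8080 with the default admin/admin credentials configured below.
"""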
import time
from pprint import pprint
from typing import Optional, Tuple

import requests
from requests.auth import HTTPBasicAuth

from metadata.cli.docker import REQUESTS_TIMEOUT
from metadata.utils.logger import log_ansi_encoded_string

HEADER_JSON = {"Content-Type": "application/json"}
# Default admin/admin credentials of the local Airflow instance
BASIC_AUTH = HTTPBasicAuth("admin", "admin")
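
# Endpoints used below:
#   - /api/v1/dags/sample_data/dagRuns to read the DAG run info
#   - /api/v1/openmetadata/last_dag_logs to fetch the logs of the last run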


def get_last_run_info() -> Tuple[Optional[str], Optional[str]]:
    """
    Poll the Airflow REST API until the latest sample_data DAG run is
    available and return its run id and state.
    """
    max_retries = 15
    retries = 0

    while retries < max_retries:
        log_ansi_encoded_string(message="Waiting for DAG Run data...")
        time.sleep(5)
        res = requests.get(
            "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT
        )
        res.raise_for_status()
        runs = res.json()
        dag_runs = runs.get("dag_runs")
        # The run list can be empty right after startup: keep retrying until
        # the DAG has actually been triggered
        if dag_runs and dag_runs[0].get("dag_run_id"):
            return dag_runs[0].get("dag_run_id"), dag_runs[0].get("state")
        retries += 1
    return None, None


def print_last_run_logs() -> None:
    """
    Fetch and print the logs of the last sample_data DAG run.
    """
    logs = requests.get(
        "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe",
        auth=BASIC_AUTH,
        timeout=REQUESTS_TIMEOUT,
    ).text
    pprint(logs)


def main():
    """
    Wait until the sample data ingestion succeeds, printing the DAG run logs
    along the way. Raise if it does not succeed within the retry budget.
    """
    max_retries = 15
    retries = 0

    while retries < max_retries:
        dag_run_id, state = get_last_run_info()
        if state == "success":
            print(f"DAG run: [{dag_run_id}, {state}]")
            print_last_run_logs()
            break
        else:
            print(
                "Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
            )
            print(f"DAG run: [{dag_run_id}, {state}]")
            print_last_run_logs()
            time.sleep(10)
        retries += 1

    if retries == max_retries:
        raise Exception("Max retries exceeded. Sample data ingestion was not successful.")


if __name__ == "__main__":
    main()