mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-30 18:17:53 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			74 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			74 lines
		
	
	
		
			1.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import time
 | |
| from pprint import pprint
 | |
| from typing import Tuple
 | |
| 
 | |
| import requests
 | |
| from metadata.utils.logger import log_ansi_encoded_string
 | |
| from requests.auth import HTTPBasicAuth
 | |
| 
 | |
| 
 | |
| HEADER_JSON = {"Content-Type": "application/json"}
 | |
| BASIC_AUTH = HTTPBasicAuth("admin", "admin")
 | |
| REQUESTS_TIMEOUT = 60 * 5
 | |
| 
 | |
| 
 | |
| def get_last_run_info() -> Tuple[str, str]:
 | |
|     """
 | |
|     Make sure we can pick up the latest run info
 | |
|     """
 | |
|     max_retries = 15
 | |
|     retries = 0
 | |
|     
 | |
|     while retries < max_retries:
 | |
|         log_ansi_encoded_string(message="Waiting for DAG Run data...")
 | |
|         time.sleep(5)
 | |
|         res = requests.get(
 | |
|             "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT
 | |
|         )
 | |
|         res.raise_for_status()
 | |
|         runs = res.json()
 | |
|         dag_runs = runs.get("dag_runs")
 | |
|         if dag_runs[0].get("dag_run_id"):
 | |
|             return dag_runs[0].get("dag_run_id"), "success"
 | |
|         retries += 1
 | |
|     return None, None
 | |
| 
 | |
| 
 | |
| def print_last_run_logs() -> None:
 | |
|     """
 | |
|     Show the logs
 | |
|     """
 | |
|     logs = requests.get(
 | |
|         "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe",
 | |
|         auth=BASIC_AUTH,
 | |
|         timeout=REQUESTS_TIMEOUT
 | |
|     ).text
 | |
|     pprint(logs)
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     max_retries = 15
 | |
|     retries = 0
 | |
| 
 | |
|     while retries < max_retries:
 | |
|         dag_run_id, state = get_last_run_info()
 | |
|         if state == "success":
 | |
|             print(f"DAG run: [{dag_run_id}, {state}]")
 | |
|             print_last_run_logs()
 | |
|             break
 | |
|         else:
 | |
|             print(
 | |
|                 "Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
 | |
|             )
 | |
|             print(f"DAG run: [{dag_run_id}, {state}]")
 | |
|             print_last_run_logs()
 | |
|             time.sleep(10)
 | |
|             retries += 1
 | |
| 
 | |
|     if retries == max_retries:
 | |
|         raise Exception("Max retries exceeded. Sample data ingestion was not successful.")
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
