mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 02:29:03 +00:00 
			
		
		
		
	 231b28fc87
			
		
	
	
		231b28fc87
		
			
		
	
	
	
	
		
			
			* Add Test Connection to Profiler * remove Code Lint * Fix AttributeError * Fix Pytest * Fix Bigquery Partition tests * Fix Lint
		
			
				
	
	
		
			58 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			58 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Tuple
 | |
| 
 | |
| from pprint import pprint
 | |
| import requests
 | |
| from requests.auth import HTTPBasicAuth
 | |
| import time
 | |
| from metadata.utils.ansi import print_ansi_encoded_string
 | |
| 
 | |
# JSON content-type header. Not referenced by the functions visible in this
# script.
HEADER_JSON = {"Content-Type": "application/json"}
# Basic-auth credentials passed as `auth=` on every request this script sends
# to the API at localhost:8080.
BASIC_AUTH = HTTPBasicAuth("admin", "admin")
 | |
| 
 | |
| 
 | |
def get_last_run_info() -> Tuple[str, str]:
    """
    Poll the `sample_data` DAG runs endpoint until it reports at least one run.

    Each attempt logs a waiting message, sleeps 5 seconds and then queries the
    endpoint. An attempt that fails at the HTTP level (service not reachable
    yet, request timed out) or that returns a body that is not valid JSON is
    treated as "no data yet" and retried, so the script keeps waiting instead
    of crashing while the service is still starting up.

    Returns:
        The `dag_run_id` and `state` of the first entry in the returned
        `dag_runs` list.
        NOTE(review): this is the first entry of the response, which is only
        the latest run if the API returns runs newest-first -- confirm.
    """
    dag_runs = None
    while not dag_runs:
        print_ansi_encoded_string(message="Waiting for DAG Run data...")
        time.sleep(5)
        try:
            runs = requests.get(
                "http://localhost:8080/api/v1/dags/sample_data/dagRuns",
                auth=BASIC_AUTH,
                # requests never times out by default; bound the wait so a
                # stalled connection cannot hang the script forever.
                timeout=30,
            ).json()
        except (requests.exceptions.RequestException, ValueError):
            # Transient failure or non-JSON body: leave dag_runs empty and
            # try again on the next iteration.
            continue
        dag_runs = runs.get("dag_runs")

    return dag_runs[0].get("dag_run_id"), dag_runs[0].get("state")
 | |
| 
 | |
| 
 | |
def print_last_run_logs() -> None:
    """
    Fetch the last `sample_data` DAG logs and pretty-print the response text.

    The logs are only informational, so this is best-effort: if the request
    fails or times out, the error is reported and the function returns without
    printing logs rather than aborting the caller.
    """
    try:
        logs = requests.get(
            "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data",
            auth=BASIC_AUTH,
            # requests never times out by default; bound the wait so a
            # stalled connection cannot hang the script forever.
            timeout=30,
        ).text
    except requests.exceptions.RequestException as exc:
        print_ansi_encoded_string(message=f"Could not fetch DAG logs: {exc}")
        return
    pprint(logs)
 | |
| 
 | |
| 
 | |
def main():
    """
    Block until the sample data DAG run reports the "success" state.

    Each round logs a waiting message, fetches the run id and state, logs
    them, dumps the last run logs and then sleeps 10 seconds before deciding
    whether another round is needed.
    """
    waiting_message = "Waiting for sample data ingestion to be a success. We'll show some logs along the way."

    while True:
        print_ansi_encoded_string(message=waiting_message)

        dag_run_id, state = get_last_run_info()
        print_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")

        print_last_run_logs()
        time.sleep(10)

        # Exit check sits after the sleep so every round, including the
        # successful one, runs the full sequence above.
        if state == "success":
            break
 | |
| 
 | |
| 
 | |
# Run the wait loop only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
 |