| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  | import time | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | from pprint import pprint | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  | from typing import Tuple | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | import requests | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  | from metadata.utils.logger import log_ansi_encoded_string | 
					
						
							|  |  |  | from metadata.cli.docker import REQUESTS_TIMEOUT | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | from requests.auth import HTTPBasicAuth | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-12-10 21:46:28 +05:30
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | HEADER_JSON = {"Content-Type": "application/json"} | 
					
						
							| 
									
										
										
										
											2022-07-28 14:46:25 +02:00
										 |  |  | BASIC_AUTH = HTTPBasicAuth("admin", "admin") | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def get_last_run_info() -> Tuple[str, str]: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Make sure we can pick up the latest run info | 
					
						
							|  |  |  |     """
 | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |     max_retries = 15 | 
					
						
							|  |  |  |     retries = 0 | 
					
						
							|  |  |  |      | 
					
						
							|  |  |  |     while retries < max_retries: | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  |         log_ansi_encoded_string(message="Waiting for DAG Run data...") | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  |         time.sleep(5) | 
					
						
							| 
									
										
										
										
											2024-02-22 11:46:19 +01:00
										 |  |  |         res = requests.get( | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  |             "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT | 
					
						
							| 
									
										
										
										
											2024-02-22 11:46:19 +01:00
										 |  |  |         ) | 
					
						
							|  |  |  |         res.raise_for_status() | 
					
						
							|  |  |  |         runs = res.json() | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  |         dag_runs = runs.get("dag_runs") | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |         if dag_runs[0].get("dag_run_id"): | 
					
						
							|  |  |  |             return dag_runs[0].get("dag_run_id"), "success" | 
					
						
							|  |  |  |         retries += 1 | 
					
						
							|  |  |  |     return None, None | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def print_last_run_logs() -> None: | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     Show the logs | 
					
						
							|  |  |  |     """
 | 
					
						
							|  |  |  |     logs = requests.get( | 
					
						
							| 
									
										
										
										
											2024-04-11 14:30:40 +02:00
										 |  |  |         "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe", | 
					
						
							| 
									
										
										
										
											2022-12-10 21:46:28 +05:30
										 |  |  |         auth=BASIC_AUTH, | 
					
						
							| 
									
										
										
										
											2023-07-17 12:50:11 +05:30
										 |  |  |         timeout=REQUESTS_TIMEOUT | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  |     ).text | 
					
						
							|  |  |  |     pprint(logs) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def main(): | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |     max_retries = 15 | 
					
						
							|  |  |  |     retries = 0 | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |     while retries < max_retries: | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  |         dag_run_id, state = get_last_run_info() | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |         if state == "success": | 
					
						
							| 
									
										
										
										
											2024-04-11 14:30:40 +02:00
										 |  |  |             print(f"DAG run: [{dag_run_id}, {state}]") | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |             print_last_run_logs() | 
					
						
							|  |  |  |             break | 
					
						
							|  |  |  |         else: | 
					
						
							| 
									
										
										
										
											2024-04-11 14:30:40 +02:00
										 |  |  |             print( | 
					
						
							|  |  |  |                 "Waiting for sample data ingestion to be a success. We'll show some logs along the way.", | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |             ) | 
					
						
							| 
									
										
										
										
											2024-04-11 14:30:40 +02:00
										 |  |  |             print(f"DAG run: [{dag_run_id}, {state}]") | 
					
						
							| 
									
										
										
										
											2024-01-11 14:23:33 +05:30
										 |  |  |             print_last_run_logs() | 
					
						
							|  |  |  |             time.sleep(10) | 
					
						
							|  |  |  |             retries += 1 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     if retries == max_retries: | 
					
						
							|  |  |  |         raise Exception("Max retries exceeded. Sample data ingestion was not successful.") | 
					
						
							| 
									
										
										
										
											2022-07-18 14:44:50 +02:00
										 |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     main() |