mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 12:36:23 +00:00 
			
		
		
		
	* Add Test Connection to Profiler * remove Code Lint * Fix AttributeError * Fix Pytest * Fix Bigquery Partition tests * Fix Lint
		
			
				
	
	
		
			58 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			58 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import Tuple
 | 
						|
 | 
						|
from pprint import pprint
 | 
						|
import requests
 | 
						|
from requests.auth import HTTPBasicAuth
 | 
						|
import time
 | 
						|
from metadata.utils.ansi import print_ansi_encoded_string
 | 
						|
 | 
						|
HEADER_JSON = {"Content-Type": "application/json"}
 | 
						|
BASIC_AUTH = HTTPBasicAuth("admin", "admin")
 | 
						|
 | 
						|
 | 
						|
def get_last_run_info() -> Tuple[str, str]:
 | 
						|
    """
 | 
						|
    Make sure we can pick up the latest run info
 | 
						|
    """
 | 
						|
    dag_runs = None
 | 
						|
    while not dag_runs:
 | 
						|
        print_ansi_encoded_string(message="Waiting for DAG Run data...")
 | 
						|
        time.sleep(5)
 | 
						|
        runs = requests.get(
 | 
						|
            "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH
 | 
						|
        ).json()
 | 
						|
        dag_runs = runs.get("dag_runs")
 | 
						|
 | 
						|
    return dag_runs[0].get("dag_run_id"), dag_runs[0].get("state")
 | 
						|
 | 
						|
 | 
						|
def print_last_run_logs() -> None:
 | 
						|
    """
 | 
						|
    Show the logs
 | 
						|
    """
 | 
						|
    logs = requests.get(
 | 
						|
        "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data",
 | 
						|
        auth=BASIC_AUTH,
 | 
						|
    ).text
 | 
						|
    pprint(logs)
 | 
						|
 | 
						|
 | 
						|
def main():
 | 
						|
 | 
						|
    state = None
 | 
						|
    while state != "success":
 | 
						|
 | 
						|
        print_ansi_encoded_string(
 | 
						|
            message="Waiting for sample data ingestion to be a success. We'll show some logs along the way."
 | 
						|
        )
 | 
						|
 | 
						|
        dag_run_id, state = get_last_run_info()
 | 
						|
        print_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
 | 
						|
 | 
						|
        print_last_run_logs()
 | 
						|
        time.sleep(10)
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    main()
 |