# Ingest Metadata in Production

Use this procedure if you already have a production Airflow instance on which you would like to schedule OpenMetadata ingestion workflows.

### 1. Create a configuration file for your connector

See the [connector documentation](connectors/) for instructions on how to create a configuration file for the service you would like to integrate with OpenMetadata.

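For reference, the configuration file is a JSON document describing the source to read metadata from, the sink that writes it, and the OpenMetadata server to publish to. The sketch below is a hypothetical example for a MySQL source; the field names and values are illustrative only, so follow the connector documentation for the exact schema your service requires.

```
{
  "source": {
    "type": "mysql",
    "config": {
      "username": "openmetadata_user",
      "password": "strong_password",
      "host_port": "localhost:3306",
      "service_name": "local_mysql"
    }
  },
  "sink": {
    "type": "metadata-rest",
    "config": {}
  },
  "metadata_server": {
    "type": "metadata-server",
    "config": {
      "api_endpoint": "http://localhost:8585/api"
    }
  }
}
```
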
### 2. Edit a Python script to define your ingestion DAG

Copy and paste the code below into a file called `openmetadata-airflow.py`.

```python
import json
from datetime import timedelta

from airflow import DAG

# Airflow 2.x moved PythonOperator; fall back to the
# Airflow 1.10.x import path if the new module is absent.
try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from metadata.ingestion.api.workflow import Workflow

# Default task settings; replace the owner and email with your own.
default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=10),
    "execution_timeout": timedelta(minutes=60),
}

config = """
  ## REPLACE THIS LINE WITH YOUR CONFIGURATION JSON
"""

def metadata_ingestion_workflow():
    # Parse the configuration and run the ingestion workflow,
    # raising on failure so Airflow marks the task as failed.
    workflow_config = json.loads(config)
    workflow = Workflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()

with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```

### 3. Copy your configuration JSON into the ingestion script

In step 1 above, you created a JSON file with the configuration for your ingestion connector. Copy that JSON into the `openmetadata-airflow.py` file that you created in step 2, as directed by the comment below.

```
config = """
  ## REPLACE THIS LINE WITH YOUR CONFIGURATION JSON
"""
```

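Because `metadata_ingestion_workflow` calls `json.loads(config)` when the task runs, the pasted text must be valid JSON. As a quick sanity check before deploying, you can parse the string locally; a minimal sketch, with an abbreviated, illustrative config:

```python
import json

# Abbreviated, hypothetical connector config pasted between the quotes.
config = """
{"source": {"type": "mysql", "config": {"host_port": "localhost:3306"}}}
"""

# Raises json.JSONDecodeError if the pasted text is malformed.
json.loads(config)
print("config parses as valid JSON")
```
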
### 4. Run the script to create your ingestion DAG

Run the following command to create your ingestion DAG in Airflow.

```
python openmetadata-airflow.py
```
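
Running the script this way executes the module once and fails fast on syntax, import, or configuration errors. Assuming a default Airflow installation, the file must also sit in your configured `dags_folder` (by default `$AIRFLOW_HOME/dags`) for the scheduler to discover and schedule the DAG:

```
cp openmetadata-airflow.py $AIRFLOW_HOME/dags/
```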