mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 12:36:23 +00:00 
			
		
		
		
	
		
			
	
	
		
			91 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
		
		
			
		
	
	
			91 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
| 
								 | 
							
								---
							 | 
						||
| 
								 | 
							
								title: Run Elasticsearch Reindex using Airflow SDK
							 | 
						||
| 
								 | 
							
								slug: /how-to-guides/data-insights/elasticsearch-reindex
							 | 
						||
| 
								 | 
							
								---
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# Run Elasticsearch Reindex using Airflow SDK
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								## 1. Define the YAML Config
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								This is a sample config for the Elasticsearch reindex workflow. Adjust `hostPort`, `es_host`, `es_port`, and `jwtToken` to match your deployment:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								```yaml
							 | 
						||
| 
								 | 
							
								source:
							 | 
						||
| 
								 | 
							
								  type: metadata_elasticsearch
							 | 
						||
| 
								 | 
							
								  serviceName: openMetadata
							 | 
						||
| 
								 | 
							
								  serviceConnection:
							 | 
						||
| 
								 | 
							
								    config:
							 | 
						||
| 
								 | 
							
								      type: MetadataES
							 | 
						||
| 
								 | 
							
								  sourceConfig:
							 | 
						||
| 
								 | 
							
								    config: {}
							 | 
						||
| 
								 | 
							
								sink:
							 | 
						||
| 
								 | 
							
								  type: elasticsearch
							 | 
						||
| 
								 | 
							
								  config:
							 | 
						||
| 
								 | 
							
								    es_host: localhost
							 | 
						||
| 
								 | 
							
								    es_port: 9200
							 | 
						||
| 
								 | 
							
								    recreate_indexes: true
							 | 
						||
| 
								 | 
							
								workflowConfig:
							 | 
						||
| 
								 | 
							
								  openMetadataServerConfig:
							 | 
						||
| 
								 | 
							
								    hostPort: http://localhost:8585/api
							 | 
						||
| 
								 | 
							
								    authProvider: openmetadata
							 | 
						||
| 
								 | 
							
								    securityConfig:
							 | 
						||
| 
								 | 
							
								      jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
							 | 
						||
| 
								 | 
							
								```
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								## 2. Prepare the Ingestion DAG
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								Create a Python file in your Airflow DAGs directory with the following contents, replacing `<your YAML configuration>` with the YAML config from step 1:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								```python
							 | 
						||
| 
								 | 
							
								import pathlib
							 | 
						||
| 
								 | 
							
								import yaml
							 | 
						||
| 
								 | 
							
								from datetime import timedelta
							 | 
						||
| 
								 | 
							
								from airflow import DAG
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								try:
							 | 
						||
| 
								 | 
							
								    from airflow.operators.python import PythonOperator
							 | 
						||
| 
								 | 
							
								except ModuleNotFoundError:
							 | 
						||
| 
								 | 
							
								    from airflow.operators.python_operator import PythonOperator
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from metadata.config.common import load_config_file
							 | 
						||
| 
								 | 
							
								from metadata.workflow.metadata import MetadataWorkflow
							 | 
						||
| 
								 | 
							
								from metadata.workflow.workflow_output_handler import print_status
							 | 
						||
| 
								 | 
							
								from airflow.utils.dates import days_ago
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								default_args = {
							 | 
						||
| 
								 | 
							
								    "owner": "user_name",
							 | 
						||
| 
								 | 
							
								    "email": ["username@org.com"],
							 | 
						||
| 
								 | 
							
								    "email_on_failure": False,
							 | 
						||
| 
								 | 
							
								    "retries": 3,
							 | 
						||
| 
								 | 
							
								    "retry_delay": timedelta(minutes=5),
							 | 
						||
| 
								 | 
							
								    "execution_timeout": timedelta(minutes=60)
							 | 
						||
| 
								 | 
							
								}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
# Paste the YAML config from step 1 between the triple quotes. The string is
# parsed with yaml.safe_load() in metadata_ingestion_workflow() below.
config = """
<your YAML configuration>
"""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def metadata_ingestion_workflow():
							 | 
						||
| 
								 | 
							
								    workflow_config = yaml.safe_load(config)
							 | 
						||
| 
								 | 
							
								    workflow = MetadataWorkflow.create(workflow_config)
							 | 
						||
| 
								 | 
							
								    workflow.execute()
							 | 
						||
| 
								 | 
							
								    workflow.raise_from_status()
							 | 
						||
| 
								 | 
							
								    print_status(workflow)
							 | 
						||
| 
								 | 
							
								    workflow.stop()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								with DAG(
							 | 
						||
| 
								 | 
							
								    "sample_data",
							 | 
						||
| 
								 | 
							
								    default_args=default_args,
							 | 
						||
| 
								 | 
							
								    description="An example DAG which runs a OpenMetadata ingestion workflow",
							 | 
						||
| 
								 | 
							
								    start_date=days_ago(1),
							 | 
						||
| 
								 | 
							
								    is_paused_upon_creation=False,
							 | 
						||
| 
								 | 
							
								    schedule_interval='*/5 * * * *',
							 | 
						||
| 
								 | 
							
								    catchup=False,
							 | 
						||
| 
								 | 
							
								) as dag:
							 | 
						||
| 
								 | 
							
								    ingest_task = PythonOperator(
							 | 
						||
| 
								 | 
							
								        task_id="ingest_using_recipe",
							 | 
						||
| 
								 | 
							
								        python_callable=metadata_ingestion_workflow,
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								```
							 |