---
title: Run Data Insights using Airflow SDK
slug: /how-to-guides/data-insights/airflow-sdk
---
# Run Data Insights using Airflow SDK
## 1. Define the YAML Config
This is a sample config for Data Insights:
```yaml
source:
  type: dataInsight
  serviceName: OpenMetadata
  sourceConfig:
    config:
      type: MetadataToElasticSearch
processor:
  type: data-insight-processor
  config: {}
sink:
  type: elasticsearch
  config:
    es_host: localhost
    es_port: 9200
    recreate_indexes: false
workflowConfig:
  loggerLevel: DEBUG
  openMetadataServerConfig:
    hostPort: '<OpenMetadata host and port>'
    authProvider: openmetadata
    securityConfig:
      jwtToken: '{bot_jwt_token}'
```
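If you first want to validate this YAML outside of Airflow, the `metadata` CLI from the `openmetadata-ingestion` package can run the same workflow. This assumes your package version exposes the `insight` subcommand and that the config above is saved as `data_insights.yaml` (both the subcommand availability and the file name are assumptions to verify for your release):

```bash
metadata insight -c data_insights.yaml
```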
### Source Configuration - Source Config

- To send the metadata to OpenMetadata, it needs to be specified as `type: MetadataToElasticSearch`.
### Processor Configuration

- To run the Data Insights processing, the processor needs to be specified as `type: data-insight-processor`.
### Workflow Configuration
The main property here is the `openMetadataServerConfig`, where you can define the host and security provider of your OpenMetadata installation.
For a simple, local installation using our Docker containers, this looks like:
```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: openmetadata
    securityConfig:
      jwtToken: '{bot_jwt_token}'
```
We support different security providers; you can find their definitions in the security documentation. The implementation of the ingestion DAG is shown below.
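As an illustrative sketch, a `workflowConfig` using Auth0 instead of the default bot token could look like the following. The `clientId`, `secretKey`, and `domain` values are placeholders, and the exact field names should be verified against the security provider documentation for your release:

```yaml
workflowConfig:
  openMetadataServerConfig:
    hostPort: 'http://localhost:8585/api'
    authProvider: auth0
    securityConfig:
      clientId: '{your_client_id}'
      secretKey: '{your_client_secret}'
      domain: '{your_auth0_domain}'
```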
## 2. Prepare the Data Insights DAG
Create a Python file in your Airflow DAGs directory with the following contents:
```python
import yaml
from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Fallback import path for older Airflow versions
    from airflow.operators.python_operator import PythonOperator

from metadata.workflow.data_insight import DataInsightWorkflow
from metadata.workflow.workflow_output_handler import print_status

default_args = {
    "owner": "user_name",
    "email": ["username@org.com"],
    "email_on_failure": False,
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}

config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    # Build the Data Insights workflow from the YAML config above,
    # run it, fail the task on errors, and report the final status
    workflow_config = yaml.safe_load(config)
    workflow = DataInsightWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    print_status(workflow)
    workflow.stop()


with DAG(
    "sample_data",
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=days_ago(1),
    is_paused_upon_creation=False,
    schedule_interval="*/5 * * * *",
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```
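If you would rather not embed the YAML and the bot token in the DAG file itself, you can load them when the DAG is parsed. A minimal sketch, assuming the config lives next to the DAG as `data_insights.yaml` and the token is exposed through an environment variable; both the file name and the `OM_JWT_TOKEN` variable are illustrative choices, not OpenMetadata conventions:

```python
import os
import pathlib

# Read the YAML from a file next to this DAG and substitute the bot token
# placeholder with a value taken from the environment at parse time.
config = (
    pathlib.Path(__file__).parent.joinpath("data_insights.yaml")
    .read_text()
    .replace("{bot_jwt_token}", os.environ["OM_JWT_TOKEN"])
)
```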
