"""Generic DataHub Ingest via Recipe

This example demonstrates how to load any configuration file and run a
DataHub ingestion pipeline within an Airflow DAG.
"""
|
|
|
|
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
|
|
import yaml
|
|
|
|
from airflow import DAG
|
|
|
|
|
|
|
|
# Airflow 2.x moved PythonOperator to airflow.operators.python; fall back
# to the Airflow 1.10.x module path so this DAG works on both major versions.
try:
    from airflow.operators.python import PythonOperator
except ImportError:
    # Airflow 1.10.x compatibility path.
    from airflow.operators.python_operator import PythonOperator
|
|
|
|
from airflow.utils.dates import days_ago
|
|
|
|
|
|
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
|
|
|
|
|
|
# Default task-level arguments applied to every task in this DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,  # each run is independent of prior runs
    "email": ["jdoe@example.com"],
    "email_on_failure": False,  # alerting disabled in this example
    "email_on_retry": False,
    "retries": 1,  # retry a failed task once
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),  # fail tasks running > 2h
}
|
|
|
|
|
|
|
|
|
|
|
|
def datahub_recipe(recipe_path: str = "path/to/recipe.yml") -> None:
    """Load a DataHub recipe file and run the ingestion pipeline.

    Args:
        recipe_path: Path to the YAML recipe describing the ingestion
            source/sink configuration. Defaults to the original
            hard-coded location so existing callers are unaffected.

    Raises:
        Exception: propagated by ``raise_from_status`` when the pipeline
            recorded any failures during the run.
    """
    # safe_load avoids arbitrary Python object construction from the YAML.
    # Explicit encoding keeps the read platform-independent.
    with open(recipe_path, encoding="utf-8") as config_file:
        config = yaml.safe_load(config_file)

    pipeline = Pipeline.create(config)
    pipeline.run()
    # Surface ingestion failures so Airflow marks the task as failed.
    pipeline.raise_from_status()
|
|
|
|
|
|
|
|
|
|
|
|
# Declare the DAG: one run per day, no backfill of missed intervals.
with DAG(
    "datahub_ingest_using_recipe",
    default_args=default_args,
    description="An example DAG which runs a DataHub ingestion recipe",
    schedule_interval=timedelta(days=1),  # daily cadence
    start_date=days_ago(2),
    catchup=False,  # skip runs scheduled before deployment
) as dag:
    # Single task: execute the recipe-driven ingestion in-process.
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=datahub_recipe,
    )
|