docs(ingest): add example DAGs for Airflow (#2116)

Harshal Sheth 2021-02-17 18:01:22 -08:00 committed by GitHub
parent ea86ade29b
commit 02ffa6fd54
3 changed files with 117 additions and 0 deletions

metadata-ingestion/README.md

@@ -2,6 +2,7 @@
This module hosts an extensible Python-based metadata ingestion system for DataHub.
It supports sending data to DataHub over Kafka or through the REST API.
It can be used through our CLI tool or as a library, e.g. with an orchestrator like Airflow.
## Architecture
@@ -62,6 +63,10 @@ source docker/docker_run.sh examples/recipes/file_to_file.yml
```
-->
We have also included a couple of [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/).
- `generic_recipe_sample_dag.py` - a simple Airflow DAG that loads a DataHub ingestion recipe configuration from a YAML file and runs the pipeline.
- `mysql_sample_dag.py` - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inline configuration.
# Recipes
A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
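For illustration, a recipe for the same MySQL setup used in `mysql_sample_dag.py` below might look like this (the credentials, host, and DataHub server address are placeholders):
```yml
# Illustrative recipe with placeholder values: pull metadata from a local MySQL
# instance and push it to a DataHub instance over REST.
source:
  type: mysql
  config:
    username: user
    password: pass
    database: db_name
    host_port: "localhost:3306"

sink:
  type: datahub-rest
  config:
    server: "http://localhost:8080"
```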

metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py

@@ -0,0 +1,50 @@
"""Generic DataHub Ingest via Recipe
This example demonstrates how to load any configuration file and run a
DataHub ingestion pipeline within an Airflow DAG.
"""
from datetime import timedelta
import yaml
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from datahub.ingestion.run.pipeline import Pipeline
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["jdoe@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}
def datahub_recipe():
with open("path/to/recipe.yml") as config_file:
config = yaml.safe_load(config_file)
pipeline = Pipeline.create(config)
pipeline.run()
with DAG(
"datahub_ingest_using_recipe",
default_args=default_args,
description="An example DAG which runs a DataHub ingestion recipe",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_using_recipe",
python_callable=datahub_recipe,
)

metadata-ingestion/examples/airflow/mysql_sample_dag.py

@@ -0,0 +1,62 @@
"""MySQL DataHub Ingest DAG
This example demonstrates how to ingest metadata from MySQL into DataHub
from within an Airflow DAG. Note that the DB connection configuration is
embedded within the code.
"""
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from datahub.ingestion.run.pipeline import Pipeline
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email": ["jdoe@example.com"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(minutes=120),
}
def ingest_from_mysql():
pipeline = Pipeline.create(
{
"source": {
"type": "mysql",
"config": {
"username": "user",
"password": "pass",
"database": "db_name",
"host_port": "localhost:3306",
},
},
"sink": {
"type": "datahub-rest",
"config": {"server": "http://localhost:8080"},
},
}
)
pipeline.run()
with DAG(
"datahub_mysql_ingest",
default_args=default_args,
description="An example DAG which ingests metadata from MySQL to DataHub",
schedule_interval=timedelta(days=1),
start_date=days_ago(2),
tags=["datahub-ingest"],
catchup=False,
) as dag:
ingest_task = PythonOperator(
task_id="ingest_from_mysql",
python_callable=ingest_from_mysql,
)