diff --git a/metadata-ingestion/README.md b/metadata-ingestion/README.md
index 802a34a28b..bc49793780 100644
--- a/metadata-ingestion/README.md
+++ b/metadata-ingestion/README.md
@@ -2,6 +2,7 @@
 
 This module hosts an extensible Python-based metadata ingestion system for DataHub.
 This supports sending data to DataHub using Kafka or through the REST api.
+It can be used through our CLI tool or as a library, e.g. with an orchestrator like Airflow.
 
 ## Architecture
 
@@ -62,6 +63,10 @@ source docker/docker_run.sh examples/recipes/file_to_file.yml
 ```
 -->
 
+We have also included a couple of [sample DAGs](./examples/airflow) that can be used with [Airflow](https://airflow.apache.org/).
+- `generic_recipe_sample_dag.py` - a simple Airflow DAG that picks up a DataHub ingestion recipe configuration and runs it.
+- `mysql_sample_dag.py` - an Airflow DAG that runs a MySQL metadata ingestion pipeline using an inlined configuration.
+
 # Recipes
 
 A recipe is a configuration file that tells our ingestion scripts where to pull data from (source) and where to put it (sink).
diff --git a/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py b/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py
new file mode 100644
index 0000000000..7bb37d15ae
--- /dev/null
+++ b/metadata-ingestion/examples/airflow/generic_recipe_sample_dag.py
@@ -0,0 +1,50 @@
+"""Generic DataHub Ingest via Recipe
+
+This example demonstrates how to load any configuration file and run a
+DataHub ingestion pipeline within an Airflow DAG.
+"""
+
+from datetime import timedelta
+
+import yaml
+
+from airflow import DAG
+from airflow.utils.dates import days_ago
+from airflow.operators.python import PythonOperator
+
+from datahub.ingestion.run.pipeline import Pipeline
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["jdoe@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+    "execution_timeout": timedelta(minutes=120),
+}
+
+
+def datahub_recipe():
+    with open("path/to/recipe.yml") as config_file:
+        config = yaml.safe_load(config_file)
+
+    pipeline = Pipeline.create(config)
+    pipeline.run()
+
+
+with DAG(
+    "datahub_ingest_using_recipe",
+    default_args=default_args,
+    description="An example DAG which runs a DataHub ingestion recipe",
+    schedule_interval=timedelta(days=1),
+    start_date=days_ago(2),
+    tags=["datahub-ingest"],
+    catchup=False,
+) as dag:
+    ingest_task = PythonOperator(
+        task_id="ingest_using_recipe",
+        python_callable=datahub_recipe,
+    )
diff --git a/metadata-ingestion/examples/airflow/mysql_sample_dag.py b/metadata-ingestion/examples/airflow/mysql_sample_dag.py
new file mode 100644
index 0000000000..98f7eeca34
--- /dev/null
+++ b/metadata-ingestion/examples/airflow/mysql_sample_dag.py
@@ -0,0 +1,62 @@
+"""MySQL DataHub Ingest DAG
+
+This example demonstrates how to ingest metadata from MySQL into DataHub
+from within an Airflow DAG. Note that the DB connection configuration is
+embedded within the code.
+"""
+
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.utils.dates import days_ago
+from airflow.operators.python import PythonOperator
+
+from datahub.ingestion.run.pipeline import Pipeline
+
+
+default_args = {
+    "owner": "airflow",
+    "depends_on_past": False,
+    "email": ["jdoe@example.com"],
+    "email_on_failure": False,
+    "email_on_retry": False,
+    "retries": 1,
+    "retry_delay": timedelta(minutes=5),
+    "execution_timeout": timedelta(minutes=120),
+}
+
+
+def ingest_from_mysql():
+    pipeline = Pipeline.create(
+        {
+            "source": {
+                "type": "mysql",
+                "config": {
+                    "username": "user",
+                    "password": "pass",
+                    "database": "db_name",
+                    "host_port": "localhost:3306",
+                },
+            },
+            "sink": {
+                "type": "datahub-rest",
+                "config": {"server": "http://localhost:8080"},
+            },
+        }
+    )
+    pipeline.run()
+
+
+with DAG(
+    "datahub_mysql_ingest",
+    default_args=default_args,
+    description="An example DAG which ingests metadata from MySQL to DataHub",
+    schedule_interval=timedelta(days=1),
+    start_date=days_ago(2),
+    tags=["datahub-ingest"],
+    catchup=False,
+) as dag:
+    ingest_task = PythonOperator(
+        task_id="ingest_from_mysql",
+        python_callable=ingest_from_mysql,
+    )