datahub/metadata-ingestion/examples/airflow/lineage_backend_demo.py
Gabe Lyons 851e00ba9f
feat(lineage): implement support for datasets, charts and dashboards downstream lineage fetching in a generic way (#2397)
Co-authored-by: Dexter Lee <dexter@acryl.io>
Co-authored-by: Brian <brianwebtek@gmail.com>
Co-authored-by: John Joyce <john@acryl.io>
Co-authored-by: Harshal Sheth <hsheth2@gmail.com>
2021-04-23 00:18:39 -07:00

48 lines
1.3 KiB
Python

"""Lineage Backend
An example DAG demonstrating the usage of DataHub's Airflow lineage backend.
"""
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
try:
    # Newer Airflow releases (2.x) expose BashOperator from this module.
    from airflow.operators.bash import BashOperator
except ImportError:
    # Fall back to the legacy module path used by older Airflow releases
    # (e.g. 1.10.x), so this example can be loaded on either major version.
    from airflow.operators.bash_operator import BashOperator
from datahub.integrations.airflow.entities import Dataset
# Defaults handed to the DAG below via `default_args=`; Airflow applies them
# to every task in that DAG unless a task overrides them explicitly.
# Failure e-mails are switched off, and any single task attempt is capped at
# five minutes of run time.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email=["jdoe@example.com"],
    email_on_failure=False,
    execution_timeout=timedelta(minutes=5),
)
# Daily demo DAG. `dag` must stay a module-level name so Airflow can discover
# the DAG object when it imports this file.
with DAG(
    "datahub_lineage_backend_demo",
    default_args=default_args,
    description="An example DAG demonstrating the usage of DataHub's Airflow lineage backend.",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=["example_tag"],
    # Don't backfill a run for every interval missed since start_date.
    catchup=False,
) as dag:
    # A single placeholder task whose purpose is to carry lineage metadata:
    # tableA and tableB are declared as inputs (inlets), tableC as the output
    # (outlet). The operator is attached to `dag` by the enclosing `with`
    # block, so `dag=` does not need to be passed explicitly.
    task1 = BashOperator(
        task_id="run_data_task",
        bash_command="echo 'This is where you might run your data tooling.'",
        # NOTE(review): the {"datasets": [...]} dict shape is the Airflow
        # 1.10-style lineage format; on Airflow 2.x inlets/outlets are normally
        # plain lists — confirm the DataHub lineage backend handles this shape
        # there before relying on it.
        inlets={
            "datasets": [
                Dataset("snowflake", "mydb.schema.tableA"),
                Dataset("snowflake", "mydb.schema.tableB"),
            ],
        },
        outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
    )