diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index 464e61487c7..8ee5127c988 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -271,17 +271,17 @@ def add_status( # Check if we already have a pipelineStatus for # our execution_date that we should update - pipeline_status: List[PipelineStatus] = metadata.get_by_id( + pipeline_status: PipelineStatus = metadata.get_by_id( entity=Pipeline, entity_id=pipeline.id, fields=["pipelineStatus"] ).pipelineStatus task_status = [] # We will append based on the current registered status - if pipeline_status and pipeline_status[0].executionDate.__root__ == execution_date: + if pipeline_status and pipeline_status.timestamp.__root__ == execution_date: # If we are clearing a task, use the status of the new execution task_status = [ task - for task in pipeline_status[0].taskStatus + for task in pipeline_status.taskStatus if task.name != task_instance.task_id ] diff --git a/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md index e5ac6c9de56..6cba8b3c6aa 100644 --- a/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md +++ b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md @@ -29,9 +29,12 @@ The Lineage Backend can be directly installed to the Airflow instances as part o distribution: ```commandline -pip3 install "openmetadata-ingestion[airflow-container]" +pip3 install "openmetadata-ingestion==x.y.z" ``` +Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that server and client +versions match. + ### Adding Lineage Config After the installation, we need to update the Airflow configuration. 
This can be done following this example on @@ -200,3 +203,122 @@ is supported as well through external metadata ingestion from Airflow, be it via an extraction DAG from Airflow itself. + +## Example + +This is a full example of a working DAG. Note how we are passing the inlets and outlets for the `fullyQualifiedName`s +- `mysql.default.openmetadata_db.bot_entity` +- `snow.TEST.PUBLIC.COUNTRIES` + +We are pointing at already ingested assets, so there is no limitation of them being part of the same service. For +this example to work on your end, update the FQNs to tables you already have in OpenMetadata. + +```python +from datetime import datetime, timedelta +from textwrap import dedent + +# The DAG object; we'll need this to instantiate a DAG +from airflow import DAG + +# Operators; we need this to operate! +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator + +# These args will get passed on to each operator +# You can override them on a per-task basis during operator initialization +from airflow_provider_openmetadata.lineage.callback import success_callback, failure_callback + + +default_args = { + 'retries': 1, + 'retry_delay': timedelta(minutes=5), + 'on_failure_callback': failure_callback, + 'on_success_callback': success_callback, +} + + +def explode(): + raise Exception("I am an angry exception!") + +with DAG( + 'lineage_tutorial', + default_args=default_args, + description='A simple tutorial DAG', + schedule_interval=timedelta(days=1), + start_date=datetime(2021, 1, 1), + catchup=False, + tags=['example'], +) as dag: + + # t1, t2 and t3 are examples of tasks created by instantiating operators + t1 = BashOperator( + task_id='print_date', + bash_command='date', + outlets={ + "tables": ["mysql.default.openmetadata_db.bot_entity"] + } + ) + + t2 = BashOperator( + task_id='sleep', + depends_on_past=False, + bash_command='sleep 5', + retries=3, + inlets={ + "tables": ["snow.TEST.PUBLIC.COUNTRIES"] + } + ) + + risen 
= PythonOperator( + task_id='explode', + provide_context=True, + python_callable=explode, + retries=0, + ) + + dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG + dag.doc_md = """ + This is a documentation placed anywhere + """ # otherwise, type it like this + templated_command = dedent( + """ + {% for i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + echo "{{ params.my_param }}" + {% endfor %} + """ + ) + + t3 = BashOperator( + task_id='templated', + depends_on_past=False, + bash_command=templated_command, + params={'my_param': 'Parameter I passed in'}, + ) + + t1 >> [t2, t3] +``` + +Running the lineage backend will not only ingest the lineage data, but will also send the DAG as a pipeline with its tasks +and status to OpenMetadata. + +If you are running this example using the quickstart deployment of OpenMetadata, then your `airflow.cfg` could look like +this: + +``` +backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend +airflow_service_name = local_airflow +openmetadata_api_endpoint = http://localhost:8585/api +auth_provider_type = openmetadata +jwt_token = eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg +``` + +After running the DAG, you should be able to see the following information in the ingested Pipeline: + +DAG + +Lineage + +A fast way to try and play with Airflow locally is to install `apache-airflow` in a virtual environment and, when using 
+versions greater than 2.2.x, run `airflow standalone`. diff --git a/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-dag.png b/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-dag.png new file mode 100644 index 00000000000..ebaaf7fea3e Binary files /dev/null and b/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-dag.png differ diff --git a/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-lineage.png b/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-lineage.png new file mode 100644 index 00000000000..875c51a708e Binary files /dev/null and b/openmetadata-docs/images/openmetadata/connectors/airflow/lineage-backend-lineage.png differ