Lineage Backend example and fix (#9095)

Pere Miquel Brull 2022-12-01 13:09:36 +01:00 committed by GitHub
parent 2f9f169de9
commit 7d2eab332d
4 changed files with 126 additions and 4 deletions

View File

@@ -271,17 +271,17 @@ def add_status(
     # Check if we already have a pipelineStatus for
     # our execution_date that we should update
-    pipeline_status: List[PipelineStatus] = metadata.get_by_id(
+    pipeline_status: PipelineStatus = metadata.get_by_id(
         entity=Pipeline, entity_id=pipeline.id, fields=["pipelineStatus"]
     ).pipelineStatus
     task_status = []
     # We will append based on the current registered status
-    if pipeline_status and pipeline_status[0].executionDate.__root__ == execution_date:
+    if pipeline_status and pipeline_status.timestamp.__root__ == execution_date:
         # If we are clearing a task, use the status of the new execution
         task_status = [
             task
-            for task in pipeline_status[0].taskStatus
+            for task in pipeline_status.taskStatus
             if task.name != task_instance.task_id
         ]
View File

@@ -29,9 +29,12 @@ The Lineage Backend can be directly installed to the Airflow instances as part of the
 distribution:
 
 ```commandline
-pip3 install "openmetadata-ingestion[airflow-container]"
+pip3 install "openmetadata-ingestion==x.y.z"
 ```
 
+Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that the server and client
+versions match.
+
 ### Adding Lineage Config
 
 After the installation, we need to update the Airflow configuration. This can be done following this example on
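One note on the version match the hunk above introduces: it is easy to assert from the client side. A minimal sketch using only the Python standard library; the `0.13.0` value is a placeholder for whatever version your OpenMetadata server actually reports:

```python
# A minimal sketch: verify that the installed openmetadata-ingestion client
# matches the OpenMetadata server version. "0.13.0" is a placeholder.
from importlib.metadata import version

SERVER_VERSION = "0.13.0"  # replace with the version your server reports

client_version = version("openmetadata-ingestion")

# Compare only major.minor.patch, ignoring any pre-release suffix
if client_version.split(".")[:3] != SERVER_VERSION.split(".")[:3]:
    raise RuntimeError(
        f"openmetadata-ingestion {client_version} does not match "
        f"the OpenMetadata server {SERVER_VERSION}"
    )
```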
@@ -200,3 +203,122 @@ is supported as well through external metadata ingestion from Airflow, be it via
an extraction DAG from Airflow itself.
</Note>
## Example
This is a full example of a working DAG. Note how we pass the inlets and outlets with the `fullyQualifiedName`s of two tables:

- `mysql.default.openmetadata_db.bot_entity`
- `snow.TEST.PUBLIC.COUNTRIES`

We are pointing at assets that have already been ingested, so there is no requirement that they belong to the same service. For this example to work on your end, update the FQNs to tables you already have in OpenMetadata.
```python
from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from airflow_provider_openmetadata.lineage.callback import (
    failure_callback,
    success_callback,
)

# These args will get passed on to each operator.
# You can override them on a per-task basis during operator initialization.
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': failure_callback,
    'on_success_callback': success_callback,
}


def explode():
    raise Exception("I am an angry exception!")


with DAG(
    'lineage_tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        outlets={
            "tables": ["mysql.default.openmetadata_db.bot_entity"]
        },
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
        inlets={
            "tables": ["snow.TEST.PUBLIC.COUNTRIES"]
        },
    )

    # This task fails on purpose, which triggers the failure_callback
    risen = PythonOperator(
        task_id='explode',
        python_callable=explode,
        retries=0,
    )

    dag.doc_md = __doc__  # provided that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is documentation placed anywhere
    """  # otherwise, type it like this

    templated_command = dedent(
        """
        {% for i in range(5) %}
            echo "{{ ds }}"
            echo "{{ macros.ds_add(ds, 7) }}"
            echo "{{ params.my_param }}"
        {% endfor %}
        """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]
```
Running the Lineage Backend will not only ingest the lineage data, but will also send the DAG to OpenMetadata as a Pipeline, together with its tasks and their statuses.
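One way to confirm what got created is to fetch the Pipeline back with the Python client bundled in `openmetadata-ingestion`. A minimal sketch, not an official verification step: it assumes the quickstart endpoint and JWT token from the configuration below, and that the Pipeline FQN is built as `<airflow_service_name>.<dag_id>`:

```python
# A minimal sketch: fetch the Pipeline entity the Lineage Backend should
# have created for the lineage_tutorial DAG.
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<the jwt_token from airflow.cfg>"
    ),
)
metadata = OpenMetadata(server_config)

# Assumption: the Pipeline FQN is <airflow_service_name>.<dag_id>
pipeline = metadata.get_by_name(
    entity=Pipeline,
    fqn="local_airflow.lineage_tutorial",
    fields=["tasks"],
)
print([task.name for task in pipeline.tasks])
```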
If you are running this example using the quickstart deployment of OpenMetadata, then your `airflow.cfg` could look like this:
```
[lineage]
backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
airflow_service_name = local_airflow
openmetadata_api_endpoint = http://localhost:8585/api
auth_provider_type = openmetadata
jwt_token = eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg
```
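After updating `airflow.cfg` (or exporting the equivalent `AIRFLOW__LINEAGE__*` environment variables), a quick sanity check confirms that Airflow picked the settings up. This is just a sketch, run in the same environment as your scheduler:

```python
# A quick sanity check (sketch): print the lineage settings as Airflow sees them.
from airflow.configuration import conf

print(conf.get("lineage", "backend"))
# expected: airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
print(conf.get("lineage", "airflow_service_name"))
print(conf.get("lineage", "openmetadata_api_endpoint"))
```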
After running the DAG, you should be able to see the following information in the ingested Pipeline:
<Image src="/images/openmetadata/connectors/airflow/lineage-backend-dag.png" alt="DAG" caption="DAG ingested as a Pipeline with the Task view."/>
<Image src="/images/openmetadata/connectors/airflow/lineage-backend-lineage.png" alt="Lineage" caption="Pipeline Lineage."/>
A fast way to try and play with Airflow locally is to install `apache-airflow` in a virtual environment and, for versions 2.2 and above, start it with `airflow standalone`.

Binary image file added (not shown). Size: 19 KiB

Binary image file added (not shown). Size: 55 KiB