Mirror of https://github.com/open-metadata/OpenMetadata.git (synced 2025-10-27 08:44:49 +00:00)
parent 231b28fc87
commit 0963eac48e
ingestion/examples/airflow/dags/airflow_lineage_operator.py (new file, 118 lines)

@@ -0,0 +1,118 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
You can run this DAG from the default OM installation
"""

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def explode():
    raise Exception("Oh no!")


with DAG(
    "lineage_tutorial_operator",
    default_args=default_args,
    description="A simple tutorial DAG",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
        outlets={"tables": ["sample_data.ecommerce_db.shopify.dim_address"]},
    )

    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 1",
        retries=3,
        inlets={"tables": ["sample_data.ecommerce_db.shopify.dim_customer"]},
    )

    risen = PythonOperator(
        task_id="explode",
        provide_context=True,
        python_callable=explode,
        retries=0,
    )

    dag.doc_md = (
        __doc__  # providing that you have a docstring at the beginning of the DAG
    )
    dag.doc_md = """
    This is documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
        params={"my_param": "Parameter I passed in"},
    )

    t1 >> [t2, t3]

    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        ),
    )

    t4 = OpenMetadataLineageOperator(
        task_id="lineage_op",
        depends_on_past=False,
        server_config=server_config,
        service_name="airflow_lineage_op_service",
        only_keep_dag_lineage=True,
    )

    t1 >> t4
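To sanity-check the example locally, the snippet below is a minimal sketch: it assumes Airflow is installed and that the file above sits on the path shown; it loads the DAG and confirms the lineage task is registered.

```python
from airflow.models import DagBag

# Parse only this DAGs folder (path assumed from the file header above)
dag_bag = DagBag(dag_folder="ingestion/examples/airflow/dags", include_examples=False)
dag = dag_bag.get_dag("lineage_tutorial_operator")
assert dag is not None and dag.has_task("lineage_op")
```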
ingestion/src/airflow_provider_openmetadata/README.md (new file, 67 lines)

@@ -0,0 +1,67 @@
# OpenMetadata Airflow Provider

This package brings:
- Lineage Backend
- Lineage Operator
- OpenMetadata Hook

Note that this is configured as an entrypoint in the `setup.py`:

```python
entry_points={
    "apache_airflow_provider": [
        "provider_info = airflow_provider_openmetadata:get_provider_config"
    ],
},
```

Therefore, any metadata changes that should be discoverable by Airflow need to be passed in `get_provider_config`.

More information on that is available in Airflow's [docs](https://airflow.apache.org/docs/apache-airflow-providers/index.html?utm_cta=website-events-featured-summit#creating-your-own-providers).

## How to use the OpenMetadataHook

In the Airflow UI you can create a new OpenMetadata connection.

Then, load it as follows:

```python
from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook

openmetadata_hook = OpenMetadataHook(openmetadata_conn_id="om_id")  # The ID you provided
server_config = openmetadata_hook.get_conn()
```

## How to use the OpenMetadataLineageOperator

```python
from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator

OpenMetadataLineageOperator(
    task_id='lineage_op',
    depends_on_past=False,
    server_config=server_config,
    service_name="your-airflow-service",
    only_keep_dag_lineage=True,
)
```

You can get the `server_config` variable using the `OpenMetadataHook` as shown above, or create it
directly:

```python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<token>"
    ),
)
```
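For reference, a minimal sketch of the entrypoint target, trimmed down from the `get_provider_config` changes shown later in this diff:

```python
def get_provider_config():
    """Metadata that Airflow discovers through the apache_airflow_provider entrypoint."""
    return {
        "name": "OpenMetadata",
        "package-name": "openmetadata-ingestion",
        "version": "0.4.1",
        "hook-class-names": [
            "airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook",
        ],
    }
```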
@@ -22,7 +22,16 @@ def get_provider_config():
     """
     return {
         "name": "OpenMetadata",
-        "description": "OpenMetadata <https://open-metadata.org/>",
+        "description": "`OpenMetadata <https://open-metadata.org/>`__",
         "package-name": "openmetadata-ingestion",
         "version": "0.4.1",
+        "connection-types": [
+            {
+                "connection-type": "openmetadata",
+                "hook-class-name": "airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook",
+            }
+        ],
+        "hook-class-names": [
+            "airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook",
+        ],
     }
ingestion/src/airflow_provider_openmetadata/hooks/openmetadata.py (new file, 99 lines)

@@ -0,0 +1,99 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This hook allows storing the connection to
an OpenMetadata server and using it in your
operators.
"""
from typing import Any

from airflow.hooks.base import BaseHook
from airflow.models import Connection

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    AuthProvider,
    OpenMetadataConnection,
    VerifySSL,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.generated.schema.security.ssl.validateSSLClientConfig import (
    ValidateSSLClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class OpenMetadataHook(BaseHook):
    """
    Airflow hook to store and use an `OpenMetadataConnection`
    """

    conn_name_attr: str = "openmetadata_conn_id"
    default_conn_name = "openmetadata_default"
    conn_type = "openmetadata"
    hook_name = "OpenMetadata"

    def __init__(self, openmetadata_conn_id: str = default_conn_name) -> None:
        super().__init__()
        self.openmetadata_conn_id = openmetadata_conn_id
        # Add defaults
        self.default_schema = "http"
        self.default_port = 8585
        self.default_verify_ssl = VerifySSL.no_ssl
        self.default_ssl_config = None

    def get_conn(self) -> OpenMetadataConnection:
        conn: Connection = self.get_connection(self.openmetadata_conn_id)
        jwt_token = conn.get_password()
        if not jwt_token:
            raise ValueError("A JWT token must be provided.")

        if not conn.host:
            raise ValueError("A host must be provided.")

        port = conn.port if conn.port else self.default_port
        schema = conn.schema if conn.schema else self.default_schema

        extra = conn.extra_dejson if conn.get_extra() else {}
        verify_ssl = extra.get("verifySSL") or self.default_verify_ssl
        ssl_config = (
            ValidateSSLClientConfig(certificatePath=extra["sslConfig"])
            if extra.get("sslConfig")
            else self.default_ssl_config
        )

        om_conn = OpenMetadataConnection(
            hostPort=f"{schema}://{conn.host}:{port}/api",
            authProvider=AuthProvider.openmetadata,
            securityConfig=OpenMetadataJWTClientConfig(jwtToken=jwt_token),
            verifySSL=verify_ssl,
            sslConfig=ssl_config,
        )

        return om_conn

    def test_connection(self):
        """Test that we can instantiate the ometa client with the given connection"""
        try:
            OpenMetadata(self.get_conn())
            return True, "Connection successful"
        except Exception as err:
            return False, str(err)

    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Returns custom field behaviour"""
        return {
            "hidden_fields": ["login"],
            "relabeling": {"password": "JWT Token"},
        }
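As a usage sketch, the same connection can also be supplied through Airflow's standard `AIRFLOW_CONN_*` URI environment variables instead of the UI. The mapping mirrors `get_conn()` above (password to JWT token, schema path to http/https, query parameters to extras); the token and host below are placeholders.

```python
import os

from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook

# Equivalent to creating the connection in the Airflow UI (placeholder values)
os.environ["AIRFLOW_CONN_OPENMETADATA_DEFAULT"] = (
    "openmetadata://:my-placeholder-jwt@localhost:8585/http?verifySSL=no-ssl"
)

server_config = OpenMetadataHook().get_conn()  # resolves "openmetadata_default"
```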
@@ -14,7 +14,7 @@ OpenMetadata Airflow Lineage Backend
 """
 
 import traceback
-from typing import TYPE_CHECKING, Dict, List, Optional
+from typing import Dict, List, Optional
 
 from airflow.lineage.backend import LineageBackend
 
@@ -22,12 +22,9 @@ from airflow_provider_openmetadata.lineage.config.loader import (
     AirflowLineageConfig,
     get_lineage_config,
 )
-from airflow_provider_openmetadata.lineage.utils import get_xlets, parse_lineage
+from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 
-if TYPE_CHECKING:
-    from airflow.models.baseoperator import BaseOperator
-
 
 # pylint: disable=too-few-public-methods
 class OpenMetadataLineageBackend(LineageBackend):
@@ -50,16 +47,9 @@ class OpenMetadataLineageBackend(LineageBackend):
     only if you are using google as SSO
     """
 
-    def __init__(self) -> None:
-        """
-        Instantiate a superclass object and run lineage config function
-        """
-        super().__init__()
-        _ = get_lineage_config()
-
-    # pylint: disable=protected-access
-    @staticmethod  # needed for Airflow 1.10.x
+    @staticmethod
     def send_lineage(
         self,
         operator: "BaseOperator",
         inlets: Optional[List] = None,
         outlets: Optional[List] = None,
@@ -81,10 +71,16 @@ class OpenMetadataLineageBackend(LineageBackend):
             config: AirflowLineageConfig = get_lineage_config()
             metadata = OpenMetadata(config.metadata_config)
 
-            op_inlets = get_xlets(operator, "_inlets")
-            op_outlets = get_xlets(operator, "_outlets")
+            runner = AirflowLineageRunner(
+                metadata=metadata,
+                service_name=config.airflow_service_name,
+                dag=context["dag"],
+                context=context,
+                only_keep_dag_lineage=config.only_keep_dag_lineage,
+                max_status=config.max_status,
+            )
+            runner.execute()
 
-            parse_lineage(config, context, operator, op_inlets, op_outlets, metadata)
         except Exception as exc:  # pylint: disable=broad-except
             operator.log.error(traceback.format_exc())
             operator.log.error(exc)
@@ -43,23 +43,29 @@ def failure_callback(context: Dict[str, str]) -> None:
         metadata = OpenMetadata(config.metadata_config)
 
         operator: "BaseOperator" = context["task"]
+        dag: "DAG" = context["dag"]
 
-        operator.log.info("Parsing lineage & pipeline status on failure...")
+        operator.log.info("Updating pipeline status on error...")
 
-        op_inlets = get_xlets(operator, "_inlets")
-        op_outlets = get_xlets(operator, "_outlets")
-
-        # Get the pipeline created or updated during the lineage
-        pipeline = parse_lineage(
-            config, context, operator, op_inlets, op_outlets, metadata
+        airflow_service_entity: PipelineService = metadata.get_by_name(
+            entity=PipelineService, fqn=config.airflow_service_name
         )
+        pipeline: Pipeline = metadata.get_by_name(
+            entity=Pipeline,
+            fqn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}",
+        )
 
-        add_status(
-            operator=operator,
-            pipeline=pipeline,
-            metadata=metadata,
-            context=context,
-        )
+        if pipeline:
+            add_status(
+                operator=operator,
+                pipeline=pipeline,
+                metadata=metadata,
+                context=context,
+            )
+        else:
+            logging.warning(
+                f"Pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} not found. Skipping status update."
+            )
 
     except Exception as exc:  # pylint: disable=broad-except
         logging.error(traceback.format_exc())
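The callback above is meant to be registered as an Airflow `on_failure_callback`. A hedged sketch follows; the import path is an assumption based on the provider layout in this diff and may differ across versions.

```python
from datetime import timedelta

# Assumed module path for the failure callback shown above
from airflow_provider_openmetadata.lineage.callback import failure_callback

default_args = {
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": failure_callback,
}
```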
@@ -32,6 +32,8 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
 class AirflowLineageConfig(BaseModel):
     airflow_service_name: str
     metadata_config: OpenMetadataConnection
+    only_keep_dag_lineage: bool = False
+    max_status: int = 10
 
 
 def parse_airflow_config(
@@ -59,6 +61,12 @@ def parse_airflow_config(
 
     return AirflowLineageConfig(
         airflow_service_name=airflow_service_name,
+        # Check if value is a literal string `true`
+        only_keep_dag_lineage=conf.get(
+            LINEAGE, "only_keep_dag_lineage", fallback="false"
+        )
+        == "true",
+        max_status=int(conf.get(LINEAGE, "max_status", fallback=10)),
         metadata_config=OpenMetadataConnection(
             hostPort=conf.get(
                 LINEAGE,
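The literal-string comparison exists because Airflow's `conf.get` returns raw strings. A standalone illustration with Python's `configparser`, using hypothetical config values:

```python
from configparser import ConfigParser

# Hypothetical airflow.cfg fragment; parser.get() hands back raw strings,
# hence the `== "true"` comparison in parse_airflow_config above.
parser = ConfigParser()
parser.read_string("[lineage]\nonly_keep_dag_lineage = true\nmax_status = 10\n")

only_keep = parser.get("lineage", "only_keep_dag_lineage", fallback="false") == "true"
max_status = int(parser.get("lineage", "max_status", fallback=10))
assert only_keep is True and max_status == 10
```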
ingestion/src/airflow_provider_openmetadata/lineage/operator.py (new file, 73 lines)

@@ -0,0 +1,73 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OpenMetadata Airflow Lineage Operator
"""
import traceback

from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class OpenMetadataLineageOperator(BaseOperator):
    """
    This Operator will check the inlets and outlets of its DAG
    and push the lineage to OpenMetadata.

    It requires the OpenMetadataConnection to be passed as
    an argument to instantiate the ometa client.

    If `only_keep_dag_lineage` is True, we will remove any lineage related
    to the DAG that is not part of the inlets/outlets of its tasks.
    """

    def __init__(
        self,
        server_config: OpenMetadataConnection,
        service_name: str,
        only_keep_dag_lineage: bool = False,
        max_status: int = 10,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.server_config = server_config
        self.service_name = service_name
        self.only_keep_dag_lineage = only_keep_dag_lineage
        self.max_status = max_status

    def execute(self, context: Context) -> None:
        """
        Main logic to check the context for lineage
        and push it to OpenMetadata using the Python Client.
        """
        try:
            metadata = OpenMetadata(self.server_config)
            runner = AirflowLineageRunner(
                metadata=metadata,
                service_name=self.service_name,
                dag=self.dag,
                context=context,
                only_keep_dag_lineage=self.only_keep_dag_lineage,
                max_status=self.max_status,
            )

            runner.execute()
        except Exception as err:
            self.dag.log.info(traceback.format_exc())
            self.dag.log.error(f"Error executing the lineage runner - {err}")
            raise err
ingestion/src/airflow_provider_openmetadata/lineage/runner.py (new file, 376 lines)

@@ -0,0 +1,376 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OpenMetadata Airflow Provider Lineage Runner
"""
from itertools import groupby
from typing import List, Set

from airflow.configuration import conf
from pydantic import BaseModel

from airflow_provider_openmetadata.lineage.utils import STATUS_MAP, get_xlets
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.services.createPipelineService import (
    CreatePipelineServiceRequest,
)
from metadata.generated.schema.entity.data.pipeline import (
    Pipeline,
    PipelineStatus,
    StatusType,
    Task,
    TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
    AirflowConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
    BackendConnection,
)
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineConnection,
    PipelineService,
    PipelineServiceType,
)
from metadata.generated.schema.type.basic import Uuid
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.helpers import datetime_to_ts


class XLets(BaseModel):
    """
    Group inlets and outlets from all tasks in a DAG
    """

    inlets: Set[str]
    outlets: Set[str]


class SimpleEdge(BaseModel):
    """
    Simple Edge representation with FQN and id
    """

    fqn: str
    id: str


class AirflowLineageRunner:
    """
    Given the OpenMetadata connection, a service name and a DAG:

    1. Create the Pipeline Service (if it does not exist)
    2. Create or update the Pipeline (DAG + tasks)
    3. Add the task status (Task Instances). We'll pick this up from the available information.
       This operator should run last to have the complete view.
    4. Add Pipeline Lineage from xlets

    This Runner will be called either from:
    1. The Lineage Backend
    2. The Lineage Operator

    In both cases, this will run directly on an Airflow instance. Therefore,
    we'll use the Airflow config data to populate the entities' details.
    """

    def __init__(
        self,
        metadata: OpenMetadata,
        service_name: str,
        dag: "DAG",
        context: "Context",
        only_keep_dag_lineage: bool = False,
        max_status: int = 10,
    ):
        self.metadata = metadata
        self.service_name = service_name
        self.only_keep_dag_lineage = only_keep_dag_lineage
        self.max_status = max_status

        self.dag = dag
        self.context = context

    def get_or_create_pipeline_service(self) -> PipelineService:
        """
        Fetch the Pipeline Service from OM. If it does not exist,
        create it.
        """
        service_entity: PipelineService = self.metadata.get_by_name(
            entity=PipelineService, fqn=self.service_name
        )

        if service_entity:
            return service_entity

        pipeline_service: PipelineService = self.metadata.create_or_update(
            CreatePipelineServiceRequest(
                name=self.service_name,
                serviceType=PipelineServiceType.Airflow,
                connection=PipelineConnection(
                    config=AirflowConnection(
                        hostPort=conf.get("webserver", "base_url"),
                        connection=BackendConnection(),
                    ),
                ),
            )
        )

        if pipeline_service is None:
            raise RuntimeError("Failed to create Airflow service.")

        return pipeline_service

    def get_task_url(self, task: "Operator"):
        return f"/taskinstance/list/?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}"

    def get_om_tasks(self) -> List[Task]:
        """
        Get all tasks from the DAG and map them to
        OpenMetadata Task Entities
        """
        return [
            Task(
                name=task.task_id,
                taskUrl=self.get_task_url(task),
                taskType=task.task_type,
                startDate=task.start_date.isoformat() if task.start_date else None,
                endDate=task.end_date.isoformat() if task.end_date else None,
                downstreamTasks=list(task.downstream_task_ids)
                if task.downstream_task_ids
                else None,
            )
            for task in self.dag.tasks or []
        ]

    def create_pipeline_entity(self, pipeline_service: PipelineService) -> Pipeline:
        """
        Create the Pipeline Entity
        """
        self.dag.log.info("Creating or updating Pipeline Entity from DAG...")
        pipeline_request = CreatePipelineRequest(
            name=self.dag.dag_id,
            description=self.dag.description,
            pipelineUrl=f"/tree?dag_id={self.dag.dag_id}",
            concurrency=self.dag.max_active_tasks,
            pipelineLocation=self.dag.fileloc,
            startDate=self.dag.start_date.isoformat() if self.dag.start_date else None,
            tasks=self.get_om_tasks(),
            service=EntityReference(
                id=pipeline_service.id,
                type="pipelineService",
            ),
        )

        return self.metadata.create_or_update(pipeline_request)

    def get_all_pipeline_status(self) -> List[PipelineStatus]:
        """
        Iterate over the DAG's task instances and map
        them to PipelineStatus
        """

        # This list is already ordered by Execution Date
        grouped_ti: List[List["TaskInstance"]] = [
            list(value)
            for _, value in groupby(
                self.dag.get_task_instances(), key=lambda ti: ti.run_id
            )
        ]
        # Order descending by execution date
        grouped_ti.reverse()

        return [
            self.get_pipeline_status(task_instances)
            for task_instances in grouped_ti[: self.max_status]
        ]

    @staticmethod
    def get_dag_status_from_task_instances(task_instances: List["TaskInstance"]) -> str:
        """
        If any task is in a pending state, return Pending.
        If any task is in a failed state, return Failed.
        Otherwise, return Successful.
        """
        task_statuses = [
            STATUS_MAP.get(task_instance.state, StatusType.Pending.value)
            for task_instance in task_instances
        ]
        if any(status == StatusType.Pending.value for status in task_statuses):
            return StatusType.Pending.value
        if any(status == StatusType.Failed.value for status in task_statuses):
            return StatusType.Failed.value

        return StatusType.Successful.value

    def get_pipeline_status(
        self, task_instances: List["TaskInstance"]
    ) -> PipelineStatus:
        """
        Given the task instances for a run, prep the PipelineStatus
        """

        task_status = [
            TaskStatus(
                name=task_instance.task_id,
                executionStatus=STATUS_MAP.get(
                    task_instance.state, StatusType.Pending.value
                ),
                startTime=datetime_to_ts(task_instance.start_date),
                endTime=datetime_to_ts(task_instance.end_date),
                logLink=task_instance.log_url,
            )
            for task_instance in task_instances
        ]
        return PipelineStatus(
            # Use any of the task execution dates for the status execution date
            timestamp=datetime_to_ts(task_instances[0].execution_date),
            executionStatus=self.get_dag_status_from_task_instances(task_instances),
            taskStatus=task_status,
        )

    def add_all_pipeline_status(self, pipeline: Pipeline) -> None:
        """
        Get the latest Pipeline Status from the DAG and send
        it to OM
        """
        pipeline_status_list = self.get_all_pipeline_status()

        for status in pipeline_status_list:
            self.metadata.add_pipeline_status(
                fqn=pipeline.fullyQualifiedName.__root__, status=status
            )

    def get_xlets(self) -> XLets:
        """
        Fill the inlets and outlets of the Pipeline by iterating
        over all its tasks
        """
        _inlets = set()
        _outlets = set()

        for task in self.dag.tasks:
            _inlets.update(get_xlets(operator=task, xlet_mode="_inlets") or [])
            _outlets.update(get_xlets(operator=task, xlet_mode="_outlets") or [])

        return XLets(inlets=_inlets, outlets=_outlets)

    def add_lineage(self, pipeline: Pipeline, xlets: XLets) -> None:
        """
        Add the lineage from inlets and outlets
        """

        for table_fqn in xlets.inlets or []:
            table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
            try:
                lineage = AddLineageRequest(
                    edge=EntitiesEdge(
                        fromEntity=EntityReference(id=table_entity.id, type="table"),
                        toEntity=EntityReference(id=pipeline.id, type="pipeline"),
                    )
                )
                self.metadata.add_lineage(lineage)
            except AttributeError as err:
                self.dag.log.error(
                    f"Error trying to access Entity data due to: {err}."
                    f" Is the table [{table_fqn}] present in OpenMetadata?"
                )

        for table_fqn in xlets.outlets or []:
            table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn)
            try:
                lineage = AddLineageRequest(
                    edge=EntitiesEdge(
                        fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
                        toEntity=EntityReference(id=table_entity.id, type="table"),
                    )
                )
                self.metadata.add_lineage(lineage)
            except AttributeError as err:
                self.dag.log.error(
                    f"Error trying to access Entity data due to: {err}."
                    f" Is the table [{table_fqn}] present in OpenMetadata?"
                )

    def clean_lineage(self, pipeline: Pipeline, xlets: XLets):
        """
        Clean the lineage nodes that are not part of the xlets.

        We'll only clean up table nodes.
        """
        lineage_data = self.metadata.get_lineage_by_name(
            entity=Pipeline,
            fqn=pipeline.fullyQualifiedName.__root__,
            up_depth=1,
            down_depth=1,
        )

        upstream_edges = [
            next(
                (
                    SimpleEdge(fqn=node["fullyQualifiedName"], id=node["id"])
                    for node in lineage_data.get("nodes") or []
                    if node["id"] == upstream_edge["fromEntity"]
                    and node["type"] == "table"
                )
            )
            for upstream_edge in lineage_data.get("upstreamEdges") or []
        ]
        downstream_edges = [
            next(
                (
                    SimpleEdge(fqn=node["fullyQualifiedName"], id=node["id"])
                    for node in lineage_data.get("nodes") or []
                    if node["id"] == downstream_edge["toEntity"]
                    and node["type"] == "table"
                )
            )
            for downstream_edge in lineage_data.get("downstreamEdges") or []
        ]

        for edge in upstream_edges:
            if edge.fqn not in xlets.inlets:
                self.dag.log.info(f"Removing upstream edge with {edge.fqn}")
                edge_to_remove = EntitiesEdge(
                    fromEntity=EntityReference(id=edge.id, type="table"),
                    toEntity=EntityReference(id=pipeline.id, type="pipeline"),
                )
                self.metadata.delete_lineage_edge(edge=edge_to_remove)

        for edge in downstream_edges:
            if edge.fqn not in xlets.outlets:
                self.dag.log.info(f"Removing downstream edge with {edge.fqn}")
                edge_to_remove = EntitiesEdge(
                    fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
                    toEntity=EntityReference(id=edge.id, type="table"),
                )
                self.metadata.delete_lineage_edge(edge=edge_to_remove)

    def execute(self):
        """
        Run the whole ingestion logic
        """
        self.dag.log.info("Executing Airflow Lineage Runner...")
        pipeline_service = self.get_or_create_pipeline_service()
        pipeline = self.create_pipeline_entity(pipeline_service)
        self.add_all_pipeline_status(pipeline)

        xlets = self.get_xlets()
        self.add_lineage(pipeline, xlets)
        if self.only_keep_dag_lineage:
            self.dag.log.info(
                "`only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets..."
            )
            self.clean_lineage(pipeline, xlets)
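The DAG-status aggregation above can be illustrated standalone. This sketch replicates the rule with plain strings instead of the generated `StatusType` enum, using assumed status values for illustration:

```python
# Mirrors STATUS_MAP plus get_dag_status_from_task_instances above:
# unknown Airflow states default to Pending, and Pending outranks Failed.
STATUS_MAP = {"success": "Successful", "failed": "Failed", "queued": "Pending"}


def dag_status(task_states):
    statuses = [STATUS_MAP.get(state, "Pending") for state in task_states]
    if "Pending" in statuses:
        return "Pending"
    if "Failed" in statuses:
        return "Failed"
    return "Successful"


assert dag_status(["success", "success"]) == "Successful"
assert dag_status(["success", "failed"]) == "Failed"
assert dag_status(["running", "failed"]) == "Pending"  # unknown state counts as Pending
```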
@@ -10,41 +10,17 @@
 # limitations under the License.
 
 """
-OpenMetadata Airflow Lineage Backend
+OpenMetadata Airflow Provider utilities
 """
 
-import traceback
 from typing import TYPE_CHECKING, Dict, List, Optional
 
-from airflow.configuration import conf
-
-from airflow_provider_openmetadata.lineage.config.loader import AirflowLineageConfig
-from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
-from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
-from metadata.generated.schema.api.services.createPipelineService import (
-    CreatePipelineServiceRequest,
-)
 from metadata.generated.schema.entity.data.pipeline import (
     Pipeline,
     PipelineStatus,
     StatusType,
     Task,
     TaskStatus,
 )
-from metadata.generated.schema.entity.data.table import Table
-from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
-    AirflowConnection,
-)
-from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
-    BackendConnection,
-)
-from metadata.generated.schema.entity.services.pipelineService import (
-    PipelineConnection,
-    PipelineService,
-    PipelineServiceType,
-)
-from metadata.generated.schema.type.entityLineage import EntitiesEdge
-from metadata.generated.schema.type.entityReference import EntityReference
 from metadata.ingestion.ometa.ometa_api import OpenMetadata
 from metadata.utils.helpers import datetime_to_ts
 
@@ -55,40 +31,13 @@ if TYPE_CHECKING:
     from airflow.models.taskinstance import TaskInstance
 
 
-_STATUS_MAP = {
-    "running": StatusType.Pending,
-    "success": StatusType.Successful,
-    "failed": StatusType.Failed,
+STATUS_MAP = {
+    "success": StatusType.Successful.value,
+    "failed": StatusType.Failed.value,
+    "queued": StatusType.Pending.value,
 }
 
 
-def is_airflow_version_1() -> bool:
-    """
-    Check varying imports between Airflow v1 & v2
-    """
-    # pylint: disable=unused-import,import-outside-toplevel
-    try:
-
-        return False
-    except ModuleNotFoundError:
-
-        return True
-
-
-def parse_v1_xlets(xlet: dict) -> Optional[List[str]]:
-    """
-    Parse airflow xlets for V1
-    :param xlet: airflow v1 xlet dict
-    :return: table list or None
-    """
-    if isinstance(xlet, dict):
-        tables = xlet.get("tables")
-        if tables and isinstance(tables, list):
-            return tables
-
-    return None
-
-
 def parse_xlets(xlet: List[dict]) -> Optional[List[str]]:
     """
     Parse airflow xlets for V1
@@ -118,112 +67,17 @@ def get_xlets(
     :return: list of tables FQN
     """
     xlet = getattr(operator, xlet_mode)
-    if is_airflow_version_1():
-        tables = parse_v1_xlets(xlet)
-
-    else:
-        tables = parse_xlets(xlet)
+    tables = parse_xlets(xlet)
 
     if not tables:
-        operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}")
+        operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}")
 
     else:
         operator.log.info(f"Found {xlet_mode} {tables} in task {operator.task_id}")
 
     return tables
 
 
-def create_or_update_pipeline(  # pylint: disable=too-many-locals
-    task_instance: "TaskInstance",
-    operator: "BaseOperator",
-    dag: "DAG",
-    airflow_service_entity: PipelineService,
-    metadata: OpenMetadata,
-) -> Pipeline:
-    """
-    Prepare the upsert of pipeline entity with the given task
-
-    We will:
-    - Create the pipeline Entity
-    - Append the task being processed
-    - Clean deleted tasks based on the DAG children information
-
-    :param task_instance: task run being processed
-    :param dag_run: DAG run being processed
-    :param operator: task being examined by lineage
-    :param dag: airflow dag
-    :param airflow_service_entity: PipelineService
-    :param metadata: OpenMetadata API client
-    :return: PipelineEntity
-    """
-    dag_url = f"/tree?dag_id={dag.dag_id}"
-    task_url = f"/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}"
-
-    dag_start_date = dag.start_date.isoformat() if dag.start_date else None
-    task_start_date = (
-        task_instance.start_date.isoformat() if task_instance.start_date else None
-    )
-    task_end_date = (
-        task_instance.end_date.isoformat() if task_instance.end_date else None
-    )
-
-    downstream_tasks = []
-    if getattr(operator, "downstream_task_ids", None):
-        downstream_tasks = operator.downstream_task_ids
-
-    operator.log.info(f"downstream tasks {downstream_tasks}")
-
-    task = Task(
-        name=operator.task_id,
-        displayName=operator.task_id,
-        taskUrl=task_url,
-        taskType=operator.task_type,
-        startDate=task_start_date,
-        endDate=task_end_date,
-        downstreamTasks=downstream_tasks,
-    )
-
-    # Check if the pipeline already exists
-    operator.log.info(
-        f"Checking if the pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} exists. If not, we will create it."
-    )
-    current_pipeline: Pipeline = metadata.get_by_name(
-        entity=Pipeline,
-        fqn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}",
-        fields=["tasks"],
-    )
-
-    # Create pipeline if not exists or update its properties
-    pipeline_request = CreatePipelineRequest(
-        name=dag.dag_id,
-        displayName=dag.dag_id,
-        description=dag.description,
-        pipelineUrl=dag_url,
-        concurrency=current_pipeline.concurrency if current_pipeline else None,
-        pipelineLocation=current_pipeline.pipelineLocation
-        if current_pipeline
-        else None,
-        startDate=dag_start_date,
-        tasks=current_pipeline.tasks
-        if current_pipeline
-        else None,  # use the current tasks, if any
-        service=EntityReference(id=airflow_service_entity.id, type="pipelineService"),
-        owner=current_pipeline.owner if current_pipeline else None,
-        tags=current_pipeline.tags if current_pipeline else None,
-    )
-    pipeline: Pipeline = metadata.create_or_update(pipeline_request)
-
-    # Add the task we are processing in the lineage backend
-    operator.log.info("Adding tasks to pipeline...")
-    updated_pipeline = metadata.add_task_to_pipeline(pipeline, task)
-
-    # Clean pipeline
-    try:
-        operator.log.info("Cleaning pipeline tasks...")
-        updated_pipeline = metadata.clean_pipeline_tasks(updated_pipeline, dag.task_ids)
-    except Exception as exc:  # pylint: disable=broad-except
-        operator.log.warning(f"Error cleaning pipeline tasks {exc}")
-
-    return updated_pipeline
-
-
 def get_dag_status(all_tasks: List[str], task_status: List[TaskStatus]):
     """
     Based on the task information and the total DAG tasks, cook the
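For context on the shape `get_xlets` consumes, the sketch below re-implements `parse_xlets` against the inlet/outlet format used in the example DAGs; it is an illustrative reconstruction consistent with the signature shown above, not the packaged function itself:

```python
from typing import List, Optional


def parse_xlets_sketch(xlet: List[dict]) -> Optional[List[str]]:
    """Return the list of table FQNs from an Airflow xlet entry list, if present."""
    if isinstance(xlet, list) and xlet and isinstance(xlet[0], dict):
        return xlet[0].get("tables")
    return None


# Airflow stores task inlets/outlets in the private `_inlets` / `_outlets`
# attributes that get_xlets reads, e.g. [{"tables": ["service.db.schema.table"]}]
assert parse_xlets_sketch(
    [{"tables": ["sample_data.ecommerce_db.shopify.dim_address"]}]
) == ["sample_data.ecommerce_db.shopify.dim_address"]
assert parse_xlets_sketch([]) is None
```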
@@ -290,7 +144,7 @@ def add_status(
     updated_task_status = [
         TaskStatus(
             name=task_instance.task_id,
-            executionStatus=_STATUS_MAP.get(task_instance.state),
+            executionStatus=STATUS_MAP.get(task_instance.state),
             startTime=datetime_to_ts(task_instance.start_date),
             endTime=datetime_to_ts(task_instance.end_date),
             logLink=task_instance.log_url,
@@ -311,117 +165,3 @@ def add_status(
     metadata.add_pipeline_status(
         fqn=pipeline.fullyQualifiedName.__root__, status=updated_status
     )
-
-
-# pylint: disable=too-many-arguments,too-many-locals
-def parse_lineage(
-    config: AirflowLineageConfig,
-    context: Dict,
-    operator: "BaseOperator",
-    inlets: List,
-    outlets: List,
-    metadata: OpenMetadata,
-) -> Optional[Pipeline]:
-    """
-    Main logic to extract properties from DAG and the
-    triggered operator to ingest lineage data into
-    OpenMetadata
-
-    :param config: lineage configuration
-    :param context: airflow runtime context
-    :param operator: task being executed
-    :param inlets: list of upstream tables
-    :param outlets: list of downstream tables
-    :param metadata: OpenMetadata client
-    """
-    operator.log.info("Parsing Lineage for OpenMetadata")
-
-    dag: "DAG" = context["dag"]
-    task_instance: "TaskInstance" = context["task_instance"]
-
-    try:
-
-        airflow_service_entity = get_or_create_pipeline_service(
-            operator, metadata, config
-        )
-        pipeline = create_or_update_pipeline(
-            task_instance=task_instance,
-            operator=operator,
-            dag=dag,
-            airflow_service_entity=airflow_service_entity,
-            metadata=metadata,
-        )
-
-        operator.log.info("Parsing Lineage")
-        for table in inlets if inlets else []:
-            table_entity = metadata.get_by_name(entity=Table, fqn=table)
-            operator.log.debug(f"from entity {table_entity}")
-            lineage = AddLineageRequest(
-                edge=EntitiesEdge(
-                    fromEntity=EntityReference(id=table_entity.id, type="table"),
-                    toEntity=EntityReference(id=pipeline.id, type="pipeline"),
-                )
-            )
-            operator.log.debug(f"From lineage {lineage}")
-            metadata.add_lineage(lineage)
-
-        for table in outlets if outlets else []:
-            table_entity = metadata.get_by_name(entity=Table, fqn=table)
-            operator.log.debug(f"To entity {table_entity}")
-            lineage = AddLineageRequest(
-                edge=EntitiesEdge(
-                    fromEntity=EntityReference(id=pipeline.id, type="pipeline"),
-                    toEntity=EntityReference(id=table_entity.id, type="table"),
-                )
-            )
-            operator.log.debug(f"To lineage {lineage}")
-            metadata.add_lineage(lineage)
-
-        return pipeline
-
-    except Exception as exc:  # pylint: disable=broad-except
-        operator.log.error(
-            f"Failed to parse Airflow DAG task and publish to OpenMetadata due to {exc}"
-        )
-        operator.log.error(traceback.format_exc())
-
-        return None
-
-
-def get_or_create_pipeline_service(
-    operator: "BaseOperator", metadata: OpenMetadata, config: AirflowLineageConfig
-) -> PipelineService:
-    """
-    Check if we already have the airflow instance as a PipelineService,
-    otherwise create it.
-
-    :param operator: task from which we extract the lineage
-    :param metadata: OpenMetadata API wrapper
-    :param config: lineage config
-    :return: PipelineService
-    """
-    operator.log.info("Get Airflow Service ID")
-    airflow_service_entity = metadata.get_by_name(
-        entity=PipelineService, fqn=config.airflow_service_name
-    )
-
-    if airflow_service_entity is None:
-        pipeline_service = CreatePipelineServiceRequest(
-            name=config.airflow_service_name,
-            serviceType=PipelineServiceType.Airflow,
-            connection=PipelineConnection(
-                config=AirflowConnection(
-                    hostPort=conf.get("webserver", "base_url"),
-                    connection=BackendConnection(),
-                ),
-            ),
-        )
-        airflow_service_entity = metadata.create_or_update(pipeline_service)
-        if airflow_service_entity:
-            operator.log.info(
-                f"Created airflow service entity - {airflow_service_entity.fullyQualifiedName.__root__}"
-            )
-        else:
-            operator.log.error("Failed to create airflow service entity")
-
-    return airflow_service_entity
@@ -24,3 +24,12 @@ integrations:
   - integration-name: OpenMetadata
     external-doc-url: https://open-metadata.org
     tags: [service]
+
+hooks:
+  - integration-name: OpenMetadata
+    python-modules:
+      - airflow_provider_openmetadata.hooks.openmetadata
+
+connection-types:
+  - hook-class-name: airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook
+    connection-type: openmetadata
@@ -19,6 +19,7 @@ from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
 from pydantic import BaseModel
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.ingestion.ometa.client import REST, APIError
 from metadata.ingestion.ometa.utils import get_entity_type, ometa_logger
 
@@ -129,3 +130,16 @@ class OMetaLineageMixin(Generic[T]):
                 + f"{entity.__name__} and {path}: {err}"
             )
         return None
+
+    def delete_lineage_edge(self, edge: EntitiesEdge) -> None:
+        """
+        Remove the given Edge
+        """
+        try:
+            self.client.delete(
+                f"{self.get_suffix(AddLineageRequest)}/{edge.fromEntity.type}/{edge.fromEntity.id.__root__}/"
+                f"{edge.toEntity.type}/{edge.toEntity.id.__root__}"
+            )
+        except APIError as err:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Error {err.status_code} trying to DELETE lineage for {edge}")
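A hedged usage sketch for the new `delete_lineage_edge` method. The UUIDs are random placeholders, and the final call is commented out because it needs a reachable OpenMetadata server; in practice you would fetch the real entities first, as `runner.py` does.

```python
from uuid import uuid4

from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference

# Placeholder ids for illustration only
edge = EntitiesEdge(
    fromEntity=EntityReference(id=uuid4(), type="table"),
    toEntity=EntityReference(id=uuid4(), type="pipeline"),
)
# `metadata` is an instantiated OpenMetadata client (see the hook examples above)
# metadata.delete_lineage_edge(edge=edge)
```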
@@ -28,9 +28,7 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.task_group import TaskGroup
 
-from airflow_provider_openmetadata.lineage.openmetadata import (
-    OpenMetadataLineageBackend,
-)
+from airflow_provider_openmetadata.lineage.backend import OpenMetadataLineageBackend
 from airflow_provider_openmetadata.lineage.utils import get_xlets
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
@@ -37,111 +37,56 @@ versions match.
 
 ### Adding Lineage Config
 
+<Note>
+
+If using OpenMetadata version 0.13.0 or lower, the import for the lineage backend is
+`airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend`.
+
+For 0.13.1 or higher, the import has been renamed to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`.
+
+</Note>
+
 After the installation, we need to update the Airflow configuration. This can be done following this example on
 `airflow.cfg`:
 
 ```ini
 [lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
+backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
 airflow_service_name = local_airflow
 openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = no-auth
+auth_provider_type = openmetadata
+jwt_token = <your-token>
 ```
 
 Or we can directly provide environment variables:
 
 ```env
-AIRFLOW__LINEAGE__BACKEND="airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend"
+AIRFLOW__LINEAGE__BACKEND="airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend"
 AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME="local_airflow"
 AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT="http://localhost:8585/api"
-AIRFLOW__LINEAGE__AUTH_PROVIDER_TYPE="no-auth"
+AIRFLOW__LINEAGE__AUTH_PROVIDER_TYPE="openmetadata"
+AIRFLOW__LINEAGE__JWT_TOKEN="<your-token>"
 ```
 
 We can choose the option that best adapts to our current architecture. Find more information on Airflow configurations
 [here](https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html).
 
-We are now going to list the configurations for the different SSO. We will use the `ini` format for those,
-but on your own Airflow you can freely choose.
+#### Optional Parameters
 
-#### Google SSO
+You can also set the following parameters:
 
 ```ini
 [lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = google
-# Note that the path should be local in Airflow
-secret_key = path-to-secret-key-file.json
+...
+only_keep_dag_lineage = true
+max_status = 10
 ```
 
-#### Okta SSO
+- `only_keep_dag_lineage` will remove any table lineage not present in the inlets or outlets. This will ensure
+  that any lineage in OpenMetadata comes from your code.
+- `max_status` controls the number of status to ingest in each run. By default, we'll pick the last 10.
-
-```ini
-[lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = okta
-client_id = client id
-org_url = org url
-private_key = private key
-email = email
-# Optional
-scopes = ["scope1", "scope2"]
-```
-
-#### Auth0 SSO
-
-```ini
-[lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = auth0
-client_id = client id
-secret_key = secret key
-domain = domain
-```
-
-#### Azure SSO
-
-```ini
-[lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = azure
-client_id = client id
-client_secret = client secret
-authority = authority
-# Optional
-scopes = ["scope1", "scope2"]
-```
-
-#### OpenMetadata SSO
-
-```ini
-[lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = openmetadata
-jwt_token = token
-```
-
-#### Custom OIDC SSO
-
-```ini
-[lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = custom-oidc
-client_id = client id
-client_secret = client secret
-token_endpoint = endpoint
-```
-
 In the following sections, we'll show how to adapt our pipelines to help us build the lineage information.
 
@@ -307,7 +252,7 @@ If you are running this example using the quickstart deployment of OpenMetadata,
 this:
 
 ```
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
+backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
 airflow_service_name = local_airflow
 openmetadata_api_endpoint = http://localhost:8585/api
 auth_provider_type = openmetadata
New documentation page (213 lines): Airflow Lineage Operator

@@ -0,0 +1,213 @@
---
title: Airflow Lineage Operator
slug: /connectors/pipeline/airflow/lineage-operator
---

# Airflow Lineage Operator

Another approach to extract Airflow metadata, only for the DAGs you want, is to use the `OpenMetadataLineageOperator`.

When the task executes, it will ingest:
- The Pipeline Service, if it does not exist.
- The DAG as a Pipeline, if it does not exist.
- The status of the tasks. We recommend running this Operator as the last step if you want up-to-date statuses.
- The lineage from inlets and outlets.

## Installation

The Lineage Operator can be installed directly on the Airflow instances as part of the usual OpenMetadata Python
distribution:

```commandline
pip3 install "openmetadata-ingestion==x.y.z"
```

Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that server and client
versions match.

**It requires version `0.13.1` or higher.**

## Example DAG

An example DAG looks as follows:

```python
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
You can run this DAG from the default OM installation
"""

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)

# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def explode():
    raise Exception("Oh no!")


with DAG(
    'lineage_tutorial_operator',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        outlets={
            "tables": ["sample_data.ecommerce_db.shopify.dim_address"]
        }
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 1',
        retries=3,
        inlets={
            "tables": ["sample_data.ecommerce_db.shopify.dim_customer"]
        }
    )

    risen = PythonOperator(
        task_id='explode',
        provide_context=True,
        python_callable=explode,
        retries=0,
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
    dag.doc_md = """
    This is documentation placed anywhere
    """  # otherwise, type it like this
    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    )

    t3 = BashOperator(
        task_id='templated',
        depends_on_past=False,
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
    )

    t1 >> [t2, t3]

    server_config = OpenMetadataConnection(
        hostPort="http://localhost:8585/api",
        authProvider="openmetadata",
        securityConfig=OpenMetadataJWTClientConfig(
            jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        ),
    )

    t4 = OpenMetadataLineageOperator(
        task_id='lineage_op',
        depends_on_past=False,
        server_config=server_config,
        service_name="airflow_lineage_op_service",
        only_keep_dag_lineage=True,
    )

    t1 >> t4
```

## Retrieving the OpenMetadataConnection from Airflow

In 0.13.1 we have also added an `OpenMetadataHook`, which can be configured from the UI to safely store
the parameters needed to connect to OpenMetadata.

Go to the Airflow UI > Admin > Connection and create a new `OpenMetadata` connection as follows:

<Image src="/images/openmetadata/connectors/airflow/airflow-connection.png" alt="Airflow Connection"/>

Testing the connection will validate that the server is reachable and that the installed client can be instantiated properly.

Once the connection is configured, you can use it in your DAGs without creating the `OpenMetadataConnection` manually:

```python
from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook

openmetadata_hook = OpenMetadataHook(openmetadata_conn_id="om_id")
server_config = openmetadata_hook.get_conn()

OpenMetadataLineageOperator(
    task_id='lineage_op',
    depends_on_past=False,
    server_config=server_config,
    service_name="airflow_lineage_op_service",
    only_keep_dag_lineage=True,
)
```

### OpenMetadataHook with HTTPS and SSL

If the OpenMetadata server connection needs to happen over HTTPS, update the `Schema` accordingly to `https`.

For SSL parameters we have two options:

#### 1. Ignore the SSL certificates

You can add the `Extra` value as the following JSON to create a connection that ignores SSL:

```json
{
    "verifySSL": "ignore"
}
```

#### 2. Validate SSL certificates

Otherwise, you can use the `validate` value and add the path to the certificate. **It should be reachable locally
in your Airflow instance.**

```json
{
    "verifySSL": "validate",
    "sslConfig": "path-to-cert"
}
```
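Once the DAG has run, the pushed lineage can be inspected with the same client call the runner uses (`get_lineage_by_name`). A hedged sketch; the FQN below assumes the service and DAG names from this example, and `server_config` is built as shown above:

```python
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.ingestion.ometa.ometa_api import OpenMetadata

metadata = OpenMetadata(server_config)  # server_config from the hook or built manually
lineage = metadata.get_lineage_by_name(
    entity=Pipeline,
    fqn="airflow_lineage_op_service.lineage_tutorial_operator",
    up_depth=1,
    down_depth=1,
)
if lineage:
    print(lineage.get("upstreamEdges"), lineage.get("downstreamEdges"))
```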
@@ -27,4 +27,13 @@ Starting with `0.13.0`, we have deprecated the initial configurations for Author
 
 - Airbyte
   - Airbyte connector now supports Basic Authentication.
-  - Added: `username`, `password`
+    - Added: `username`, `password`
+
+## Lineage Backend
+
+In 0.13.1:
+
+- The import for the Airflow Lineage Backend has been updated from `airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend`
+  to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`.
+- We removed support for Airflow v1.
+- The failure callback now only updates the pipeline status if the Pipeline already exists in OpenMetadata.
@@ -428,6 +428,8 @@ site_menu:
       url: /connectors/pipeline/airflow/gcs
     - category: Connectors / Pipeline / Airflow / Lineage Backend
       url: /connectors/pipeline/airflow/lineage-backend
+    - category: Connectors / Pipeline / Airflow / Lineage Operator
+      url: /connectors/pipeline/airflow/lineage-operator
     - category: Connectors / Pipeline / Airbyte
       url: /connectors/pipeline/airbyte
     - category: Connectors / Pipeline / Airbyte / Airflow
Binary file not shown (new image, 176 KiB): the Airflow connection screenshot referenced in the docs above.