diff --git a/ingestion/examples/airflow/dags/airflow_lineage_operator.py b/ingestion/examples/airflow/dags/airflow_lineage_operator.py new file mode 100644 index 00000000000..dfae8f4a2d2 --- /dev/null +++ b/ingestion/examples/airflow/dags/airflow_lineage_operator.py @@ -0,0 +1,118 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +You can run this DAG from the default OM installation +""" + +from datetime import datetime, timedelta +from textwrap import dedent + +# The DAG object; we'll need this to instantiate a DAG +from airflow import DAG + +# Operators; we need this to operate! +from airflow.operators.bash import BashOperator +from airflow.operators.python import PythonOperator + +# These args will get passed on to each operator +# You can override them on a per-task basis during operator initialization +from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + OpenMetadataConnection, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) + +default_args = { + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + + +def explode(): + raise Exception("Oh no!") + + +with DAG( + "lineage_tutorial_operator", + default_args=default_args, + description="A simple tutorial DAG", + schedule_interval=timedelta(days=1), + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example"], +) as dag: + + # t1, t2 and t3 are examples of tasks created by instantiating operators + t1 = BashOperator( + task_id="print_date", + bash_command="date", + outlets={"tables": ["sample_data.ecommerce_db.shopify.dim_address"]}, + ) + + t2 = BashOperator( + task_id="sleep", + depends_on_past=False, + bash_command="sleep 1", + retries=3, + inlets={"tables": ["sample_data.ecommerce_db.shopify.dim_customer"]}, + ) + + risen = PythonOperator( + task_id="explode", + provide_context=True, + python_callable=explode, + retries=0, + ) + + dag.doc_md = ( + __doc__ # providing that you have a docstring at the beginning of the DAG + ) + dag.doc_md = """ + This is a documentation placed anywhere + """ # otherwise, type it like this + templated_command = dedent( + """ + {% for i in range(5) %} + echo "{{ ds }}" + echo "{{ macros.ds_add(ds, 7)}}" + echo "{{ params.my_param }}" + {% endfor %} + """ + ) + + t3 = BashOperator( + task_id="templated", + depends_on_past=False, + bash_command=templated_command, + params={"my_param": "Parameter I passed in"}, + ) + + t1 >> [t2, t3] + + server_config = OpenMetadataConnection( + hostPort="http://localhost:8585/api", + authProvider="openmetadata", + securityConfig=OpenMetadataJWTClientConfig( + 
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + ), + ) + + t4 = OpenMetadataLineageOperator( + task_id="lineage_op", + depends_on_past=False, + server_config=server_config, + service_name="airflow_lineage_op_service", + only_keep_dag_lineage=True, + ) + + t1 >> t4 diff --git a/ingestion/src/airflow_provider_openmetadata/README.md b/ingestion/src/airflow_provider_openmetadata/README.md new file mode 100644 index 00000000000..bbe0e7578b1 --- /dev/null +++ b/ingestion/src/airflow_provider_openmetadata/README.md @@ -0,0 +1,67 @@ +# OpenMetadata Airflow Provider + +This package brings: +- Lineage Backend +- Lineage Operator +- OpenMetadata Hook + +Note that this is configured as an entrypoint in the `setup.py`: + +```python +entry_points={ + "apache_airflow_provider": [ + "provider_info = airflow_provider_openmetadata:get_provider_config" + ], +}, +``` + +Therefore, any metadata changes that should be discoverable by Airflow need to be passed in `get_provider_config`. + +More information about that on Airflow's [docs](https://airflow.apache.org/docs/apache-airflow-providers/index.html?utm_cta=website-events-featured-summit#creating-your-own-providers). + +## How to use the OpenMetadataHook + +In the Airflow UI you can create a new OpenMetadata connection. 
+
+Then, load it as follows:
+
+```python
+from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook
+
+openmetadata_hook = OpenMetadataHook(openmetadata_conn_id="om_id")  # The ID you provided
+server_config = openmetadata_hook.get_conn()
+```
+
+## How to use the OpenMetadataLineageOperator
+
+```python
+from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
+
+OpenMetadataLineageOperator(
+    task_id='lineage_op',
+    depends_on_past=False,
+    server_config=server_config,
+    service_name="your-airflow-service",
+    only_keep_dag_lineage=True,
+)
+```
+
+You can get the `server_config` variable using the `OpenMetadataHook` as shown above, or create it
+directly:
+
+```python
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
+    OpenMetadataJWTClientConfig,
+)
+
+server_config = OpenMetadataConnection(
+    hostPort="http://localhost:8585/api",
+    authProvider="openmetadata",
+    securityConfig=OpenMetadataJWTClientConfig(
+        jwtToken="<token>"
+    ),
+)
+```
diff --git a/ingestion/src/airflow_provider_openmetadata/__init__.py b/ingestion/src/airflow_provider_openmetadata/__init__.py
index 768579e0d91..bf670a6845a 100644
--- a/ingestion/src/airflow_provider_openmetadata/__init__.py
+++ b/ingestion/src/airflow_provider_openmetadata/__init__.py
@@ -22,7 +22,16 @@ def get_provider_config():
     """
     return {
         "name": "OpenMetadata",
-        "description": "OpenMetadata ",
+        "description": "`OpenMetadata `__",
         "package-name": "openmetadata-ingestion",
         "version": "0.4.1",
+        "connection-types": [
+            {
+                "connection-type": "openmetadata",
+                "hook-class-name": "airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook",
+            }
+        ],
+        "hook-class-names": [
+            "airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook",
+        ],
     }
diff --git a/ingestion/src/airflow_provider_openmetadata/hooks/__init__.py b/ingestion/src/airflow_provider_openmetadata/hooks/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/airflow_provider_openmetadata/hooks/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/hooks/openmetadata.py
new file mode 100644
index 00000000000..7d167864589
--- /dev/null
+++ b/ingestion/src/airflow_provider_openmetadata/hooks/openmetadata.py
@@ -0,0 +1,99 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+This hook allows storing the connection to
+an OpenMetadata server and using it in your
+operators.
+""" +from typing import Any + +from airflow.hooks.base import BaseHook +from airflow.models import Connection + +from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( + AuthProvider, + OpenMetadataConnection, + VerifySSL, +) +from metadata.generated.schema.security.client.openMetadataJWTClientConfig import ( + OpenMetadataJWTClientConfig, +) +from metadata.generated.schema.security.ssl.validateSSLClientConfig import ( + ValidateSSLClientConfig, +) +from metadata.ingestion.ometa.ometa_api import OpenMetadata + + +class OpenMetadataHook(BaseHook): + """ + Airflow hook to store and use an `OpenMetadataConnection` + """ + + conn_name_attr: str = "openmetadata_conn_id" + default_conn_name = "openmetadata_default" + conn_type = "openmetadata" + hook_name = "OpenMetadata" + + def __init__(self, openmetadata_conn_id: str = default_conn_name) -> None: + super().__init__() + self.openmetadata_conn_id = openmetadata_conn_id + # Add defaults + self.default_schema = "http" + self.default_port = 8585 + self.default_verify_ssl = VerifySSL.no_ssl + self.default_ssl_config = None + + def get_conn(self) -> OpenMetadataConnection: + + conn: Connection = self.get_connection(self.openmetadata_conn_id) + jwt_token = conn.get_password() + if not jwt_token: + raise ValueError("JWT Token should be informed.") + + if not conn.host: + raise ValueError("Host should be informed.") + + port = conn.port if conn.port else self.default_port + schema = conn.schema if conn.schema else self.default_schema + + extra = conn.extra_dejson if conn.get_extra() else {} + verify_ssl = extra.get("verifySSL") or self.default_verify_ssl + ssl_config = ( + ValidateSSLClientConfig(certificatePath=extra["sslConfig"]) + if extra.get("sslConfig") + else self.default_ssl_config + ) + + om_conn = OpenMetadataConnection( + hostPort=f"{schema}://{conn.host}:{port}/api", + authProvider=AuthProvider.openmetadata, + securityConfig=OpenMetadataJWTClientConfig(jwtToken=jwt_token), + verifySSL=verify_ssl, + sslConfig=ssl_config, + ) + + return om_conn + + def test_connection(self): + """Test that we can instantiate the ometa client with the given connection""" + try: + OpenMetadata(self.get_conn()) + return True, "Connection successful" + except Exception as err: + return False, str(err) + + @staticmethod + def get_ui_field_behaviour() -> dict[str, Any]: + """Returns custom field behaviour""" + return { + "hidden_fields": ["login"], + "relabeling": {"password": "JWT Token"}, + } diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py similarity index 80% rename from ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py rename to ingestion/src/airflow_provider_openmetadata/lineage/backend.py index d2f65f2fa04..83871e6c579 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/backend.py @@ -14,7 +14,7 @@ OpenMetadata Airflow Lineage Backend """ import traceback -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import Dict, List, Optional from airflow.lineage.backend import LineageBackend @@ -22,12 +22,9 @@ from airflow_provider_openmetadata.lineage.config.loader import ( AirflowLineageConfig, get_lineage_config, ) -from airflow_provider_openmetadata.lineage.utils import get_xlets, parse_lineage +from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner from 
metadata.ingestion.ometa.ometa_api import OpenMetadata -if TYPE_CHECKING: - from airflow.models.baseoperator import BaseOperator - # pylint: disable=too-few-public-methods class OpenMetadataLineageBackend(LineageBackend): @@ -50,16 +47,9 @@ class OpenMetadataLineageBackend(LineageBackend): only if you are using google as SSO """ - def __init__(self) -> None: - """ - Instantiate a superclass object and run lineage config function - """ - super().__init__() - _ = get_lineage_config() - # pylint: disable=protected-access - @staticmethod # needed for Airflow 1.10.x def send_lineage( + self, operator: "BaseOperator", inlets: Optional[List] = None, outlets: Optional[List] = None, @@ -81,10 +71,16 @@ class OpenMetadataLineageBackend(LineageBackend): config: AirflowLineageConfig = get_lineage_config() metadata = OpenMetadata(config.metadata_config) - op_inlets = get_xlets(operator, "_inlets") - op_outlets = get_xlets(operator, "_outlets") + runner = AirflowLineageRunner( + metadata=metadata, + service_name=config.airflow_service_name, + dag=context["dag"], + context=context, + only_keep_dag_lineage=config.only_keep_dag_lineage, + max_status=config.max_status, + ) + runner.execute() - parse_lineage(config, context, operator, op_inlets, op_outlets, metadata) except Exception as exc: # pylint: disable=broad-except operator.log.error(traceback.format_exc()) operator.log.error(exc) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/callback.py b/ingestion/src/airflow_provider_openmetadata/lineage/callback.py index e12abbf4ea6..6c59d436dd3 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/callback.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/callback.py @@ -43,23 +43,29 @@ def failure_callback(context: Dict[str, str]) -> None: metadata = OpenMetadata(config.metadata_config) operator: "BaseOperator" = context["task"] + dag: "DAG" = context["dag"] - operator.log.info("Parsing lineage & pipeline status on failure...") + operator.log.info("Updating pipeline status on error...") - op_inlets = get_xlets(operator, "_inlets") - op_outlets = get_xlets(operator, "_outlets") - - # Get the pipeline created or updated during the lineage - pipeline = parse_lineage( - config, context, operator, op_inlets, op_outlets, metadata + airflow_service_entity: PipelineService = metadata.get_by_name( + entity=PipelineService, fqn=config.airflow_service_name + ) + pipeline: Pipeline = metadata.get_by_name( + entity=Pipeline, + fqn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}", ) - add_status( - operator=operator, - pipeline=pipeline, - metadata=metadata, - context=context, - ) + if pipeline: + add_status( + operator=operator, + pipeline=pipeline, + metadata=metadata, + context=context, + ) + else: + logging.warning( + f"Pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} not found. Skipping status update." 
+            )
 
     except Exception as exc:  # pylint: disable=broad-except
         logging.error(traceback.format_exc())
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py
index d5e8d786dcd..35866660a80 100644
--- a/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/config/loader.py
@@ -32,6 +32,8 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
 class AirflowLineageConfig(BaseModel):
     airflow_service_name: str
     metadata_config: OpenMetadataConnection
+    only_keep_dag_lineage: bool = False
+    max_status: int = 10
 
 
 def parse_airflow_config(
@@ -59,6 +61,12 @@
     return AirflowLineageConfig(
         airflow_service_name=airflow_service_name,
+        # Check if value is a literal string `true`
+        only_keep_dag_lineage=conf.get(
+            LINEAGE, "only_keep_dag_lineage", fallback="false"
+        )
+        == "true",
+        max_status=int(conf.get(LINEAGE, "max_status", fallback=10)),
         metadata_config=OpenMetadataConnection(
             hostPort=conf.get(
                 LINEAGE,
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/operator.py b/ingestion/src/airflow_provider_openmetadata/lineage/operator.py
new file mode 100644
index 00000000000..ebf7b3ddd6e
--- /dev/null
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/operator.py
@@ -0,0 +1,73 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+OpenMetadata Airflow Lineage Operator
+"""
+import traceback
+
+from airflow.models.baseoperator import BaseOperator
+from airflow.utils.context import Context
+
+from airflow_provider_openmetadata.lineage.runner import AirflowLineageRunner
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+
+
+class OpenMetadataLineageOperator(BaseOperator):
+    """
+    This Operator will check the inlets and outlets of its DAG
+    and push the lineage to OpenMetadata.
+
+    It requires the OpenMetadataConnection to be passed as
+    an argument to instantiate the ometa client.
+
+    If `only_keep_dag_lineage` is True, we will remove any lineage related
+    to the DAG that is not part of the inlets/outlets of its tasks.
+    """
+
+    def __init__(
+        self,
+        server_config: OpenMetadataConnection,
+        service_name: str,
+        only_keep_dag_lineage: bool = False,
+        max_status: int = 10,
+        **kwargs,
+    ) -> None:
+        super().__init__(**kwargs)
+        self.server_config = server_config
+        self.service_name = service_name
+        self.only_keep_dag_lineage = only_keep_dag_lineage
+        self.max_status = max_status
+
+    def execute(self, context: Context) -> None:
+        """
+        Main logic to check the context for lineage
+        and push it to OpenMetadata using the Python Client.
+        """
+        try:
+            metadata = OpenMetadata(self.server_config)
+            runner = AirflowLineageRunner(
+                metadata=metadata,
+                service_name=self.service_name,
+                dag=self.dag,
+                context=context,
+                only_keep_dag_lineage=self.only_keep_dag_lineage,
+                max_status=self.max_status,
+            )
+
+            runner.execute()
+        except Exception as err:
+            self.dag.log.info(traceback.format_exc())
+            self.dag.log.error(f"Error executing the lineage runner - {err}")
+            raise err
diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py
new file mode 100644
index 00000000000..ec693a8d632
--- /dev/null
+++ b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py
@@ -0,0 +1,376 @@
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+OpenMetadata Airflow Provider Lineage Runner
+"""
+from itertools import groupby
+from typing import List, Set
+
+from airflow.configuration import conf
+from pydantic import BaseModel
+
+from airflow_provider_openmetadata.lineage.utils import STATUS_MAP, get_xlets
+from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
+from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.api.services.createPipelineService import (
+    CreatePipelineServiceRequest,
+)
+from metadata.generated.schema.entity.data.pipeline import (
+    Pipeline,
+    PipelineStatus,
+    StatusType,
+    Task,
+    TaskStatus,
+)
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
+    AirflowConnection,
+)
+from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
+    BackendConnection,
+)
+from metadata.generated.schema.entity.services.pipelineService import (
+    PipelineConnection,
+    PipelineService,
+    PipelineServiceType,
+)
+from metadata.generated.schema.type.basic import Uuid
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
+from metadata.generated.schema.type.entityReference import EntityReference
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.utils.helpers import datetime_to_ts
+
+
+class XLets(BaseModel):
+    """
+    Group inlets and outlets from all tasks in a DAG
+    """
+
+    inlets: Set[str]
+    outlets: Set[str]
+
+
+class SimpleEdge(BaseModel):
+    """
+    Simple Edge representation with FQN and id
+    """
+
+    fqn: str
+    id: str
+
+
+class AirflowLineageRunner:
+    """
+    Given the OpenMetadata connection, a service name and a DAG:
+
+    1. Create the Pipeline Service (if not exists)
+    2. Create or update the Pipeline (DAG + tasks)
+    3. Add the task status (Task Instances). We'll pick this up from the available information.
+        This operator should run last to have the complete view.
+    4. Add Pipeline Lineage from xlets
+
+    This Runner will be called either from:
+    1. Lineage Backend
+    2. Lineage Operator
+    In both cases, this will run directly on an Airflow instance.
Therefore, + we'll use the airflow config data to populate entities' details. + """ + + def __init__( + self, + metadata: OpenMetadata, + service_name: str, + dag: "DAG", + context: "Context", + only_keep_dag_lineage: bool = False, + max_status: int = 10, + ): + self.metadata = metadata + self.service_name = service_name + self.only_keep_dag_lineage = only_keep_dag_lineage + self.max_status = max_status + + self.dag = dag + self.context = context + + def get_or_create_pipeline_service(self) -> PipelineService: + """ + Fetch the Pipeline Service from OM. If it does not exist, + create it. + """ + service_entity: PipelineService = self.metadata.get_by_name( + entity=PipelineService, fqn=self.service_name + ) + + if service_entity: + return service_entity + + else: + pipeline_service: PipelineService = self.metadata.create_or_update( + CreatePipelineServiceRequest( + name=self.service_name, + serviceType=PipelineServiceType.Airflow, + connection=PipelineConnection( + config=AirflowConnection( + hostPort=conf.get("webserver", "base_url"), + connection=BackendConnection(), + ), + ), + ) + ) + + if pipeline_service is None: + raise RuntimeError("Failed to create Airflow service.") + + return pipeline_service + + def get_task_url(self, task: "Operator"): + return f"/taskinstance/list/?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}" + + def get_om_tasks(self) -> List[Task]: + """ + Get all tasks from the DAG and map them to + OpenMetadata Task Entities + """ + return [ + Task( + name=task.task_id, + taskUrl=self.get_task_url(task), + taskType=task.task_type, + startDate=task.start_date.isoformat() if task.start_date else None, + endDate=task.end_date.isoformat() if task.end_date else None, + downstreamTasks=list(task.downstream_task_ids) + if task.downstream_task_ids + else None, + ) + for task in self.dag.tasks or [] + ] + + def create_pipeline_entity(self, pipeline_service: PipelineService) -> Pipeline: + """ + Create the Pipeline Entity + """ + self.dag.log.info("Creating or updating Pipeline Entity from DAG...") + pipeline_request = CreatePipelineRequest( + name=self.dag.dag_id, + description=self.dag.description, + pipelineUrl=f"/tree?dag_id={self.dag.dag_id}", + concurrency=self.dag.max_active_tasks, + pipelineLocation=self.dag.fileloc, + startDate=self.dag.start_date.isoformat() if self.dag.start_date else None, + tasks=self.get_om_tasks(), + service=EntityReference( + id=pipeline_service.id, + type="pipelineService", + ), + ) + + return self.metadata.create_or_update(pipeline_request) + + def get_all_pipeline_status(self) -> List[PipelineStatus]: + """ + Iterate over the DAG's task instances and map + them to PipelineStatus + """ + + # This list is already ordered by Execution Date + grouped_ti: List[List["TaskInstance"]] = [ + list(value) + for _, value in groupby( + self.dag.get_task_instances(), key=lambda ti: ti.run_id + ) + ] + # Order descending by execution date + grouped_ti.reverse() + + return [ + self.get_pipeline_status(task_instances) + for task_instances in grouped_ti[: self.max_status] + ] + + @staticmethod + def get_dag_status_from_task_instances(task_instances: List["TaskInstance"]) -> str: + """ + If any task is in pending state, then return pending. + If any task is in failed state, return failed. + Otherwise, return Success. 
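+
+        Example (hypothetical states): ["success", "queued", "failed"] would map to
+        [Successful, Pending, Failed], so the DAG status would be Pending, as pending
+        tasks take precedence over failures.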
+ """ + task_statuses = [ + STATUS_MAP.get(task_instance.state, StatusType.Pending.value) + for task_instance in task_instances + ] + if any(status == StatusType.Pending.value for status in task_statuses): + return StatusType.Pending.value + if any(status == StatusType.Failed.value for status in task_statuses): + return StatusType.Failed.value + + return StatusType.Successful.value + + def get_pipeline_status( + self, task_instances: List["TaskInstance"] + ) -> PipelineStatus: + """ + Given the task instances for a run, prep the PipelineStatus + """ + + task_status = [ + TaskStatus( + name=task_instance.task_id, + executionStatus=STATUS_MAP.get( + task_instance.state, StatusType.Pending.value + ), + startTime=datetime_to_ts(task_instance.start_date), + endTime=datetime_to_ts(task_instance.end_date), + logLink=task_instance.log_url, + ) + for task_instance in task_instances + ] + return PipelineStatus( + # Use any of the task execution dates for the status execution date + timestamp=datetime_to_ts(task_instances[0].execution_date), + executionStatus=self.get_dag_status_from_task_instances(task_instances), + taskStatus=task_status, + ) + + def add_all_pipeline_status(self, pipeline: Pipeline) -> None: + """ + Get the latest Pipeline Status from the DAG and send + it to OM + """ + pipeline_status_list = self.get_all_pipeline_status() + + for status in pipeline_status_list: + self.metadata.add_pipeline_status( + fqn=pipeline.fullyQualifiedName.__root__, status=status + ) + + def get_xlets(self) -> XLets: + """ + Fill the inlets and outlets of the Pipeline by iterating + over all its tasks + """ + _inlets = set() + _outlets = set() + + for task in self.dag.tasks: + _inlets.update(get_xlets(operator=task, xlet_mode="_inlets") or []) + _outlets.update(get_xlets(operator=task, xlet_mode="_outlets") or []) + + return XLets(inlets=_inlets, outlets=_outlets) + + def add_lineage(self, pipeline: Pipeline, xlets: XLets) -> None: + """ + Add the lineage from inlets and outlets + """ + + for table_fqn in xlets.inlets or []: + table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) + try: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=table_entity.id, type="table"), + toEntity=EntityReference(id=pipeline.id, type="pipeline"), + ) + ) + self.metadata.add_lineage(lineage) + except AttributeError as err: + self.dag.log.error( + f"Error trying to access Entity data due to: {err}." + f" Is the table [{table_fqn}] present in OpenMetadata?" + ) + + for table_fqn in xlets.outlets or []: + table_entity = self.metadata.get_by_name(entity=Table, fqn=table_fqn) + try: + lineage = AddLineageRequest( + edge=EntitiesEdge( + fromEntity=EntityReference(id=pipeline.id, type="pipeline"), + toEntity=EntityReference(id=table_entity.id, type="table"), + ) + ) + self.metadata.add_lineage(lineage) + except AttributeError as err: + self.dag.log.error( + f"Error trying to access Entity data due to: {err}." + f" Is the table [{table_fqn}] present in OpenMetadata?" + ) + + def clean_lineage(self, pipeline: Pipeline, xlets: XLets): + """ + Clean the lineage nodes that are not part of xlets. 
+ + We'll only clean up table nodes + """ + lineage_data = self.metadata.get_lineage_by_name( + entity=Pipeline, + fqn=pipeline.fullyQualifiedName.__root__, + up_depth=1, + down_depth=1, + ) + + upstream_edges = [ + next( + ( + SimpleEdge(fqn=node["fullyQualifiedName"], id=node["id"]) + for node in lineage_data.get("nodes") or [] + if node["id"] == upstream_edge["fromEntity"] + and node["type"] == "table" + ) + ) + for upstream_edge in lineage_data.get("upstreamEdges") or [] + ] + downstream_edges = [ + next( + ( + SimpleEdge(fqn=node["fullyQualifiedName"], id=node["id"]) + for node in lineage_data.get("nodes") or [] + if node["id"] == downstream_edge["toEntity"] + and node["type"] == "table" + ) + ) + for downstream_edge in lineage_data.get("downstreamEdges") or [] + ] + + for edge in upstream_edges: + if edge.fqn not in xlets.inlets: + self.dag.log.info(f"Removing upstream edge with {edge.fqn}") + edge_to_remove = EntitiesEdge( + fromEntity=EntityReference(id=edge.id, type="table"), + toEntity=EntityReference(id=pipeline.id, type="pipeline"), + ) + self.metadata.delete_lineage_edge(edge=edge_to_remove) + + for edge in downstream_edges: + if edge.fqn not in xlets.outlets: + self.dag.log.info(f"Removing downstream edge with {edge.fqn}") + edge_to_remove = EntitiesEdge( + fromEntity=EntityReference(id=pipeline.id, type="pipeline"), + toEntity=EntityReference(id=edge.id, type="table"), + ) + self.metadata.delete_lineage_edge(edge=edge_to_remove) + + def execute(self): + """ + Run the whole ingestion logic + """ + self.dag.log.info("Executing Airflow Lineage Runner...") + pipeline_service = self.get_or_create_pipeline_service() + pipeline = self.create_pipeline_entity(pipeline_service) + self.add_all_pipeline_status(pipeline) + + xlets = self.get_xlets() + self.add_lineage(pipeline, xlets) + if self.only_keep_dag_lineage: + self.dag.log.info( + "`only_keep_dag_lineage` is set to True. Cleaning lineage not in inlets or outlets..." + ) + self.clean_lineage(pipeline, xlets) diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index 8ee5127c988..858196c1146 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -10,41 +10,17 @@ # limitations under the License. 
""" -OpenMetadata Airflow Lineage Backend +OpenMetadata Airflow Provider utilities """ -import traceback from typing import TYPE_CHECKING, Dict, List, Optional -from airflow.configuration import conf - -from airflow_provider_openmetadata.lineage.config.loader import AirflowLineageConfig -from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest -from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest -from metadata.generated.schema.api.services.createPipelineService import ( - CreatePipelineServiceRequest, -) from metadata.generated.schema.entity.data.pipeline import ( Pipeline, PipelineStatus, StatusType, - Task, TaskStatus, ) -from metadata.generated.schema.entity.data.table import Table -from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import ( - AirflowConnection, -) -from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import ( - BackendConnection, -) -from metadata.generated.schema.entity.services.pipelineService import ( - PipelineConnection, - PipelineService, - PipelineServiceType, -) -from metadata.generated.schema.type.entityLineage import EntitiesEdge -from metadata.generated.schema.type.entityReference import EntityReference from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils.helpers import datetime_to_ts @@ -55,40 +31,13 @@ if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance -_STATUS_MAP = { - "running": StatusType.Pending, - "success": StatusType.Successful, - "failed": StatusType.Failed, +STATUS_MAP = { + "success": StatusType.Successful.value, + "failed": StatusType.Failed.value, + "queued": StatusType.Pending.value, } -def is_airflow_version_1() -> bool: - """ - Check varying imports between Airflow v1 & v2 - """ - # pylint: disable=unused-import,import-outside-toplevel - try: - - return False - except ModuleNotFoundError: - - return True - - -def parse_v1_xlets(xlet: dict) -> Optional[List[str]]: - """ - Parse airflow xlets for V1 - :param xlet: airflow v1 xlet dict - :return: table list or None - """ - if isinstance(xlet, dict): - tables = xlet.get("tables") - if tables and isinstance(tables, list): - return tables - - return None - - def parse_xlets(xlet: List[dict]) -> Optional[List[str]]: """ Parse airflow xlets for V1 @@ -118,112 +67,17 @@ def get_xlets( :return: list of tables FQN """ xlet = getattr(operator, xlet_mode) - if is_airflow_version_1(): - tables = parse_v1_xlets(xlet) - - else: - tables = parse_xlets(xlet) + tables = parse_xlets(xlet) if not tables: - operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}") + operator.log.debug(f"Not finding proper {xlet_mode} in task {operator.task_id}") + + else: + operator.log.info(f"Found {xlet_mode} {tables} in task {operator.task_id}") return tables -def create_or_update_pipeline( # pylint: disable=too-many-locals - task_instance: "TaskInstance", - operator: "BaseOperator", - dag: "DAG", - airflow_service_entity: PipelineService, - metadata: OpenMetadata, -) -> Pipeline: - """ - Prepare the upsert of pipeline entity with the given task - - We will: - - Create the pipeline Entity - - Append the task being processed - - Clean deleted tasks based on the DAG children information - - :param task_instance: task run being processed - :param dag_run: DAG run being processed - :param operator: task being examined by lineage - :param dag: airflow dag - :param airflow_service_entity: PipelineService - :param metadata: OpenMetadata API 
client - :return: PipelineEntity - """ - dag_url = f"/tree?dag_id={dag.dag_id}" - task_url = f"/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={operator.task_id}" - - dag_start_date = dag.start_date.isoformat() if dag.start_date else None - task_start_date = ( - task_instance.start_date.isoformat() if task_instance.start_date else None - ) - task_end_date = ( - task_instance.end_date.isoformat() if task_instance.end_date else None - ) - - downstream_tasks = [] - if getattr(operator, "downstream_task_ids", None): - downstream_tasks = operator.downstream_task_ids - - operator.log.info(f"downstream tasks {downstream_tasks}") - - task = Task( - name=operator.task_id, - displayName=operator.task_id, - taskUrl=task_url, - taskType=operator.task_type, - startDate=task_start_date, - endDate=task_end_date, - downstreamTasks=downstream_tasks, - ) - - # Check if the pipeline already exists - operator.log.info( - f"Checking if the pipeline {airflow_service_entity.name.__root__}.{dag.dag_id} exists. If not, we will create it." - ) - current_pipeline: Pipeline = metadata.get_by_name( - entity=Pipeline, - fqn=f"{airflow_service_entity.name.__root__}.{dag.dag_id}", - fields=["tasks"], - ) - - # Create pipeline if not exists or update its properties - pipeline_request = CreatePipelineRequest( - name=dag.dag_id, - displayName=dag.dag_id, - description=dag.description, - pipelineUrl=dag_url, - concurrency=current_pipeline.concurrency if current_pipeline else None, - pipelineLocation=current_pipeline.pipelineLocation - if current_pipeline - else None, - startDate=dag_start_date, - tasks=current_pipeline.tasks - if current_pipeline - else None, # use the current tasks, if any - service=EntityReference(id=airflow_service_entity.id, type="pipelineService"), - owner=current_pipeline.owner if current_pipeline else None, - tags=current_pipeline.tags if current_pipeline else None, - ) - pipeline: Pipeline = metadata.create_or_update(pipeline_request) - - # Add the task we are processing in the lineage backend - operator.log.info("Adding tasks to pipeline...") - updated_pipeline = metadata.add_task_to_pipeline(pipeline, task) - - # Clean pipeline - try: - operator.log.info("Cleaning pipeline tasks...") - updated_pipeline = metadata.clean_pipeline_tasks(updated_pipeline, dag.task_ids) - except Exception as exc: # pylint: disable=broad-except - operator.log.warning(f"Error cleaning pipeline tasks {exc}") - - return updated_pipeline - - def get_dag_status(all_tasks: List[str], task_status: List[TaskStatus]): """ Based on the task information and the total DAG tasks, cook the @@ -290,7 +144,7 @@ def add_status( updated_task_status = [ TaskStatus( name=task_instance.task_id, - executionStatus=_STATUS_MAP.get(task_instance.state), + executionStatus=STATUS_MAP.get(task_instance.state), startTime=datetime_to_ts(task_instance.start_date), endTime=datetime_to_ts(task_instance.end_date), logLink=task_instance.log_url, @@ -311,117 +165,3 @@ def add_status( metadata.add_pipeline_status( fqn=pipeline.fullyQualifiedName.__root__, status=updated_status ) - - -# pylint: disable=too-many-arguments,too-many-locals -def parse_lineage( - config: AirflowLineageConfig, - context: Dict, - operator: "BaseOperator", - inlets: List, - outlets: List, - metadata: OpenMetadata, -) -> Optional[Pipeline]: - """ - Main logic to extract properties from DAG and the - triggered operator to ingest lineage data into - OpenMetadata - - :param config: lineage configuration - :param context: airflow runtime context - :param operator: task 
being executed - :param inlets: list of upstream tables - :param outlets: list of downstream tables - :param metadata: OpenMetadata client - """ - operator.log.info("Parsing Lineage for OpenMetadata") - - dag: "DAG" = context["dag"] - task_instance: "TaskInstance" = context["task_instance"] - - try: - - airflow_service_entity = get_or_create_pipeline_service( - operator, metadata, config - ) - pipeline = create_or_update_pipeline( - task_instance=task_instance, - operator=operator, - dag=dag, - airflow_service_entity=airflow_service_entity, - metadata=metadata, - ) - - operator.log.info("Parsing Lineage") - for table in inlets if inlets else []: - table_entity = metadata.get_by_name(entity=Table, fqn=table) - operator.log.debug(f"from entity {table_entity}") - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=table_entity.id, type="table"), - toEntity=EntityReference(id=pipeline.id, type="pipeline"), - ) - ) - operator.log.debug(f"From lineage {lineage}") - metadata.add_lineage(lineage) - - for table in outlets if outlets else []: - table_entity = metadata.get_by_name(entity=Table, fqn=table) - operator.log.debug(f"To entity {table_entity}") - lineage = AddLineageRequest( - edge=EntitiesEdge( - fromEntity=EntityReference(id=pipeline.id, type="pipeline"), - toEntity=EntityReference(id=table_entity.id, type="table"), - ) - ) - operator.log.debug(f"To lineage {lineage}") - metadata.add_lineage(lineage) - - return pipeline - - except Exception as exc: # pylint: disable=broad-except - operator.log.error( - f"Failed to parse Airflow DAG task and publish to OpenMetadata due to {exc}" - ) - operator.log.error(traceback.format_exc()) - - return None - - -def get_or_create_pipeline_service( - operator: "BaseOperator", metadata: OpenMetadata, config: AirflowLineageConfig -) -> PipelineService: - """ - Check if we already have the airflow instance as a PipelineService, - otherwise create it. 
-
-    :param operator: task from which we extract the lineage
-    :param metadata: OpenMetadata API wrapper
-    :param config: lineage config
-    :return: PipelineService
-    """
-    operator.log.info("Get Airflow Service ID")
-    airflow_service_entity = metadata.get_by_name(
-        entity=PipelineService, fqn=config.airflow_service_name
-    )
-
-    if airflow_service_entity is None:
-        pipeline_service = CreatePipelineServiceRequest(
-            name=config.airflow_service_name,
-            serviceType=PipelineServiceType.Airflow,
-            connection=PipelineConnection(
-                config=AirflowConnection(
-                    hostPort=conf.get("webserver", "base_url"),
-                    connection=BackendConnection(),
-                ),
-            ),
-        )
-        airflow_service_entity = metadata.create_or_update(pipeline_service)
-        if airflow_service_entity:
-            operator.log.info(
-                f"Created airflow service entity - {airflow_service_entity.fullyQualifiedName.__root__}"
-            )
-        else:
-            operator.log.error("Failed to create airflow service entity")
-
-    return airflow_service_entity
diff --git a/ingestion/src/airflow_provider_openmetadata/provider.yaml b/ingestion/src/airflow_provider_openmetadata/provider.yaml
index 07c9c4f00e6..e6842979c88 100644
--- a/ingestion/src/airflow_provider_openmetadata/provider.yaml
+++ b/ingestion/src/airflow_provider_openmetadata/provider.yaml
@@ -24,3 +24,12 @@ integrations:
   - integration-name: OpenMetadata
     external-doc-url: https://open-metadata.org
     tags: [service]
+
+hooks:
+  - integration-name: OpenMetadata
+    python-modules:
+      - airflow_provider_openmetadata.hooks.openmetadata
+
+connection-types:
+  - hook-class-name: airflow_provider_openmetadata.hooks.openmetadata.OpenMetadataHook
+    connection-type: openmetadata
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
index 0b3164a45f3..e6570df2062 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py
@@ -19,6 +19,7 @@ from typing import Any, Dict, Generic, Optional, Type, TypeVar, Union
 from pydantic import BaseModel
 
 from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
+from metadata.generated.schema.type.entityLineage import EntitiesEdge
 from metadata.ingestion.ometa.client import REST, APIError
 from metadata.ingestion.ometa.utils import get_entity_type, ometa_logger
 
@@ -129,3 +130,16 @@ class OMetaLineageMixin(Generic[T]):
                 + f"{entity.__name__} and {path}: {err}"
             )
         return None
+
+    def delete_lineage_edge(self, edge: EntitiesEdge) -> None:
+        """
+        Remove the given Edge
+        """
+        try:
+            self.client.delete(
+                f"{self.get_suffix(AddLineageRequest)}/{edge.fromEntity.type}/{edge.fromEntity.id.__root__}/"
+                f"{edge.toEntity.type}/{edge.toEntity.id.__root__}"
+            )
+        except APIError as err:
+            logger.debug(traceback.format_exc())
+            logger.error(f"Error {err.status_code} trying to DELETE lineage for {edge}")
diff --git a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
index 58826a7ac6e..2e87eb7a779 100644
--- a/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
+++ b/ingestion/tests/integration/lineage/airflow/test_airflow_lineage.py
@@ -28,9 +28,7 @@ from airflow.operators.bash import BashOperator
 from airflow.operators.dummy import DummyOperator
 from airflow.utils.task_group import TaskGroup
 
-from airflow_provider_openmetadata.lineage.openmetadata import (
-    OpenMetadataLineageBackend,
-)
+from
airflow_provider_openmetadata.lineage.backend import OpenMetadataLineageBackend
 from airflow_provider_openmetadata.lineage.utils import get_xlets
 from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
 from metadata.generated.schema.api.data.createDatabaseSchema import (
diff --git a/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md
index 6cba8b3c6aa..87b68f250ae 100644
--- a/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md
+++ b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-backend.md
@@ -37,111 +37,56 @@ versions match.
 
 ### Adding Lineage Config
 
+
+
+If using OpenMetadata version 0.13.0 or lower, the import for the lineage backend is
+`airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend`.
+
+For 0.13.1 or higher, the import has been renamed to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`.
+
+
+
 After the installation, we need to update the Airflow configuration. This can be done following this
 example on `airflow.cfg`:
 
 ```ini
 [lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
+backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
 airflow_service_name = local_airflow
 openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = no-auth
+auth_provider_type = openmetadata
+jwt_token = <token>
 ```
 
 Or we can directly provide environment variables:
 
 ```env
-AIRFLOW__LINEAGE__BACKEND="airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend"
+AIRFLOW__LINEAGE__BACKEND="airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend"
 AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME="local_airflow"
 AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT="http://localhost:8585/api"
-AIRFLOW__LINEAGE__AUTH_PROVIDER_TYPE="no-auth"
+AIRFLOW__LINEAGE__AUTH_PROVIDER_TYPE="openmetadata"
+AIRFLOW__LINEAGE__JWT_TOKEN="<token>"
 ```
 
 We can choose the option that best adapts to our current architecture. Find more information on
 Airflow configurations [here](https://airflow.apache.org/docs/apache-airflow/stable/howto/set-config.html).
 
-We are now going to list the configurations for the different SSO. We will use the `ini` format for those,
-but on your own Airflow you can freely choose.
+#### Optional Parameters
 
-#### Google SSO
+You can also set the following parameters (their environment-variable equivalents are shown after the list):
 
 ```ini
 [lineage]
-backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend
-airflow_service_name = local_airflow
-openmetadata_api_endpoint = http://localhost:8585/api
-auth_provider_type = google
-# Note that the path should be local in Airflow
-secret_key = path-to-secret-key-file.json
+...
+only_keep_dag_lineage = true
+max_status = 10
 ```
 
-#### Okta SSO
+- `only_keep_dag_lineage` will remove any table lineage not present in the inlets or outlets. This will ensure
+that any lineage in OpenMetadata comes from your code.
+- `max_status` controls the number of statuses to ingest in each run. By default, we'll pick the last 10.
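+
+Both parameters can also be provided as environment variables, following the same
+`AIRFLOW__LINEAGE__<KEY>` convention shown above. Note that the config loader compares
+`only_keep_dag_lineage` against the literal string `true`, so keep the value lowercase:
+
+```env
+AIRFLOW__LINEAGE__ONLY_KEEP_DAG_LINEAGE="true"
+AIRFLOW__LINEAGE__MAX_STATUS="10"
+```
+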
-```ini -[lineage] -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend -airflow_service_name = local_airflow -openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = okta -client_id = client id -org_url = org url -private_key = private key -email = email -# Optional -scopes = ["scope1", "scope2"] -``` -#### Auth0 SSO - -```ini -[lineage] -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend -airflow_service_name = local_airflow -openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = auth0 -client_id = client id -secret_key = secret key -domain = domain -``` - -#### Azure SSO - -```ini -[lineage] -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend -airflow_service_name = local_airflow -openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = azure -client_id = client id -client_secret = client secret -authority = authority -# Optional -scopes = ["scope1", "scope2"] -``` - -#### OpenMetadata SSO - -```ini -[lineage] -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend -airflow_service_name = local_airflow -openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = openmetadata -jwt_token = token -``` - -#### Custom OIDC SSO - -```ini -[lineage] -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend -airflow_service_name = local_airflow -openmetadata_api_endpoint = http://localhost:8585/api -auth_provider_type = custom-oidc -client_id = client id -client_secret = client secret -token_endpoint = endpoint -``` In the following sections, we'll show how to adapt our pipelines to help us build the lineage information. @@ -307,7 +252,7 @@ If you are running this example using the quickstart deployment of OpenMetadata, this: ``` -backend = airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend +backend = airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend airflow_service_name = local_airflow openmetadata_api_endpoint = http://localhost:8585/api auth_provider_type = openmetadata diff --git a/openmetadata-docs/content/connectors/pipeline/airflow/lineage-operator.md b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-operator.md new file mode 100644 index 00000000000..1cb7fc16705 --- /dev/null +++ b/openmetadata-docs/content/connectors/pipeline/airflow/lineage-operator.md @@ -0,0 +1,213 @@ +--- +title: Airflow Lineage Operator +slug: /connectors/pipeline/airflow/lineage-operator +--- + +# Airflow Lineage Operator + +Another approach to extract Airflow metadata only for the DAGs you want is to use the `OpenMetadataLineageOperator`. + +When the task executes, it will ingest: +- The Pipeline Service if it does not exist +- The DAG as a Pipeline if it does not exist. +- The status of the tasks. We recommend running this Operator as the last step if you want up-to-date statuses. +- The lineage from inlets and outlets. + +## Installation + +The Lineage Operator can be directly installed to the Airflow instances as part of the usual OpenMetadata Python +distribution: + +```commandline +pip3 install "openmetadata-ingestion==x.y.z" +``` + +Where `x.y.z` is the version of your OpenMetadata server, e.g., 0.13.0. It is important that server and client +versions match. + +**It requires the version `0.13.1` or higher**. 
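+
+To double-check that Airflow discovered the provider after the installation, you can list the
+registered providers with the standard Airflow CLI (the exact output depends on your Airflow
+version):
+
+```commandline
+airflow providers list | grep openmetadata
+```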
+
+## Example DAG
+
+An example DAG looks as follows:
+
+```python
+# Copyright 2021 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+You can run this DAG from the default OM installation
+"""
+
+from datetime import datetime, timedelta
+from textwrap import dedent
+
+# The DAG object; we'll need this to instantiate a DAG
+from airflow import DAG
+
+# Operators; we need this to operate!
+from airflow.operators.bash import BashOperator
+from airflow.operators.python import PythonOperator
+
+# These args will get passed on to each operator
+# You can override them on a per-task basis during operator initialization
+from airflow_provider_openmetadata.lineage.operator import OpenMetadataLineageOperator
+
+from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
+    OpenMetadataConnection,
+)
+from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
+    OpenMetadataJWTClientConfig,
+)
+
+
+default_args = {
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+
+def explode():
+    raise Exception("Oh no!")
+
+
+with DAG(
+    'lineage_tutorial_operator',
+    default_args=default_args,
+    description='A simple tutorial DAG',
+    schedule_interval=timedelta(days=1),
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=['example'],
+) as dag:
+
+    # t1, t2 and t3 are examples of tasks created by instantiating operators
+    t1 = BashOperator(
+        task_id='print_date',
+        bash_command='date',
+        outlets={
+            "tables": ["sample_data.ecommerce_db.shopify.dim_address"]
+        }
+    )
+
+    t2 = BashOperator(
+        task_id='sleep',
+        depends_on_past=False,
+        bash_command='sleep 1',
+        retries=3,
+        inlets={
+            "tables": ["sample_data.ecommerce_db.shopify.dim_customer"]
+        }
+    )
+
+    risen = PythonOperator(
+        task_id='explode',
+        provide_context=True,
+        python_callable=explode,
+        retries=0,
+    )
+
+    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG
+    dag.doc_md = """
+    This is a documentation placed anywhere
+    """  # otherwise, type it like this
+    templated_command = dedent(
+        """
+    {% for i in range(5) %}
+        echo "{{ ds }}"
+        echo "{{ macros.ds_add(ds, 7)}}"
+        echo "{{ params.my_param }}"
+    {% endfor %}
+    """
+    )
+
+    t3 = BashOperator(
+        task_id='templated',
+        depends_on_past=False,
+        bash_command=templated_command,
+        params={'my_param': 'Parameter I passed in'},
+    )
+
+    t1 >> [t2, t3]
+
+    server_config = OpenMetadataConnection(
+        hostPort="http://localhost:8585/api",
+        authProvider="openmetadata",
+        securityConfig=OpenMetadataJWTClientConfig(
jwtToken="eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + ), + ) + + t4 = OpenMetadataLineageOperator( + task_id='lineage_op', + depends_on_past=False, + server_config=server_config, + service_name="airflow_lineage_op_service", + only_keep_dag_lineage=True, + ) + + t1 >> t4 +``` + +## Retrieving the OpenMetadataConnection from Airflow + +In 0.13.1 we have also added an `OpenMetadataHook`, which can be configured from the UI to safely store +the parameters to connect to OpenMetadata. + +Go to the Airflow UI > Admin > Connection and create a new `OpenMetadata` connection as follows: + +Airflow Connection + +Testing the connection will validate that the server is reachable and the installed client can be instantiated properly. + +Once the connection is configured, you can use it in your DAGs without creating the `OpenMetadataConnection` manually + +```python +from airflow_provider_openmetadata.hooks.openmetadata import OpenMetadataHook + +openmetadata_hook = OpenMetadataHook(openmetadata_conn_id="om_id") +server_config = openmetadata_hook.get_conn() + +OpenMetadataLineageOperator( + task_id='lineage_op', + depends_on_past=False, + server_config=server_config, + service_name="airflow_lineage_op_service", + only_keep_dag_lineage=True, +) +``` + +### OpenMetadataHook with HTTPS and SSL + +If the OpenMetadata server connection needs to happen through HTTPS, update the `Schema` accordingly to `https`. + +For SSL parameters we have two options: + +#### 1. Ignore the SSL certificates + +You can add the `Extra` value as the following JSON to create the connection that will ignore SSL. + +```json +{ + "verifySSL": "ignore" +} +``` + +#### 2. Validate SSL certificates + +Otherwise, you can use the `validate` value and add the path to the certificate. **It should be reachable locally +in your Airflow instance**. + +```json +{ + "verifySSL": "validate", + "sslConfig": "path-to-cert" +} +``` diff --git a/openmetadata-docs/content/deployment/upgrade/versions/012-to-013.md b/openmetadata-docs/content/deployment/upgrade/versions/012-to-013.md index 8766f731bde..ff7ebf36293 100644 --- a/openmetadata-docs/content/deployment/upgrade/versions/012-to-013.md +++ b/openmetadata-docs/content/deployment/upgrade/versions/012-to-013.md @@ -27,4 +27,13 @@ Starting with `0.13.0`, we have deprecated the initial configurations for Author - Airbyte - Airbyte connector now support Basic Authentication. - - Added: `username`, `password` \ No newline at end of file + - Added: `username`, `password` + +## Lineage Backend + +In 0.13.1: + +- The import for the Airflow Lineage Backend has been updated from `airflow_provider_openmetadata.lineage.openmetadata.OpenMetadataLineageBackend` +to `airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend`. +- We removed support from Airflow v1. +- The failure callback now only updates the pipeline status if the Pipeline already exists in OpenMetadata. 
diff --git a/openmetadata-docs/content/menu.md b/openmetadata-docs/content/menu.md index 3cef609ef60..41a521925d0 100644 --- a/openmetadata-docs/content/menu.md +++ b/openmetadata-docs/content/menu.md @@ -428,6 +428,8 @@ site_menu: url: /connectors/pipeline/airflow/gcs - category: Connectors / Pipeline / Airflow / Lineage Backend url: /connectors/pipeline/airflow/lineage-backend + - category: Connectors / Pipeline / Airflow / Lineage Operator + url: /connectors/pipeline/airflow/lineage-operator - category: Connectors / Pipeline / Airbyte url: /connectors/pipeline/airbyte - category: Connectors / Pipeline / Airbyte / Airflow diff --git a/openmetadata-docs/images/openmetadata/connectors/airflow/airflow-connection.png b/openmetadata-docs/images/openmetadata/connectors/airflow/airflow-connection.png new file mode 100644 index 00000000000..c5cd7596daa Binary files /dev/null and b/openmetadata-docs/images/openmetadata/connectors/airflow/airflow-connection.png differ