From 1c21abebdd9d83daf88632bafe1decaf26f4e744 Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Mon, 28 Mar 2022 08:33:02 +0200 Subject: [PATCH] Fix #3645 - Revisit lineage for Airflow 1.10.15 (#3696) Fix #3645 - Revisit lineage for Airflow 1.10.15 (#3696) --- ingestion/setup.py | 6 ++- .../lineage/openmetadata.py | 2 +- .../lineage/utils.py | 46 +++++++++++++++---- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/ingestion/setup.py b/ingestion/setup.py index effee856353..cd670e980b3 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -40,7 +40,7 @@ base_requirements = { "python-jose==3.3.0", "sqlalchemy>=1.4.0", "sql-metadata~=2.0.0", - "requests~=2.26", + "requests>=2.23", "cryptography", "Jinja2>=2.11.3", "PyYAML", @@ -66,7 +66,9 @@ plugins: Dict[str, Set[str]] = { "marshmallow-sqlalchemy>=0.26.0", "SQLAlchemy-Utils>=0.38.0", "pymysql>=1.0.2", + "requests==2.26.0", }, + "airflow-container-1.10.15": {"markupsafe==2.0.1 ", "requests==2.23.0"}, "amundsen": {"neo4j~=4.4.0"}, "athena": {"PyAthena[SQLAlchemy]"}, "atlas": {}, @@ -187,7 +189,7 @@ setup( *[ requirements for plugin, requirements in plugins.items() - if plugin != "db2" + if plugin not in {"airflow-container-1.10.15", "db2"} ] ) ), diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py index db12a90bfa1..177df3e8ede 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/openmetadata.py @@ -58,8 +58,8 @@ class OpenMetadataLineageBackend(LineageBackend): _ = get_lineage_config() # pylint: disable=protected-access + @staticmethod # needed for Airflow 1.10.x def send_lineage( - self, operator: "BaseOperator", inlets: Optional[List] = None, outlets: Optional[List] = None, diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py index 9ec1b96a6d0..3a354bacbbc 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/utils.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/utils.py @@ -14,7 +14,7 @@ OpenMetadata Airflow Lineage Backend """ import traceback -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Dict, List, Optional from airflow.configuration import conf @@ -70,9 +70,37 @@ def is_airflow_version_1() -> bool: return True +def parse_v1_xlets(xlet: dict) -> Optional[List[str]]: + """ + Parse airflow xlets for V1 + :param xlet: airflow v1 xlet dict + :return: table list or None + """ + if isinstance(xlet, dict): + tables = xlet.get("tables") + if tables and isinstance(tables, list): + return tables + + return None + + +def parse_xlets(xlet: List[dict]) -> Optional[List[str]]: + """ + Parse airflow xlets for V1 + :param xlet: airflow v2 xlet dict + :return: table list or None + """ + if len(xlet) and isinstance(xlet[0], dict): + tables = xlet[0].get("tables") + if tables and isinstance(tables, list): + return tables + + return None + + def get_xlets( operator: "BaseOperator", xlet_mode: str = "_inlets" -) -> Union[Optional[List[str]], Any]: +) -> Optional[List[str]]: """ Given an Airflow DAG Task, obtain the tables set in inlets or outlets. @@ -86,15 +114,15 @@ def get_xlets( """ xlet = getattr(operator, xlet_mode) if is_airflow_version_1(): - return xlet + tables = parse_v1_xlets(xlet) - if len(xlet) and isinstance(xlet[0], dict): - tables = xlet[0].get("tables") - if isinstance(tables, list) and len(tables): - return tables + else: + tables = parse_xlets(xlet) - operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}") - return None + if not tables: + operator.log.info(f"Not finding proper {xlet_mode} in task {operator.task_id}") + + return tables def create_or_update_pipeline( # pylint: disable=too-many-locals